分享一个自用的MongoDB Class类,使用的框架是EasySwoole,可以自行修改。
<?php
namespace App\Utility;
use MongoDB\Client;
use MongoDB\Collection;
use MongoDB\BSON\Regex;
use MongoDB\Driver\Manager;
use MongoDB\Driver\BulkWrite;
use MongoDB\Driver\Query;
use MongoDB\Driver\Command;
//自定义错误日志
use EasySwooleLib\Logger\Log as AppLog;
final class MongoDbHelper
{
private $manager = null;
private $dbname = null;
private $config;
private static $instances = [];
public function __construct($configKey = 'default') {
$config = config('mongo');
// 如果配置中存在指定的键,则使用它,否则使用 'default' 或整个配置
if (isset($config[$configKey])) {
$this->config = $config[$configKey];
} else {
$this->config = $config['default'] ?? $config;
}
$this->manager = new Manager("mongodb://".$this->config["user"].":".$this->config["password"]."@".$this->config["host"].":".$this->config["port"]."/".$this->config["dbname"]);
$this->dbname = $this->config["dbname"];
}
//查询单条数据返回
public function find(string $collection,array $filter, array $options = []){
$option = ['limit' => 1,];
$options = array_merge($option, $options);
$query = new Query($filter,$options);
try {
$cursor = $this->manager->executeQuery($this->dbname.'.'.$collection, $query);
$data = $cursor->toArray();
if(!empty($data)){
$data=$this::Json($data);
return $data[0];
}
return ;
} catch (\MongoDB\Driver\Exception\Exception $e) {
AppLog::error("MongoDB 查询错误: MongoDbHelper::find() " . $e->getMessage(), 'error');
return ;
}
}
//查询多条数据返回
public function findMany(string $collection,array $filter, array $options = []){
$query = new Query($filter,$options);
try {
$cursor = $this->manager->executeQuery($this->dbname.'.'.$collection, $query);
$data = $cursor->toArray();
if(!empty($data)){
$data=$this::Json($data);
return $data;
}
return ;
} catch (\Exception $e) {
AppLog::error("MongoDB 查询错误: MongoDbHelper::findMany() " . $e->getMessage(), 'error');
return ;
}
}
//只插入一条数据 重复就不再插入
public function insert(string $collection, array $data) {
$bulk = new BulkWrite;
$bulk->insert($data);
try {
$result = $this->manager->executeBulkWrite($this->dbname.'.'.$collection, $bulk);
return $result->getInsertedCount();
} catch (\MongoDB\Driver\Exception\BulkWriteException $e) {
// 检查是否是重复键错误(代码11000)
if ($e->getCode() == 11000) {
// 这是预期的行为 - 记录已存在,返回0表示没有新插入
# AppLog::info("MongoDB 重复键,跳过插入: " . $e->getMessage(), 'info');
return false;
} else {
// 其他类型的错误,记录并返回false
AppLog::error("MongoDB 插入错误: MongoDbHelper::insert() " . $e->getMessage(), 'error');
return false;
}
} catch (\MongoDB\Driver\Exception\Exception $e) {
// 捕获其他类型的MongoDB异常
AppLog::error("MongoDB 插入错误: MongoDbHelper::insert() " . $e->getMessage(), 'error');
return false;
}
}
//使用Client方法 插入多条数据,提升效率
public function insertMany(string $collection,array $data, $options = ['ordered' => false]){
$client = new Client("mongodb://".$this->config["user"].":".$this->config["password"]."@".$this->config["host"].":".$this->config["port"]."/".$this->config["dbname"]);
$database = $client->selectDatabase($this->dbname);
$Collection = $database->selectCollection($collection);
try {
$result = $Collection->insertMany($data, $options);
return $result->getInsertedCount();
} catch (\MongoDB\Driver\Exception\BulkWriteException $e) {
AppLog::error("MongoDB 插入错误: MongoDbHelper::insertMany() ". $e->getMessage(), 'error');
//使用 getWriteResult 获取成功插入的数量
return $e->getWriteResult()->getInsertedCount();
}
}
//根据条件 更新一组/多组数据 支持options可以自定义设置
//参数为 查询表 查询sql 需要更新的数据 更新的参数 不存在是否插入
//$options = [
// 'upsert' => true,
// 'multi' => true
// ];
//upsert(布尔型):如果不存在匹配文档是否插入新文档,默认false
//multi(布尔型):是否更新多条匹配文档,默认false
public function update(string $collection, array $filter, array $update, array $options = []){
$defaultOptions = [
'multi' => false,
'upsert' => true
];
$options = array_merge($defaultOptions, $options);
$bulk = new BulkWrite;
$bulk->update(
$filter,
['$set' => $update],
$options
);
try {
$result = $this->manager->executeBulkWrite($this->dbname.'.'.$collection, $bulk);
return $result->getModifiedCount();
} catch (\MongoDB\Driver\Exception\Exception $e) {
AppLog::error("MongoDB 更新错误: MongoDbHelper::update() ". $e->getMessage(), 'error');
return false;
}
}
/* _updateOne("Members",["mail" => $login["mail"] ],['$inc' => ['balance' => -$totalPrice]]);
* 支持变量变化 比如某个数值的增加和减少
* 不需要先去查询 直接在当前值上面计算
*/
public function _update(string $collection, array $filter, array $update, array $options = []){
//构造一个 Collection
$Collection = new Collection($this->manager, $this->dbname, $collection);
try {
$result = $Collection->updateOne($filter, $update, $options);
return $result->getModifiedCount();
} catch (\MongoDB\Driver\Exception\Exception $e) {
AppLog::error("MongoDB 更新错误: MongoDbHelper::_update() ". $e->getMessage(), 'error');
return -1;
}
}
/*
* 只删除一条数据
*/
public function delete(string $collection, array $filter, array $options = []){
$defaultOptions = ['limit' => intval(1)];
$options = array_merge($defaultOptions, $options);
$bulk = new BulkWrite;
$bulk->delete($filter,$options);
try {
$result = $this->manager->executeBulkWrite($this->dbname.'.'.$collection, $bulk);
return $result->getDeletedCount();
} catch (\MongoDB\Driver\Exception\Exception $e) {
AppLog::error("MongoDB 删除错误: MongoDbHelper::delete() ". $e->getMessage(), 'error');
return -1;
}
}
/* 删除全部数据
* mongo不支持删除部分数据
*/
public function deleteMany(string $collection, array $filter, array $options = []){
$defaultOptions = ['limit' => intval(0)];
$options = array_merge($defaultOptions, $options);
$bulk = new BulkWrite;
$bulk->delete($filter,$options);
try {
$result = $this->manager->executeBulkWrite($this->dbname.'.'.$collection, $bulk);
return $result->getDeletedCount();
} catch (\MongoDB\Driver\Exception\Exception $e) {
AppLog::error("MongoDB 删除错误: MongoDbHelper::deleteMany() ". $e->getMessage(), 'error');
return false;
}
}
/* 更新一个数据 并返回更新后的数据
*
*/
public function findAndUpdate(string $collection, array $filter, array $update, array $options = []) {
$defaultOptions = [
'returnDocument' => \MongoDB\Operation\FindOneAndUpdate::RETURN_DOCUMENT_AFTER,
'upsert' => true
];
$options = array_merge($defaultOptions, $options);
$Collection = new Collection($this->manager, $this->dbname, $collection);
try {
$result = $Collection->findOneAndUpdate($filter, ['$set' => $update], $options);
return $this::Json($result);
} catch (\Exception $e) {
AppLog::error("MongoDB 更新错误: MongoDbHelper::findOneAndUpdate() " . $e->getMessage(), 'error');
return false;
}
}
/*** 获取自增长id
**** 默认已经指定了集合 autoId
**** 返回的就是新的值 可以直接使用
***/
public function autoId(string $field){
$update = array('$inc'=>array("id"=>1));
$filter = array('field'=>$field);
$command = [
'findandmodify' => "autoId",
'update' => $update,
'query' => $filter,
'new' => true,
'upsert' => true
];
$cmd = new Command($command);
try {
$cursor = $this->manager->executeCommand($this->dbname, $cmd);
$data = $cursor->toArray();
if(isset($data[0]->ok) && $data[0]->ok ){
return $data[0]->value->id;
}
} catch (\MongoDB\Driver\Exception\Exception $e) {
AppLog::error("MongoDB 自增长Id: MongoDbHelper::autoId()-> ". $field . " ". $e->getMessage(), 'error');
}
return false;
}
public function count(string $collection,array $filter){
$Collection = new Collection($this->manager, $this->dbname, $collection);
try {
$result = $Collection->aggregate([
['$match' => $filter],
[
'$count' => 'total'
]
]);
$data = $result->toArray();
return $data[0]['total'] ?? 0;
} catch (\MongoDB\Driver\Exception\Exception $e) {
AppLog::error("MongoDB 统计数量: MongoDbHelper::count()-> ". json_encode($filter) . " " . $e->getMessage(), 'error');
return false;
}
}
//按照条件 去重复查询
public function distinct(string $collection,string $filed,array $filter){
$Collection = new Collection($this->manager, $this->dbname, $collection);
try {
$result = $Collection->distinct($filed, $filter);
return $result;
} catch (\MongoDB\Driver\Exception\Exception $e) {
AppLog::error("MongoDB 去重查询: MongoDbHelper::distinct()-> ". json_encode($filter) . " " . $e->getMessage(), 'error');
return false;
}
}
//按照条件 聚合查询
function pipe($collection,$filter,$pipsql){
$Collection = new Collection($this->manager, $this->dbname, $collection);
$pipeline = [
['$match' => $filter],
['$group' => $pipsql]
];
try {
$result = $Collection->aggregate($pipeline);
$data = $result->toArray();
if(!empty($data)){
$data=$this::Json($data);
return $data[0];
}
return false;
} catch (\MongoDB\Driver\Exception\Exception $e) {
AppLog::error("MongoDB 聚合查询: MongoDbHelper::pipe()-> ". json_encode($filter) . " PIPSQL -> " .json_encode($pipsql) . " " .$e->getMessage(), 'error');
return false;
}
}
public function push(string $collection, array $filter, array $update, string $field){
$options = [
'multi' => false,
'upsert' => false
];
$pushdate = [
'$push' => [
$field => $update
]
];
$bulk = new BulkWrite;
$bulk->update($filter, $pushdate, $options);
try {
$result = $this->manager->executeBulkWrite($this->dbname.'.'.$collection, $bulk);
return $result->getModifiedCount();
} catch (\MongoDB\Driver\Exception\Exception $e) {
AppLog::error("MongoDB 更新错误: MongoDbHelper::push() -> ". json_encode($filter) . " UPDATE -> " .json_encode($update) . " " .$e->getMessage(), 'error');
return false;
}
}
//判断集合是否存在 返回 true false
/*
列出所有集合名字
如果集合不存在,此时返回假
*/
public function exists(string $collection){
$cmd = new Command(["listCollections" => 1]);
$cursor = $this->manager->executeCommand($this->dbname, $cmd);
$data = $cursor->toArray();
$isExist = false;
foreach ($data as $row) {
if ($row->name == $collection) {
$isExist = true;
break;
}
}
return $isExist;
}
/**
* 创建单个或多个索引
* @param string $collection 集合名称
* @param array $indexes 索引配置数组
* [
* // 单个索引
* ['keys' => ['field1' => 1], 'options' => ['name' => 'index1', 'unique' => true]],
* // 复合索引
* ['keys' => ['field1' => 1, 'field2' => -1], 'options' => ['name' => 'index2']],
* // 文本索引
* ['keys' => ['content' => 'text'], 'options' => ['name' => 'text_index']],
* // 地理空间索引
* ['keys' => ['location' => '2dsphere'], 'options' => ['name' => 'geo_index']]
* ]
* @return bool|array 返回创建结果
*/
public function createIndexes(string $collection, array $indexes) {
$indexesToCreate = [];
foreach ($indexes as $index) {
if (!isset($index['keys'])) {
continue;
}
$indexConfig = [
'key' => $index['keys'],
'name' => $index['options']['name'] ?? null,
'unique' => $index['options']['unique'] ?? false,
];
// 添加可选的索引参数
if (isset($index['options']['sparse'])) {
$indexConfig['sparse'] = $index['options']['sparse'];
}
if (isset($index['options']['expireAfterSeconds'])) {
$indexConfig['expireAfterSeconds'] = $index['options']['expireAfterSeconds'];
}
if (isset($index['options']['partialFilterExpression'])) {
$indexConfig['partialFilterExpression'] = $index['options']['partialFilterExpression'];
}
if (isset($index['options']['collation'])) {
$indexConfig['collation'] = $index['options']['collation'];
}
$indexesToCreate[] = $indexConfig;
}
$command = [
'createIndexes' => $collection,
'indexes' => $indexesToCreate
];
try {
$result = $this->command($command);
return $this::Json($result->toArray());
} catch (\Exception $e) {
AppLog::error("MongoDB 创建索引错误: MongoDbHelper::createIndexes() " . $e->getMessage(), 'error');
return false;
}
}
/**
* 删除指定索引
* @param string $collection 集合名称
* @param string|array $indexName 索引名称或索引键数组
* @return bool
*/
public function dropIndex(string $collection, $indexName) {
$command = [
'dropIndexes' => $collection,
'index' => is_array($indexName) ? $indexName : $indexName
];
try {
$result = $this->command($command);
return true;
} catch (\Exception $e) {
AppLog::error("MongoDB 删除索引错误: MongoDbHelper::dropIndex() " . $e->getMessage(), 'error');
return false;
}
}
/**
* 删除集合所有索引
* @param string $collection 集合名称
* @return bool
*/
public function dropAllIndexes(string $collection) {
$command = [
'dropIndexes' => $collection,
'index' => '*'
];
try {
$result = $this->command($command);
return true;
} catch (\Exception $e) {
AppLog::error("MongoDB 删除所有索引错误: MongoDbHelper::dropAllIndexes() " . $e->getMessage(), 'error');
return false;
}
}
/**
* 获取集合的所有索引
* @param string $collection 集合名称
* @return array|bool
*/
public function getIndexes(string $collection) {
$command = [
'listIndexes' => $collection
];
try {
$result = $this->command($command);
return $this::Json($result->toArray());
} catch (\Exception $e) {
AppLog::error("MongoDB 获取索引错误: MongoDbHelper::getIndexes() " . $e->getMessage(), 'error');
return false;
}
}
/**
* 执行MongoDB命令
* @param array $param
* @return \MongoDB\Driver\Cursor
*/
public function command(array $param) {
$cmd = new Command($param);
try {
return $this->manager->executeCommand($this->dbname, $cmd);
} catch (\MongoDB\Driver\Exception\Exception $e) {
AppLog::error("MongoDB 执行MongoDB命令: MongoDbHelper::command()-> ". json_encode($param) . " " . $e->getMessage(), 'error');
return false;
}
}
//JSON 格式化BSON数据 返回为数组格式
public static function Json($bson){
$json= json_encode($bson);
$bejson=json_decode($json,true);
return $bejson;
}
//BSON Array数组格式化JSON数据,直接插入数据使用
public static function Bson($array){
$data=json_encode($array);
$bson = MongoDB\BSON\fromJSON($data);
$redata = MongoDB\BSON\toPHP($bson);
return $redata;
}
// 静态方法用于获取类的实例
public static function getInstance($configKey = 'default') {
if (!isset(self::$instances[$configKey])) {
self::$instances[$configKey] = new self($configKey);
}
return self::$instances[$configKey];
}
}