MENU

分享一个自用的MongoDB Class类

April 20, 2025 • 已被 40 位童鞋围观过 • 代码分享

分享一个自用的MongoDB Class类,使用的框架是EasySwoole,可以自行修改。

<?php
namespace App\Utility;

use MongoDB\Client;
use MongoDB\Collection;
use MongoDB\BSON\Regex;
use MongoDB\Driver\Manager;
use MongoDB\Driver\BulkWrite;
use MongoDB\Driver\Query;
use MongoDB\Driver\Command;
//自定义错误日志
use EasySwooleLib\Logger\Log as AppLog;

final class  MongoDbHelper
{
  
    private $manager          = null;
    private $dbname            = null;
    private $config;
    private static $instances = [];

    public function __construct($configKey = 'default') {
        $config = config('mongo');
    
        // 如果配置中存在指定的键,则使用它,否则使用 'default' 或整个配置
        if (isset($config[$configKey])) {
            $this->config = $config[$configKey];
        } else {
            $this->config = $config['default'] ?? $config;
        }
    
        $this->manager = new Manager("mongodb://".$this->config["user"].":".$this->config["password"]."@".$this->config["host"].":".$this->config["port"]."/".$this->config["dbname"]);
        $this->dbname = $this->config["dbname"];
    }

    //查询单条数据返回
    public function find(string $collection,array $filter, array $options = []){ 
        $option = ['limit' => 1,];
        $options = array_merge($option, $options);
        $query = new Query($filter,$options);
        try {
            $cursor = $this->manager->executeQuery($this->dbname.'.'.$collection, $query);
            $data = $cursor->toArray();
            if(!empty($data)){
                $data=$this::Json($data);
                return $data[0];
            }
            return ;
        } catch (\MongoDB\Driver\Exception\Exception $e) {
            AppLog::error("MongoDB 查询错误:  MongoDbHelper::find() " . $e->getMessage(), 'error');
            return ;
        }
    }

    //查询多条数据返回
    public function findMany(string $collection,array $filter, array $options = []){ 
        $query = new Query($filter,$options);
        try {
            $cursor = $this->manager->executeQuery($this->dbname.'.'.$collection, $query);
            $data = $cursor->toArray();
            if(!empty($data)){
                $data=$this::Json($data);
                return $data;
            }
            return ;
        } catch (\Exception $e) {
            AppLog::error("MongoDB 查询错误:  MongoDbHelper::findMany() " . $e->getMessage(), 'error');
            return ;
        }
    }

    //只插入一条数据 重复就不再插入
    public function insert(string $collection, array $data) { 
        $bulk = new BulkWrite;
        $bulk->insert($data);
        try {
            $result = $this->manager->executeBulkWrite($this->dbname.'.'.$collection, $bulk);
            return $result->getInsertedCount();
        } catch (\MongoDB\Driver\Exception\BulkWriteException $e) {
            // 检查是否是重复键错误(代码11000)
            if ($e->getCode() == 11000) {
                // 这是预期的行为 - 记录已存在,返回0表示没有新插入
                # AppLog::info("MongoDB 重复键,跳过插入: " . $e->getMessage(), 'info');
                return false;
            } else {
                // 其他类型的错误,记录并返回false
                AppLog::error("MongoDB 插入错误: MongoDbHelper::insert() " . $e->getMessage(), 'error');
                return false;
            }
        } catch (\MongoDB\Driver\Exception\Exception $e) {
            // 捕获其他类型的MongoDB异常
            AppLog::error("MongoDB 插入错误: MongoDbHelper::insert() " . $e->getMessage(), 'error');
            return false;
        }
    }

    //使用Client方法 插入多条数据,提升效率
    public function insertMany(string $collection,array $data, $options = ['ordered' => false]){ 
        $client = new Client("mongodb://".$this->config["user"].":".$this->config["password"]."@".$this->config["host"].":".$this->config["port"]."/".$this->config["dbname"]);
        $database = $client->selectDatabase($this->dbname);
        $Collection = $database->selectCollection($collection);
        try {
            $result = $Collection->insertMany($data, $options);
            return $result->getInsertedCount();
        } catch (\MongoDB\Driver\Exception\BulkWriteException $e) {
            AppLog::error("MongoDB 插入错误:  MongoDbHelper::insertMany() ". $e->getMessage(), 'error');
            //使用 getWriteResult 获取成功插入的数量
            return $e->getWriteResult()->getInsertedCount();
        }
    }

    //根据条件 更新一组/多组数据 支持options可以自定义设置
    //参数为 查询表 查询sql 需要更新的数据 更新的参数 不存在是否插入
    //$options = [
    //    'upsert' => true,
    //    'multi' => true
    // ];
    //upsert(布尔型):如果不存在匹配文档是否插入新文档,默认false
    //multi(布尔型):是否更新多条匹配文档,默认false
    public function update(string $collection, array $filter, array $update, array $options = []){ 
        $defaultOptions = [
            'multi' => false,
            'upsert' => true
        ];
        $options = array_merge($defaultOptions, $options);
        $bulk = new BulkWrite;
        $bulk->update(
            $filter,
            ['$set' => $update],
            $options
        );
    
        try {
            $result = $this->manager->executeBulkWrite($this->dbname.'.'.$collection, $bulk);
            return $result->getModifiedCount();
        } catch (\MongoDB\Driver\Exception\Exception $e) {
            AppLog::error("MongoDB 更新错误:  MongoDbHelper::update() ". $e->getMessage(), 'error');
            return false;
        }
    }
  
  
  
    /* _updateOne("Members",["mail" => $login["mail"] ],['$inc' => ['balance' => -$totalPrice]]);
    * 支持变量变化 比如某个数值的增加和减少
    * 不需要先去查询 直接在当前值上面计算
    */
    public function _update(string $collection, array $filter, array $update, array $options = []){ 
        //构造一个 Collection 
        $Collection = new Collection($this->manager, $this->dbname, $collection);
    
        try {
            $result = $Collection->updateOne($filter, $update, $options);
            return $result->getModifiedCount(); 
        } catch (\MongoDB\Driver\Exception\Exception $e) {
            AppLog::error("MongoDB 更新错误:  MongoDbHelper::_update() ". $e->getMessage(), 'error');
            return -1;
        }
    }
    /* 
    *  只删除一条数据
    */
    public function delete(string $collection, array $filter, array $options = []){ 
        $defaultOptions =   ['limit' => intval(1)];
        $options = array_merge($defaultOptions, $options);
        $bulk = new BulkWrite;
        $bulk->delete($filter,$options);
        try {
            $result = $this->manager->executeBulkWrite($this->dbname.'.'.$collection, $bulk);
            return $result->getDeletedCount();
        } catch (\MongoDB\Driver\Exception\Exception $e) {
            AppLog::error("MongoDB 删除错误:  MongoDbHelper::delete() ". $e->getMessage(), 'error');
            return -1;
        }
    }

    /* 删除全部数据
    *  mongo不支持删除部分数据
    */
    public function deleteMany(string $collection, array $filter, array $options = []){ 
        $defaultOptions =   ['limit' => intval(0)];
        $options = array_merge($defaultOptions, $options);
        $bulk = new BulkWrite;
        $bulk->delete($filter,$options);
        try {
            $result = $this->manager->executeBulkWrite($this->dbname.'.'.$collection, $bulk);
            return $result->getDeletedCount();
        } catch (\MongoDB\Driver\Exception\Exception $e) {
            AppLog::error("MongoDB 删除错误:  MongoDbHelper::deleteMany() ". $e->getMessage(), 'error');
            return false;
        }
    }

    /* 更新一个数据 并返回更新后的数据
    *
    */
    public function findAndUpdate(string $collection, array $filter, array $update, array $options = []) {
        $defaultOptions = [
            'returnDocument' => \MongoDB\Operation\FindOneAndUpdate::RETURN_DOCUMENT_AFTER,
            'upsert' => true
        ];
        $options = array_merge($defaultOptions, $options);
    
        $Collection = new Collection($this->manager, $this->dbname, $collection);
        try {
            $result = $Collection->findOneAndUpdate($filter, ['$set' => $update], $options);
            return $this::Json($result);
        } catch (\Exception $e) {
            AppLog::error("MongoDB 更新错误: MongoDbHelper::findOneAndUpdate() " . $e->getMessage(), 'error');
            return false;
        }
    }



    /*** 获取自增长id
    **** 默认已经指定了集合 autoId
    **** 返回的就是新的值 可以直接使用
    ***/ 
    public function autoId(string $field){
        $update = array('$inc'=>array("id"=>1));
        $filter = array('field'=>$field);
        $command = [
            'findandmodify' =>  "autoId",
            'update'        =>  $update,
            'query'         =>  $filter, 
            'new'           =>  true, 
            'upsert'        =>  true
        ];
        $cmd = new Command($command);
        try {
            $cursor = $this->manager->executeCommand($this->dbname, $cmd);
            $data = $cursor->toArray();
            if(isset($data[0]->ok) && $data[0]->ok ){
                return $data[0]->value->id;
            }
        } catch (\MongoDB\Driver\Exception\Exception $e) {
            AppLog::error("MongoDB 自增长Id:  MongoDbHelper::autoId()-> ". $field . " ". $e->getMessage(), 'error');
        }
        return false;
    }
  

    public function count(string $collection,array $filter){ 
        $Collection = new Collection($this->manager, $this->dbname, $collection);
        try {
            $result = $Collection->aggregate([
                ['$match' => $filter],
                [
                    '$count' => 'total' 
                ]
            ]);
            $data = $result->toArray();
            return $data[0]['total'] ?? 0;
        } catch (\MongoDB\Driver\Exception\Exception $e) {
            AppLog::error("MongoDB 统计数量:  MongoDbHelper::count()-> ". json_encode($filter) . " " . $e->getMessage(), 'error');
            return false;
        }
    }

    //按照条件 去重复查询
    public function distinct(string $collection,string $filed,array $filter){ 
        $Collection = new Collection($this->manager, $this->dbname, $collection);
        try {
            $result = $Collection->distinct($filed, $filter);
            return $result;
        } catch (\MongoDB\Driver\Exception\Exception $e) {
            AppLog::error("MongoDB 去重查询:  MongoDbHelper::distinct()-> ". json_encode($filter) . " " . $e->getMessage(), 'error');
            return false;
        }
    }
  
    //按照条件 聚合查询
    function pipe($collection,$filter,$pipsql){

        $Collection = new Collection($this->manager, $this->dbname, $collection);
        $pipeline = [
            ['$match' => $filter],
            ['$group' => $pipsql]
        ];

        try {
            $result = $Collection->aggregate($pipeline);
            $data = $result->toArray();
            if(!empty($data)){
                $data=$this::Json($data);
                return $data[0];
            }
            return false;
        
        } catch (\MongoDB\Driver\Exception\Exception $e) {
            AppLog::error("MongoDB 聚合查询:  MongoDbHelper::pipe()-> ". json_encode($filter) . " PIPSQL -> " .json_encode($pipsql) . " " .$e->getMessage(), 'error');
            return false;
        }
    }
  
    public function push(string $collection, array $filter, array $update, string $field){ 
        $options = [
            'multi' => false,
            'upsert' => false
        ];

        $pushdate = [
            '$push' => [
                $field => $update
            ]
        ];

        $bulk = new BulkWrite;
        $bulk->update($filter, $pushdate, $options);
        try {
            $result = $this->manager->executeBulkWrite($this->dbname.'.'.$collection, $bulk);
            return $result->getModifiedCount();
        } catch (\MongoDB\Driver\Exception\Exception $e) {
            AppLog::error("MongoDB 更新错误:  MongoDbHelper::push() -> ". json_encode($filter) . " UPDATE -> " .json_encode($update) . " " .$e->getMessage(), 'error');
            return false;
        }
    }
  
  
    //判断集合是否存在 返回 true false
    /*
        列出所有集合名字
        如果集合不存在,此时返回假
    */
    public function exists(string $collection){ 
        $cmd = new Command(["listCollections" => 1]);  
        $cursor = $this->manager->executeCommand($this->dbname, $cmd);
        $data = $cursor->toArray();
        $isExist = false;
        foreach ($data as $row) {
            if ($row->name == $collection) {
                $isExist = true;
                break;
            }
        }
        return $isExist;
    }
 
 
    /**
     * 创建单个或多个索引
     * @param string $collection 集合名称
     * @param array $indexes 索引配置数组
     * [
     *     // 单个索引
     *     ['keys' => ['field1' => 1], 'options' => ['name' => 'index1', 'unique' => true]],
     *     // 复合索引
     *     ['keys' => ['field1' => 1, 'field2' => -1], 'options' => ['name' => 'index2']],
     *     // 文本索引
     *     ['keys' => ['content' => 'text'], 'options' => ['name' => 'text_index']],
     *     // 地理空间索引
     *     ['keys' => ['location' => '2dsphere'], 'options' => ['name' => 'geo_index']]
     * ]
     * @return bool|array 返回创建结果
     */
    public function createIndexes(string $collection, array $indexes) {
        $indexesToCreate = [];
    
        foreach ($indexes as $index) {
            if (!isset($index['keys'])) {
                continue;
            }
        
            $indexConfig = [
                'key' => $index['keys'],
                'name' => $index['options']['name'] ?? null,
                'unique' => $index['options']['unique'] ?? false,
            ];
        
            // 添加可选的索引参数
            if (isset($index['options']['sparse'])) {
                $indexConfig['sparse'] = $index['options']['sparse'];
            }
        
            if (isset($index['options']['expireAfterSeconds'])) {
                $indexConfig['expireAfterSeconds'] = $index['options']['expireAfterSeconds'];
            }
        
            if (isset($index['options']['partialFilterExpression'])) {
                $indexConfig['partialFilterExpression'] = $index['options']['partialFilterExpression'];
            }
        
            if (isset($index['options']['collation'])) {
                $indexConfig['collation'] = $index['options']['collation'];
            }
        
            $indexesToCreate[] = $indexConfig;
        }
    
        $command = [
            'createIndexes' => $collection,
            'indexes' => $indexesToCreate
        ];
    
        try {
            $result = $this->command($command);
            return $this::Json($result->toArray());
        } catch (\Exception $e) {
            AppLog::error("MongoDB 创建索引错误: MongoDbHelper::createIndexes() " . $e->getMessage(), 'error');
            return false;
        }
    }
  
    /**
     * 删除指定索引
     * @param string $collection 集合名称
     * @param string|array $indexName 索引名称或索引键数组
     * @return bool
     */
    public function dropIndex(string $collection, $indexName) {
        $command = [
            'dropIndexes' => $collection,
            'index' => is_array($indexName) ? $indexName : $indexName
        ];
    
        try {
            $result = $this->command($command);
            return true;
        } catch (\Exception $e) {
            AppLog::error("MongoDB 删除索引错误: MongoDbHelper::dropIndex() " . $e->getMessage(), 'error');
            return false;
        }
    }
  
    /**
     * 删除集合所有索引
     * @param string $collection 集合名称
     * @return bool
     */
    public function dropAllIndexes(string $collection) {
        $command = [
            'dropIndexes' => $collection,
            'index' => '*'
        ];
    
        try {
            $result = $this->command($command);
            return true;
        } catch (\Exception $e) {
            AppLog::error("MongoDB 删除所有索引错误: MongoDbHelper::dropAllIndexes() " . $e->getMessage(), 'error');
            return false;
        }
    }
  
    /**
     * 获取集合的所有索引
     * @param string $collection 集合名称
     * @return array|bool
     */
    public function getIndexes(string $collection) {
        $command = [
            'listIndexes' => $collection
        ];
    
        try {
            $result = $this->command($command);
            return $this::Json($result->toArray());
        } catch (\Exception $e) {
            AppLog::error("MongoDB 获取索引错误: MongoDbHelper::getIndexes() " . $e->getMessage(), 'error');
            return false;
        }
    }
 
    /**
     * 执行MongoDB命令
     * @param array $param
     * @return \MongoDB\Driver\Cursor
     */
    public function command(array $param) {
        $cmd = new Command($param);
        try {
            return $this->manager->executeCommand($this->dbname, $cmd);
        } catch (\MongoDB\Driver\Exception\Exception $e) {
            AppLog::error("MongoDB 执行MongoDB命令:  MongoDbHelper::command()-> ". json_encode($param) . " " . $e->getMessage(), 'error');
            return false;
        }
    }
  
  
    //JSON 格式化BSON数据 返回为数组格式
    public static function Json($bson){
        $json= json_encode($bson);
        $bejson=json_decode($json,true);
        return $bejson;
    }
  
    //BSON Array数组格式化JSON数据,直接插入数据使用
    public static function Bson($array){
        $data=json_encode($array);
        $bson = MongoDB\BSON\fromJSON($data);
        $redata = MongoDB\BSON\toPHP($bson);
        return $redata;
    }
  
    // 静态方法用于获取类的实例
    public static function getInstance($configKey = 'default') {
        if (!isset(self::$instances[$configKey])) {
            self::$instances[$configKey] = new self($configKey);
        }
        return self::$instances[$configKey];
    }

}