Python数据分析及可视化实例之MongoDB增删改查

发布时间:2021-12-03 公开文章

1.MongoDB安装

有时候度娘还是给力的,相反一些博客的安装方法则显得凌乱:

jingyan.baidu.com/artic

最初使用Win7安装经常会出现意料不到的错误,

比如开机启动服务,服务器日志等。

上面链接安装步骤简单,没有炫技的成分,赞一个。

 

2.基本操作:增删改查

在Python中操作该数据库,需要安装依赖库Pymongo,

没有什么说明比官方文档更好的了:PyMongo 3.5.1 Documentation

如果你采用其他的库请参照响应的文档,比如MongoEngine。

 

下面的代码在Pymongo文档的基础上DIY类,比较完善。

不用该类也是可以的,用的时候再直接粗暴操作,MongoDB也喜欢。

# encoding: utf-8
__author__ = 'yeayee'  2015-07

from pymongo import MongoClient
from pymongo import errors
import json
from datetime import date, datetime

# 数据库日志配置
from log import Logger
my_logger = Logger('mongodb', 'mongodb.log', 'DEBUG')
my_logger.set_file_level('DEBUG')
my_logger.set_stream_level('WARNING')  # WARNING DEBUG
my_logger.set_stream_handler_fmt('%(message)s')
my_logger.load()
logger = my_logger.logger
# my_logger.get_memory_usage()


class Mongodb():
    """
    自定义mongodb工具
    """
    def __init__(self, db_config, db_name=None):
        self.db_config = db_config
        if db_name is not None:
            self.db_config['database'] = db_name
        try:
            # 实例化mongodb
            self.conn = MongoClient(self.db_config['host'], self.db_config['port'])
            # 获取数据库对象(选择/切换)
            self.db = self.conn.get_database(self.db_config['database'])
        except errors.ServerSelectionTimeoutError, e:
            logger.error('连接超时:%s' % e)
        except Exception as e:
            logger.error(e)

    @staticmethod
    def __default(obj):
        """
        支持datetime的json encode
        TypeError: datetime.datetime(2015, 10, 21, 8, 42, 54) is not JSON serializable
        :param obj:
        :return:
        """
        if isinstance(obj, datetime):
            return obj.strftime('%Y-%m-%d %H:%M:%S')
        elif isinstance(obj, date):
            return obj.strftime('%Y-%m-%d')
        else:
            raise TypeError('%r is not JSON serializable' % obj)

    def close_conn(self):
        """
        关闭连接
        关闭所有套接字的连接池和停止监控线程。
        如果这个实例再次使用它将自动重启和重新启动线程
        """
        self.conn.close()

    def find_one(self, table_name, condition=None):
        """
        查询单条记录
        :param table_name:
        :param condition:
        :return:
        """
        return self.db.get_collection(table_name).find_one(condition)

    def find_all(self, table_name, condition=None):
        """
        查询多条记录
        :param table_name:
        :param condition:
        :return:
        """
        return self.db.get_collection(table_name).find(condition)

    def count(self, table_name, condition=None):
        """
        查询记录总数
        :param table_name:
        :param condition:
        :return:
        """
        return self.db.get_collection(table_name).count(condition)

    def distinct(self, table_name, field_name):
        """
        查询某字段去重后值的范围
        :param table_name:
        :param field_name:
        :return:
        """
        return self.db.get_collection(table_name).distinct(field_name)

    def insert(self, table_name, data):
        """
        插入数据
        :param table_name:
        :param data:
        :return:
        """
        try:
            ids = self.db.get_collection(table_name).insert(data)
            return ids
        except Exception, e:
            logger.error('插入失败:%s' % e)
            return None

    def update(self, table_name, condition, update_data, update_type='set'):
        """
        批量更新数据
        upsert : 如果不存在update的记录,是否插入;true为插入,默认是false,不插入。
        :param table_name:
        :param condition:
        :param update_data:
        :param update_type: 范围:['inc', 'set', 'unset', 'push', 'pushAll', 'addToSet', 'pop', 'pull', 'pullAll', 'rename']
        :return:
        """
        if update_type not in ['inc', 'set', 'unset', 'push', 'pushAll', 'addToSet', 'pop', 'pull', 'pullAll', 'rename']:
            logger.error('更新失败,类型错误:%s' % update_type)
            return None
        try:
            result = self.db.get_collection(table_name).update_many(condition, {'$%s' % update_type: update_data})
            logger.info('更新成功,匹配数量:%s;更新数量:%s' % (result.matched_count, result.modified_count))
            return result.modified_count  # 返回更新数量,仅支持MongoDB 2.6及以上版本
        except Exception, e:
            logger.error('更新失败:%s' % e)
            return None

    def remove(self, table_name, condition=None):
        """
        删除文档记录
        :param table_name:
        :param condition:
        :return:
        """
        result = self.db.get_collection(table_name).remove(condition)
        if result.get('err') is None:
            logger.info('删除成功,删除行数%s' % result.get('n', 0))
            return result.get('n', 0)
        else:
            logger.error('删除失败:%s' % result.get('err'))
            return None

    def output_row(self, table_name, condition=None, style=0):
        """
        格式化输出单个记录
        style=0 键值对齐风格
        style=1 JSON缩进风格
        :param table_name:
        :param condition:
        :param style:
        :return:
        """
        row = self.find_one(table_name, condition)
        if style == 0:
            # 获取KEY最大的长度作为缩进依据
            max_len_key = max([len(each_key) for each_key in row.keys()])
            str_format = '{0: >%s}' % max_len_key
            keys = [str_format.format(each_key) for each_key in row.keys()]
            result = dict(zip(keys, row.values()))
            print ('**********  表名[%s]  **********' % table_name)
            for key, item in result.items():
                print key, ':', item
        else:
            print (json.dumps(row, indent=4, ensure_ascii=False, default=self.__default))

    def output_rows(self, table_name, condition=None, style=0):
        """
        格式化输出批量记录
        style=0 键值对齐风格
        style=1 JSON缩进风格
        :param table_name:
        :param condition:
        :param style:
        :return:
        """
        rows = self.find_all(table_name, condition)
        total = self.count(table_name, condition)
        if style == 0:
            count = 0
            for row in rows:
                # 获取KEY最大的长度作为缩进依据
                max_len_key = max([len(each_key) for each_key in row.keys()])
                str_format = '{0: >%s}' % max_len_key
                keys = [str_format.format(each_key) for each_key in row.keys()]
                result = dict(zip(keys, row.values()))
                count += 1
                print ('**********  表名[%s]  [%d/%d]  **********' % (table_name, count, total))
                for key, item in result.items():
                    print key, ':', item
        else:
            for row in rows:
                print (json.dumps(row, indent=4, ensure_ascii=False, default=self.__default))

注:上面的类初学者能看懂最好,看不懂就只看数据库操作相关命令即可。 之所以分享这个类操作,在Flask Web开发过程中要用到类的继承,实现记录用户登录状态。如果你接下来并不打算开发Web也不想用Web数据可视化,那么给你最粗暴的操作方法:

api.mongodb.com/python/

# encoding: utf-8
__author__ = 'yeayee'  2015-08
from pymongo import MongoClient
client = MongoClient(host='localhost', port=27017)
db = client.test # 直接建立一个测试数据库
collection = db.students # 在上数据库建立一个集合
student = {
    'id': '20170101',
    'name': 'Jordan',
    'age': 20,
    'gender': 'male'
}  # 声明一条数据

# 增
collection.insert(student)  # 插入数据
# 改,发现年龄错了,加1岁
collection.update_one({'name': 'Jordan'}, {'$inc': {'age': 1}})
# 查
result = collection.find_one({'name': 'Jordan'})  # 查询数据,返回的是一个游标
print(result)
# <pymongo.results.UpdateResult object at 0x10c6384c8>
# 删
result = collection.remove({'name': 'Jordan'})  # 返回删除状态
print(result)
# {'ok': 1, 'n': 1}

注:增删改查没这么简单,只是举了一个最简单的例子,有一本《MongoDB权威指南》,可以自己找PDF版;如果决定用这货,黑框下面操作MongoDB是逃不掉的。此外,在win平台下推荐软件Robomongo ,可视化操作数据集合。嗯,具体到插入多条,更改多条数据,聚类操作、地理位置信息聚类等,等实例源码中慢慢解说。