awakeBird Back-end Dev Engineer

Python Web 基础向(二) 操作数据库

2018-01-17
 

从这篇文章开始,将对上文中所提起及的各抽象层进行进一步阐述,涉及互联网业务代码中的各项基本操作。本文以MySQL为例操作关系型数据库。

注: 此系列文章依托Flask框架介绍,但涉及框架内容较少,具有一定普适性。

操作数据库

虽然一般数据库由DBA进行维护,但仍然推荐将建表的SQL语句以文件形式储存在代码中。

// project.sql

create database if not exists test;

use test;

CREATE TABLE `student` (
    id BIGINT NOT NULL AUTO_INCREMENT COMMENT '自增主键',
    name STRING NOT NULL DEFAULT '' COMMENT '姓名',
    school_id INT(11) NOT NULL DEFAULT 0 COMMENT '学校id',
    grade INT(11) NOT NULL DEFAULT 0 COMMENT '年级',
    class_no INT(11) NOT NULL DEFAULT 0 COMMENT '班级',
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    is_deleted tinyint(4) NOT NULL DEFAULT '0' COMMENT '该记录是否还存在',
    primary key(id),
    KEY `ix_school_id` (school_id),
    KEY `ix_created_at` (`created_at`) USING BTREE,
    KEY `ix_updated_at` (`updated_at`) USING BTREE
)ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT '学生记录表';
...

每张表中除了自增主键外,还应有以下三列分别用于数据库归档和软删除等操作:

  • created_at
  • updated_at
  • is_deleted

ORM

对于大多数业务,借助ORM对数据库进行操作足以满足需求。这里以SQLAlchemy为例。

这里所有的代码位于DB层,即models目录下。

初始化

应当在包的__init__.py文件中完成一些初始化操作。

# models/__init__.py

import functools

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session, scoped_session
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.declarative import declarative_base

DB_CONNECT_STRING = 'mysql+mysqldb://username:password@localhost/database?charset=utf8'
engine = create_engine(DB_CONNECT_STRING)
session_factory = sessionmaker(class_=RoutingSession, bind=engine)
DBSession = scoped_session(session_factory)

DeclarativeBase = declarative_base()

当然,你也可以做适当封装,例如抽象出auto_commit装饰器以适应不同逻辑:

# models/__init__.py

class RoutingSession(Session):

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            if exc_val is None:
                self.flush()
                self.commit()
            elif isinstance(exc_val, SQLAlchemyError):
                self.rollback()
        except SQLAlchemyError as e:
            self.rollback()
        finally:
            self.close()


def gen_commit_deco(session_factory):
    def decorated(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                with session_factory():
                    return func(*args, **kwargs)
            except SQLAlchemyError as e:
                raise
        return wrapper
    return decorated

session_factory = sessionmaker(class_=RoutingSession, bind=engine)
DBSession = scoped_session(session_factory)
auto_commit = gen_commit_deco(DBSession)

需要说明的是,Session使用方法可以很灵活,具体参照SQLAlchemy官方文档,这里使用scoped_session包装Session,因此可以在代码中多次实例化Session对象,拿到的都是同一个实例。如果你不愿这样做的话,则需要留心Session的生命周期。

构建Model对象

在这里构建你的Model对象,完成基本的增查改删操作。需要注意的地方有两点:

  1. 构造convert_param_dict方法以过滤Model类不含有的字段,用于insertupdate方法中。
  2. 如果无特殊说明,在方法名称前加m表示批量查询。
# models/student.py

class StudentModel(DeclarativeBase):

    __tablename__ = 'student'

    id = Column(BigInteger, primary_key=True)
    name = Column(String, default=u'')
    school_id = Column(BigInteger, default=0)
    grade = Column(BigInteger, default=0)
    class_no = Column(BigInteger, default=0)
    created_at = Column(DateTime, default=func.now())
    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
    is_deleted = Column(SmallInteger, default=0)

    @classmethod
    def convert_param_dict(cls, student):
        result = {k: v for k, v in student.items()
                  if hasattr(StudentModel, k) and v is not None}
        return result

    @classmethod
    def insert(cls, student):
        instance = cls(**cls.convert_param_dict(student))
        DBSession().add(instance)
        DBSession().flush()
        return instance.id

    @classmethod
    def update(cls, student):
        param = cls.convert_param_dict(student)
        return DBSession().query(cls).\
            filter(cls.id == student.get('id'),
                   cls.is_deleted == 0).\
            update(param)

    @classmethod
    def delete(cls, id):
        DBSession().query(cls).\
            filter(cls.id == id,
                   cls.is_deleted == 0).\
            update({'is_deleted': 1})

    @classmethod
    def get_student_with_id(cls, student_id):
        return DBSession().query(cls).\
            filter(cls.id == student_id,
                   cls.is_deleted == 0).\
            all()

查询方法

DB层中会有大量的查询方法,这时候需要留心,有的查询会随着数据量的增加持续上升。由于DB层是最内层的服务,一个导致慢查询的方法不但会使整个请求时长增大,甚至可能拖垮你的数据库。 关于数据库查询优化网上有大量博客文章可供参考,这里不加赘述,只列举出几个常见的注意点。

  1. 避免同时使用DESCLIMIT OFFSET的SQL。
  2. 在使用LIMIT OFFSET做分页查询时注意数据量预期和使用场景,若数据量过大则需要优化为以限制id并加LIMIT的方式。
  3. 使用ORDER BY语句需要考虑索引优化。
  4. 避免数据库重复io,批量场景需要重新定义批量查询接口。
  5. 只拿对应的查询数据,即避免不经思考使用SELECT *
  6. 所有的判断和计算在查询DB层之前完成。

如果你在数据库方面没有很多的经验,最好的方式是每次新增或者改动DB层查询方法时,先在数据库explain一下你的查询语句,观察扫描了多少行,是否命中索引等等,防患于未然。

Tips

在group之前执行order

应用场景为在聚合数据中获取某一列值最大的一条记录,需要在group_by前先排序,可以应用from_self方法重新生成子查询,如下:

DBSession().query(model).order_by(model.created_at.desc()).from_self().group_by(model.id).all()

from_self可以根据查询结果生成一个子查询,不但可以改变sql执行的固有顺序,还可以在limitoffset之后继续进行查询语句。

在列中储存数组

应尽量避免在mysql的一列中存储大字段类型的数据,但如果有需要使用的场景,也应注意在将文本型数据转换为相应数组或字典时,避免使用eval方法。这里以数组为例列出一种储存数组型数据的方式。

# model/student_group.py

class StudentGroupModel(DeclarativeBase):

    __tablename__ = 'student_group'

    id = Column(BigInteger, primary_key=True)
    student_ids_str = Column(Text, default="")

    created_at = Column(DateTime, default=func.now())
    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
    is_deleted = Column(SmallInteger, default=0)

    @property
    def student_ids(self):
        return map(int, self.student_ids_str.split(',')) if self.student_ids_str else []

    @student_ids.setter
    def student_ids(self, student_ids):
        self.student_ids_str = ",".join((str(x) for x in student_ids))

    @classmethod
    def insert(cls, student_list):
        result = cls(student_list = ",".join([str(_) for _ in student_list]))
        DBSession().add(result)
        return result

批量插入

    @classmethod
    def multi_insert(cls, student_list):
        param = [cls.convert_param_dict(student) for student in student_list]
        return DBSession().execute(cls.__table__.insert(), param)

依托sqlalchemy的批量插入有许多方法,这里采用非ORM方式,效率较高。(sqlalchemy中有bulk_insert_mappings方法为ORM形式) 此外,此方法如果同property一起连用的话需要注意做一些转换。

(End)


Comments

Content