从这篇文章开始,将对上文中所提起及的各抽象层进行进一步阐述,涉及互联网业务代码中的各项基本操作。本文以MySQL为例操作关系型数据库。
注: 此系列文章依托Flask框架介绍,但涉及框架内容较少,具有一定普适性。
操作数据库
虽然一般数据库由DBA进行维护,但仍然推荐将建表的SQL语句以文件形式储存在代码中。
// project.sql
create database if not exists test;
use test;
CREATE TABLE `student` (
id BIGINT NOT NULL AUTO_INCREMENT COMMENT '自增主键',
name STRING NOT NULL DEFAULT '' COMMENT '姓名',
school_id INT(11) NOT NULL DEFAULT 0 COMMENT '学校id',
grade INT(11) NOT NULL DEFAULT 0 COMMENT '年级',
class_no INT(11) NOT NULL DEFAULT 0 COMMENT '班级',
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
is_deleted tinyint(4) NOT NULL DEFAULT '0' COMMENT '该记录是否还存在',
primary key(id),
KEY `ix_school_id` (school_id),
KEY `ix_created_at` (`created_at`) USING BTREE,
KEY `ix_updated_at` (`updated_at`) USING BTREE
)ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT '学生记录表';
...
每张表中除了自增主键外,还应有以下三列分别用于数据库归档和软删除等操作:
- created_at
- updated_at
- is_deleted
ORM
对于大多数业务,借助ORM对数据库进行操作足以满足需求。这里以SQLAlchemy为例。
这里所有的代码位于DB层,即models
目录下。
初始化
应当在包的__init__.py
文件中完成一些初始化操作。
# models/__init__.py
import functools
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session, scoped_session
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.declarative import declarative_base
DB_CONNECT_STRING = 'mysql+mysqldb://username:password@localhost/database?charset=utf8'
engine = create_engine(DB_CONNECT_STRING)
session_factory = sessionmaker(class_=RoutingSession, bind=engine)
DBSession = scoped_session(session_factory)
DeclarativeBase = declarative_base()
当然,你也可以做适当封装,例如抽象出auto_commit
装饰器以适应不同逻辑:
# models/__init__.py
class RoutingSession(Session):
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
try:
if exc_val is None:
self.flush()
self.commit()
elif isinstance(exc_val, SQLAlchemyError):
self.rollback()
except SQLAlchemyError as e:
self.rollback()
finally:
self.close()
def gen_commit_deco(session_factory):
def decorated(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
with session_factory():
return func(*args, **kwargs)
except SQLAlchemyError as e:
raise
return wrapper
return decorated
session_factory = sessionmaker(class_=RoutingSession, bind=engine)
DBSession = scoped_session(session_factory)
auto_commit = gen_commit_deco(DBSession)
需要说明的是,Session使用方法可以很灵活,具体参照SQLAlchemy官方文档,这里使用scoped_session
包装Session,因此可以在代码中多次实例化Session对象,拿到的都是同一个实例。如果你不愿这样做的话,则需要留心Session的生命周期。
构建Model对象
在这里构建你的Model对象,完成基本的增查改删操作。需要注意的地方有两点:
- 构造
convert_param_dict
方法以过滤Model类不含有的字段,用于insert
和update
方法中。 - 如果无特殊说明,在方法名称前加
m
表示批量查询。
# models/student.py
class StudentModel(DeclarativeBase):
__tablename__ = 'student'
id = Column(BigInteger, primary_key=True)
name = Column(String, default=u'')
school_id = Column(BigInteger, default=0)
grade = Column(BigInteger, default=0)
class_no = Column(BigInteger, default=0)
created_at = Column(DateTime, default=func.now())
updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
is_deleted = Column(SmallInteger, default=0)
@classmethod
def convert_param_dict(cls, student):
result = {k: v for k, v in student.items()
if hasattr(StudentModel, k) and v is not None}
return result
@classmethod
def insert(cls, student):
instance = cls(**cls.convert_param_dict(student))
DBSession().add(instance)
DBSession().flush()
return instance.id
@classmethod
def update(cls, student):
param = cls.convert_param_dict(student)
return DBSession().query(cls).\
filter(cls.id == student.get('id'),
cls.is_deleted == 0).\
update(param)
@classmethod
def delete(cls, id):
DBSession().query(cls).\
filter(cls.id == id,
cls.is_deleted == 0).\
update({'is_deleted': 1})
@classmethod
def get_student_with_id(cls, student_id):
return DBSession().query(cls).\
filter(cls.id == student_id,
cls.is_deleted == 0).\
all()
查询方法
DB层中会有大量的查询方法,这时候需要留心,有的查询会随着数据量的增加持续上升。由于DB层是最内层的服务,一个导致慢查询的方法不但会使整个请求时长增大,甚至可能拖垮你的数据库。 关于数据库查询优化网上有大量博客文章可供参考,这里不加赘述,只列举出几个常见的注意点。
- 避免同时使用
DESC
和LIMIT OFFSET
的SQL。 - 在使用
LIMIT OFFSET
做分页查询时注意数据量预期和使用场景,若数据量过大则需要优化为以限制id并加LIMIT的方式。 - 使用
ORDER BY
语句需要考虑索引优化。 - 避免数据库重复io,批量场景需要重新定义批量查询接口。
- 只拿对应的查询数据,即避免不经思考使用
SELECT *
。 - 所有的判断和计算在查询DB层之前完成。
如果你在数据库方面没有很多的经验,最好的方式是每次新增或者改动DB层查询方法时,先在数据库explain一下你的查询语句,观察扫描了多少行,是否命中索引等等,防患于未然。
Tips
在group之前执行order
应用场景为在聚合数据中获取某一列值最大的一条记录,需要在group_by
前先排序,可以应用from_self
方法重新生成子查询,如下:
DBSession().query(model).order_by(model.created_at.desc()).from_self().group_by(model.id).all()
from_self
可以根据查询结果生成一个子查询,不但可以改变sql执行的固有顺序,还可以在limit
或offset
之后继续进行查询语句。
在列中储存数组
应尽量避免在mysql的一列中存储大字段类型的数据,但如果有需要使用的场景,也应注意在将文本型数据转换为相应数组或字典时,避免使用eval方法。这里以数组为例列出一种储存数组型数据的方式。
# model/student_group.py
class StudentGroupModel(DeclarativeBase):
__tablename__ = 'student_group'
id = Column(BigInteger, primary_key=True)
student_ids_str = Column(Text, default="")
created_at = Column(DateTime, default=func.now())
updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
is_deleted = Column(SmallInteger, default=0)
@property
def student_ids(self):
return map(int, self.student_ids_str.split(',')) if self.student_ids_str else []
@student_ids.setter
def student_ids(self, student_ids):
self.student_ids_str = ",".join((str(x) for x in student_ids))
@classmethod
def insert(cls, student_list):
result = cls(student_list = ",".join([str(_) for _ in student_list]))
DBSession().add(result)
return result
批量插入
@classmethod
def multi_insert(cls, student_list):
param = [cls.convert_param_dict(student) for student in student_list]
return DBSession().execute(cls.__table__.insert(), param)
依托sqlalchemy
的批量插入有许多方法,这里采用非ORM方式,效率较高。(sqlalchemy
中有bulk_insert_mappings
方法为ORM形式)
此外,此方法如果同property
一起连用的话需要注意做一些转换。
(End)