本篇文章将介绍实际业务中的查询场景。一般来说,这部分要做的只是从数据层调用查询方法获取数据并返回给api层。但在业务量不断上涨的过程中,可能会出现百万级以上的大规模数据查询场景,也可能会存在基础信息(用户账户信息等)被多个接口同时调用查询的场景,给DB造成很大压力。这时就需要考虑引入协程和缓存等方式来降低数据库IO。除此之外,还将介绍一些查询中经常碰到的一些场景的处理方法。
一般场景
调用数据层的查询方法即可,需要考虑数据缺失的异常情况,单个查询数据丢失时是否抛错,列表查询丢失数据时是否返回空值等。
# query/student.py
from my_project.data.student import Student
from my_project.util.trans import convert_student_info_to_dict
from my_project.exc import raise_user_exc, ProjectErrorCode
class StudentQuery(object):
def __init__(self):
self.student = Student()
def get_student_info_with_student_id(self, student_id):
student_info = self.student.mget_student_with_id(student_id)
if not student_info:
raise_user_exc(ProjectErrorCode.STUDENT_NOT_EXIST)
return convert_student_info_to_dict(student_info)
条件分页查询
这是Web应用常见的筛选搜索情况,需要结合多个条件搜索并对结果进行分页。
这个场景下,需要根据不同的条件组合查询方式,并根据页数和大小进行取模,并返回数据总量用于前端分页。如果筛选条件并不存在于数据库的列中,而是通过某一列与其他表关联,在筛选之前要拿到所有充当外键的列的值,传入当前model
对象中进行查询。
# models/student
class StudentModel(DeclarativeBase):
# ...
@classmethod
def mget_student_list(cls, condition, count=False):
if count:
query_ = DBSession().query(func.count(cls.id)).filter(cls.is_deleted == 0)
else:
query_ = DBSession().query(cls).filter(cls.is_deleted == 0)
if condition.get('to_date') and condition.get('from_date'):
# utc2datetime为时间戳转datetime对象的工具方法
to_date = utc2datetime(condition.get('to_date'))
from_date = utc2datetime(condition.get('from_date'))
query_ = query_.filter(cls.created_at >= from_date,
cls.created_at <= to_date)
if condition.get('school_id'):
query_ = query_.filter(cls.school_id == condition.get('school_id'))
if count:
return query_.scalar()
if condition.get('page') and condition.get('page_size'):
query_ = query_.order_by(cls.id.desc())
offset = (condition.get('page') - 1) * condition.get('page_size')
query_ = query_.offset(offset).limit(condition.get('page_size'))
return query_.all()
返回结果作为筛选结果的展示内容,如果需要展示的字段涉及其他表的内容,则需要构建_extend_journals
方法进行组合,要注意查询其他表内容时需要批量获取以降低数据库IO。
# query/student.py
class StudentQuery(object):
# ...
def _extend_journals(self, student_list):
result = []
student_id_list = [student.id for student in student_list]
identity_map = self.student.mget_student_identity_list_with_id_list(student_id_list)
for item in student_list:
result.append({
'name': item.name,
# ...
'identity_id': identity_map.get(item.id).id
})
def get_student_list(self, condition):
student_count = self.student.mget_student_list(condition, count=True)
if not student_count:
return {
'student_count': 0,
'student_list': []
}
student_list = self.student.mget_student_list(condition, count=False)
return {
'student_count': student_count,
'student_list': self._extend_journals(student_list)
}
大数据量查询
实际业务场景中经常会碰到大数据量数据查询的情况,如获取某一城市所有的学生列表。大数据量查询本身是需要一定耗时的,因此首先想到的思路是离线计算,将对应城市所有的学生列表存在线下库里的某一列中,查询时就可以实时取出,我们将在之后详细介绍此类业务场景的思路。但事与愿违,你的项目中可能不得不处理一些大数据量的查询工作,这时可以借助python的协程解决高IO负载查询。
# query/student.py
class StudentQuery(objecy):
# ...
def get_all_student_id_with_school_id_list(self, school_id_list):
# 这里分块大小需根据DB数据规模而定,或做成可配,以后会讲到配置可视化问题。
block_size = 5000
def sync_get_all_student_with_school_id_list(block_school_id_list):
return self.student.get_all_student_id_with_school_id_list(block_school_id_list)
jobs = [gevent.spawn(sync_get_all_student_with_school_id_list, school_id_list[i, i+block_size])
for i in range(0, len(school_id_list), block_size)]
gevent.joinall(jobs)
results = [job.value for job in jobs]
return reduce(lambda a, b: a + b, results) if results else []
缓存
需要缓存的部分一般为批量取用,如通过一批学生id获取学生的信息。
缓存有多种实现方式,实现策略都大同小异,这里以redis为例搭建缓存系统,缓存策略如下(缓存接近缓存容量时可以引入缓存失效策略,这里暂时不考虑):
- 给每个类型的缓存设定相应的key类型,并加上缓存取用的id组合为唯一键值,并将
id_list
替换为key_list
。 - 从redis中取
key_list
对应的值,得到res_list
。 - 遍历
id_list
,若res_list
的对应位置值为空,则该id缓存失效,将其添加到需要查询数据库的query_list
中。 - 从数据库中查询
query_list
的信息,并将其set到redis对应的key
中。 - 将缓存取到的结果和数据库取到的结果组合并返回。
- 当数据库收到
update
和insert
命令时将redis中对应key的缓存更正或失效。这里可以在代码中进行,也可以监听数据库日志借助消息队列进行操作(大厂做法),这里略过这方面的内容。
需要注意,由于缓存的超时时间一般较久,在遇到问题时再去处理redis较为麻烦,因此需要开关控制,在出问题时可直接绕过缓存读取数据库中内容,而不必等缓存失效。
# cache/student.py
import json
from my_project.data.student import Student
from my_project.util.redis_client import redis_client
from my_project.util.json_helper import MyJsonEncoder, my_json_obj_hook
from my_project.util.trans import convert_student_info_to_dict
from my_project.settings import STUDENT_INFO_CACHE_TIME
class StudentCache(object):
@staticmethod
def get_student_info(student_ids):
student_info_redis_key = "STUDENT_INFO_{}"
query_list = []
result = {}
cache_time = STUDENT_INFO_CACHE_TIME
if cache_time:
with redis_client.pipeline(transaction=False) as pipe:
BLOCK_SIZE = 100
for i in range(0, len(student_ids), BLOCK_SIZE):
pipe.mget([student_info_redis_key.format(student_id) for student_id in student_ids[i:i+BLOCK_SIZE]])
res = pipe.execute()
for i in range(0, len(student_ids)):
if not res[i / BLOCK_SIZE][i % BLOCK_SIZE]:
query_list.append(student_ids[i])
else:
result[student_ids[i]] = json.loads(res[i / BLOCK_SIZE][i % BLOCK_SIZE],
object_hook=my_json_obj_hook)
# 这里的object_hook和下面的Encoder使用工具类中json序列化中的方法,实现对数据库中一些对象的序列化和反序列化
else:
query_list = student_ids
new_query_dict = Student().mget_student_list(query_list)
with redis_client.pipeline(transaction=False) as pipe:
for student in new_query_dict:
student_info = convert_student_info_to_dict(student)
result[student.id] = student_info
if cache_time:
pipe.set(student_info_redis_key.format(student.id),
json.dumps(student_info, cls=MyJsonEncoder),
cache_time)
pipe.execute()
return result
(End)