awakeBird Back-end Dev Engineer

Python Web 基础向(六) 查询

2018-04-20

本篇文章将介绍实际业务中的查询场景。一般来说,这部分要做的只是从数据层调用查询方法获取数据并返回给api层。但在业务量不断上涨的过程中,可能会出现百万级以上的大规模数据查询场景,也可能会存在基础信息(用户账户信息等)被多个接口同时调用查询的场景,给DB造成很大压力。这时就需要考虑引入协程和缓存等方式来降低数据库IO。除此之外,还将介绍一些查询中经常碰到的一些场景的处理方法。

此处输入图片的描述

一般场景

调用数据层的查询方法即可,需要考虑数据缺失的异常情况,单个查询数据丢失时是否抛错,列表查询丢失数据时是否返回空值等。

# query/student.py

from my_project.data.student import Student
from my_project.util.trans import convert_student_info_to_dict
from my_project.exc import raise_user_exc, ProjectErrorCode


class StudentQuery(object):

    def __init__(self):
        self.student = Student()

    def get_student_info_with_student_id(self, student_id):
        student_info = self.student.mget_student_with_id(student_id)
        if not student_info:
            raise_user_exc(ProjectErrorCode.STUDENT_NOT_EXIST)
        return convert_student_info_to_dict(student_info)

条件分页查询

这是Web应用常见的筛选搜索情况,需要结合多个条件搜索并对结果进行分页。

这个场景下,需要根据不同的条件组合查询方式,并根据页数和大小进行取模,并返回数据总量用于前端分页。如果筛选条件并不存在于数据库的列中,而是通过某一列与其他表关联,在筛选之前要拿到所有充当外键的列的值,传入当前model对象中进行查询。

# models/student

class StudentModel(DeclarativeBase):

	# ...

    @classmethod
    def mget_student_list(cls, condition, count=False):
        if count:
            query_ = DBSession().query(func.count(cls.id)).filter(cls.is_deleted == 0)
        else:
            query_ = DBSession().query(cls).filter(cls.is_deleted == 0)
        if condition.get('to_date') and condition.get('from_date'):
        	# utc2datetime为时间戳转datetime对象的工具方法
            to_date = utc2datetime(condition.get('to_date'))
            from_date = utc2datetime(condition.get('from_date'))
            query_ = query_.filter(cls.created_at >= from_date,
                                   cls.created_at <= to_date)
        if condition.get('school_id'):
            query_ = query_.filter(cls.school_id == condition.get('school_id'))

        if count:
            return query_.scalar()

        if condition.get('page') and condition.get('page_size'):
            query_ = query_.order_by(cls.id.desc())
            offset = (condition.get('page') - 1) * condition.get('page_size')
            query_ = query_.offset(offset).limit(condition.get('page_size'))

        return query_.all()

返回结果作为筛选结果的展示内容,如果需要展示的字段涉及其他表的内容,则需要构建_extend_journals方法进行组合,要注意查询其他表内容时需要批量获取以降低数据库IO。

# query/student.py

class StudentQuery(object):

	# ...

    def _extend_journals(self, student_list):
        result = []
        student_id_list = [student.id for student in student_list]
        identity_map = self.student.mget_student_identity_list_with_id_list(student_id_list)
        for item in student_list:
            result.append({
                'name': item.name,
                # ...
                'identity_id': identity_map.get(item.id).id
            })

    def get_student_list(self, condition):
        student_count = self.student.mget_student_list(condition, count=True)
        if not student_count:
            return {
                'student_count': 0,
                'student_list': []
            }
        student_list = self.student.mget_student_list(condition, count=False)
        return {
            'student_count': student_count,
            'student_list': self._extend_journals(student_list)
        }

大数据量查询

实际业务场景中经常会碰到大数据量数据查询的情况,如获取某一城市所有的学生列表。大数据量查询本身是需要一定耗时的,因此首先想到的思路是离线计算,将对应城市所有的学生列表存在线下库里的某一列中,查询时就可以实时取出,我们将在之后详细介绍此类业务场景的思路。但事与愿违,你的项目中可能不得不处理一些大数据量的查询工作,这时可以借助python的协程解决高IO负载查询。

# query/student.py

class StudentQuery(objecy):

	# ...

    def get_all_student_id_with_school_id_list(self, school_id_list):
    	# 这里分块大小需根据DB数据规模而定,或做成可配,以后会讲到配置可视化问题。
        block_size = 5000

        def sync_get_all_student_with_school_id_list(block_school_id_list):
            return self.student.get_all_student_id_with_school_id_list(block_school_id_list)

        jobs = [gevent.spawn(sync_get_all_student_with_school_id_list, school_id_list[i, i+block_size])
                for i in range(0, len(school_id_list), block_size)]
        gevent.joinall(jobs)
        results = [job.value for job in jobs]
        return reduce(lambda a, b: a + b, results) if results else []

缓存

需要缓存的部分一般为批量取用,如通过一批学生id获取学生的信息。

缓存有多种实现方式,实现策略都大同小异,这里以redis为例搭建缓存系统,缓存策略如下(缓存接近缓存容量时可以引入缓存失效策略,这里暂时不考虑):

  1. 给每个类型的缓存设定相应的key类型,并加上缓存取用的id组合为唯一键值,并将id_list替换为key_list
  2. 从redis中取key_list对应的值,得到res_list
  3. 遍历id_list,若res_list的对应位置值为空,则该id缓存失效,将其添加到需要查询数据库的query_list中。
  4. 从数据库中查询query_list的信息,并将其set到redis对应的key中。
  5. 将缓存取到的结果和数据库取到的结果组合并返回。
  6. 当数据库收到updateinsert命令时将redis中对应key的缓存更正或失效。这里可以在代码中进行,也可以监听数据库日志借助消息队列进行操作(大厂做法),这里略过这方面的内容。

需要注意,由于缓存的超时时间一般较久,在遇到问题时再去处理redis较为麻烦,因此需要开关控制,在出问题时可直接绕过缓存读取数据库中内容,而不必等缓存失效。

# cache/student.py

import json

from my_project.data.student import Student
from my_project.util.redis_client import redis_client
from my_project.util.json_helper import MyJsonEncoder, my_json_obj_hook
from my_project.util.trans import convert_student_info_to_dict
from my_project.settings import STUDENT_INFO_CACHE_TIME

class StudentCache(object):

    @staticmethod
    def get_student_info(student_ids):
        student_info_redis_key = "STUDENT_INFO_{}"
        query_list = []
        result = {}
        cache_time = STUDENT_INFO_CACHE_TIME
        if cache_time:
            with redis_client.pipeline(transaction=False) as pipe:
                BLOCK_SIZE = 100
                for i in range(0, len(student_ids), BLOCK_SIZE):
                    pipe.mget([student_info_redis_key.format(student_id) for student_id in student_ids[i:i+BLOCK_SIZE]])
                res = pipe.execute()

            for i in range(0, len(student_ids)):
                if not res[i / BLOCK_SIZE][i % BLOCK_SIZE]:
                    query_list.append(student_ids[i])
                else:
                    result[student_ids[i]] = json.loads(res[i / BLOCK_SIZE][i % BLOCK_SIZE],
                                                        object_hook=my_json_obj_hook)
                    # 这里的object_hook和下面的Encoder使用工具类中json序列化中的方法,实现对数据库中一些对象的序列化和反序列化
        else:
            query_list = student_ids

        new_query_dict = Student().mget_student_list(query_list)

        with redis_client.pipeline(transaction=False) as pipe:
            for student in new_query_dict:
                student_info = convert_student_info_to_dict(student)
                result[student.id] = student_info
                if cache_time:
                    pipe.set(student_info_redis_key.format(student.id),
                             json.dumps(student_info, cls=MyJsonEncoder),
                             cache_time)
            pipe.execute()

        return result

(End)


Comments

Content