awakeBird Back-end Dev Engineer

你真的了解SQLAlchemy吗

2018-11-27

SQLAlchemy作为一款强大而又实用的ORM框架,越来越频繁的出现在各类Python项目中。如果你是一名Python开发工程师,肯定可以熟练的写出SQLAlchemy的查询语句来满足自己的业务需求。然而,SQLAlchemy背后的知识你又了解多少呢?它是什么时候与数据库建立连接的呢?又是怎样的连接方式呢?SQLAlchemy中的Session与数据库的Transaction是一回事吗?如何优雅地管理SQLAlchemy的连接与事务呢?本文将结合SQLAlchemy文档解答这些问题。

为了方便展示,我在本地MySQL中创建了一张student表,同时打开MySQL的general-log来追踪MySQL的行为。

CREATE TABLE `student` (
    id BIGINT NOT NULL AUTO_INCREMENT COMMENT '自增主键',
    name STRING NOT NULL DEFAULT '' COMMENT '姓名',
    primary key(id)
)ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT '学生表';

set global general_log=1;

同时在iPython中创建我们的ORM对象。

DeclarativeBase = declarative_base()

class StudentModel(DeclarativeBase):

    __tablename__ = 'student'

    id = Column(BigInteger, primary_key=True)
    name = Column(String, default=u'')

连接

SQAlchemy通过create_engine创建Engine对象来实现数据库连接.

# DB_CONNECT_STRING = 'mysql+pymysql://root:1234@127.0.0.1/test?charset=utf8'
engine = create_engine(DB_CONNECT_STRING, pool_size=5, max_overflow=2, pool_recycle=60, echo=True)

这里设置echo=True来显示所执行的SQL日志。

延时加载

执行create_enginegeneral-logstdout并没有显示任何信息,因为SQLAlchemy到数据库的连接是延时加载的,只有真正需要建立连接时才会创建。

The Engine, when first returned by create_engine(), has not actually tried to connect to the database yet; that happens only the first time it is asked to perform a task against the database.

调用connect方法建立连接:

connection = engine.connect()

此时general-log显示如下信息,表示真正建立了到数据库的连接,连接线程ID为22,除此之外,SQAlchemy会默认将数据库的autocommit设置为0,在查询时会隐式开始Transaction

2018-11-27T13:52:48.695968Z    22 Connect   root@localhost on test using TCP/IP
2018-11-27T13:52:48.701437Z    22 Query     SET AUTOCOMMIT = 0

连接池

Engine对象维护着一个连接池pool,如果没有通过poolclass=NullPool来禁用连接池,在连接关闭后会被暂存在pool中,下次创建连接时会直接从pool中获取连接,如果从连接池中拿到的连接距离创建时间超过pool_recycleEngine将会将此连接释放并创建新的连接。

pool_recycle时间内调用close关闭连接,同时创建一个新连接:

connection.close()
connection2 = engine.connect()

general-log并没有新日志显示,查看MySQL的连接发现并没有新连接产生。

mysql> show processlist;
+----+------+-----------------+------+---------+------+----------+------------------+
| Id | User | Host            | db   | Command | Time | State    | Info             |
+----+------+-----------------+------+---------+------+----------+------------------+
| 12 | root | localhost       | NULL | Query   |    0 | starting | show processlist |
| 22 | root | localhost:64797 | test | Sleep   |   11 |          | NULL             |
+----+------+-----------------+------+---------+------+----------+------------------+
2 rows in set (0.00 sec)

而如果在pool_recycle之外执行上面的语句,general-log则会展示老连接释放,新连接创建的过程。

2018-11-27T13:53:14.045708Z    22 Query     ROLLBACK
2018-11-27T14:05:03.360576Z    22 Quit
2018-11-27T14:05:03.367828Z    23 Connect   root@localhost on test using TCP/IP
2018-11-27T14:05:03.372500Z    23 Query     SET AUTOCOMMIT = 0

Session

调用sessionmaker方法绑定Engine对象创建Session Factory,实例化后产生Session对象。

DBSession = sessionmaker(bind=engine, expire_on_commit=False)
session = DBSession()

Session状态

Session有两个状态:begintransactional。 区别在于Session有没有真正建立到数据库的连接,即绑定的Engine对象是否通过调用connect方法变为Connection对象。

  • 如果autocommit为默认值FalseSession在实例化之后即处于begin状态。如果autocommit值为True,则需要显式调用session.begin()方法来开启事务。
  • 当调用queryexecuteflush方法后,将创建到数据库的连接,Engine变为Connection对象,Session进入transactional状态。
  • 之后继续调用commitrollback方法后,连接关闭放入连接池,Connection对象变为Engine对象,Session也重新回到begin状态。

When the transactional state is completed after a rollback or commit, the Session releases all Transaction and Connection resources, and goes back to the “begin” state, which will again invoke new Connection and Transaction objects as new requests to emit SQL statements are received.

实例化一个ORM对象,依次调用addflushrollback方法,stdout显示了隐式开启Transaction的步骤:

a = StudentModel()
session.add(a)
session.flush()
# 2018-11-27 23:04:02,579 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
# 2018-11-27 23:04:02,579 INFO sqlalchemy.engine.base.Engine INSERT INTO student (id, name) VALUES (%s, %s)
# 2018-11-27 23:04:02,579 INFO sqlalchemy.engine.base.Engine (2, u'')

session.rollback()
# 2018-11-27 23:04:14,036 INFO sqlalchemy.engine.base.Engine ROLLBACK

general-log也显示在flush之后建立了连接并执行了查询语句,之后进行了回滚。

2018-11-27T15:04:02.573576Z    32 Connect   root@localhost on test using TCP/IP
2018-11-27T15:04:02.573872Z    32 Query     SET AUTOCOMMIT = 0
2018-11-27T15:04:02.580709Z    32 Query     INSERT INTO student (id, name) VALUES (2, '')
2018-11-27T15:04:14.036762Z    32 Query     ROLLBACK
2018-11-27T15:04:14.042584Z    32 Query     ROLLBACK

保留上面的session在1分钟后重复执行上面的步骤,general-log显示了老连接释放和新连接的建立过程。

2018-11-27T15:10:43.755010Z    32 Quit
2018-11-27T15:10:43.762052Z    33 Connect   root@localhost on test using TCP/IP
2018-11-27T15:10:43.763616Z    33 Query     SET AUTOCOMMIT = 0
2018-11-27T15:10:43.764588Z    33 Query     INSERT INTO student (id, name) VALUES (2, '')
2018-11-27T15:11:11.283859Z    33 Query     ROLLBACK
2018-11-27T15:11:11.289628Z    33 Query     ROLLBACK

flush与add的区别

通过上面的展示也可以发现,add实际上不会真正发送请求到数据库,这里所做的任何修改其他事务都是不可见的。 而flush则会发送请求到数据库,而此时的变更属于未提交变更,对其他事务是否可见取决于数据库的隔离机制。

expire_on_commit

expire_on_commit可以用来更改SQLAlchemy的对象刷新机制,默认值为True即在session调用commit之后会主动将同一个sessioncommit之前查询得到的ORM对象的_sa_instance_state.expire属性设置为Flase,再次读取该对象属性时将重载这个对象,方法是重新调用之前的查询语句。

expire_on_commit设置为True后重新执行代码,发现在获取b属性时又执行了一次查询。

b = session.query(StudentModel).filter(StudentModel.id==2).first()
# 2018-11-27 23:52:03,584 INFO sqlalchemy.engine.base.Engine SELECT student.id AS student_id, student.name AS student_name
# FROM student
# WHERE student.id = %s
#  LIMIT %s
# 2018-11-27 23:52:03,585 INFO sqlalchemy.engine.base.Engine (2, 1)

b.name = u'test'
session.add(b)
session.flush()
# 2018-11-27 23:52:31,789 INFO sqlalchemy.engine.base.Engine UPDATE student SET name=%s WHERE student.id = %s
# 2018-11-27 23:52:31,789 INFO sqlalchemy.engine.base.Engine (u'test', 2)

session.commit()
# 2018-11-27 23:53:06,424 INFO sqlalchemy.engine.base.Engine COMMIT

b._sa_instance_state.__dict__
# {'_instance_dict': <weakref at 0x1028c6f70; to 'WeakInstanceDict' at 0x1036b6050>,
#  '_strong_obj': None,
#  'callables': {'id': <sqlalchemy.orm.state.InstanceState at 0x103743990>,
#   'name': <sqlalchemy.orm.state.InstanceState at 0x103743990>},
#  'class_': __main__.StudentModel,
#  'committed_state': {},
#  'expired': True,
#  'key': (__main__.StudentModel, (2,)),
#  'manager': <ClassManager of <class '__main__.StudentModel'> at 1036b64f0>,
#  'modified': False,
#  'obj': <weakref at 0x1037016d8; to 'StudentModel' at 0x103743850>,
#  'runid': 2,
#  'session_id': 1}

b.name
# 2018-11-27 23:54:34,431 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
# 2018-11-27 23:54:34,431 INFO sqlalchemy.engine.base.Engine SELECT student.id AS student_id, student.name AS student_name
# FROM student
# WHERE student.id = %s
# 2018-11-27 23:54:34,431 INFO sqlalchemy.engine.base.Engine (2,)

虽然这样可以保持数据同步,但实际应用中一般将这个值设置为Flase,避免内存中的数据属性被更改。

实战Q&A

有了上面的理论基础,下面从实际应用出发,思考一些项目中的常见问题。

Q1.是否应该启用连接池,如何配置连接池

首先,MySQL建立连接是非常复杂的过程,我们需要启用连接池来保持与MySQL的长连接,以减少建立连接的次数。

那连接池是否越大越好呢,甚至所有连接都使用长连接?答案也是否定的,因为MySQL在执行过程中使用的临时内存是存放在连接对象中的,如果保持大量长连接会导致MySQL的内存快速增长,以致于被系统强行杀掉(OOM)异常重启。因此需要结合业务的TPS来配置连接池大小,来保证与MySQL的活跃连接维持在pool_size+max_overflow内。

至于连接池的连接回收时间,先来看一个使用MySQL时的常见报错,在查询时与MySQL断开了连接:

OperationalError: (OperationalError) (2013, "Lost connection to MySQL server during query (error(54, 'Connection reset by peer'))") 'SELECT student.id AS student_id, student.name AS student_name \nFROM student \nWHERE student.id = %s \n LIMIT %s' (1, 1)

这个错误是由MySQL连接器抛出的,原因一般有两个:

  • 所连接的MySQL服务真的发生了重启操作,这种问题无法避免。
  • MySQL连接器会主动关闭空闲(Sleep)时间达到wait_timeout时间的连接,这个超时时间的默认值为8小时

SQLAlchemy在捕获到MySQL连接器的抛错后会主动清空连接池里的连接,如果需要进行查询需要重新建立连接。因此,你的连接池pool_recycle参数至少要小于MySQL的wait_timeout时间,以避免连接池中存在被MySQL连接器主动断开的连接。当然也不能太小,使连接池失去意义。一般的建议值为30分钟之内。

Q2.如何在应用中管理Session

关于这点,SQLAlchemy官方文档中给出了合理的方案,但需要注意一些细节。

  1. As a general rule, keep the lifecycle of the session separate and external from functions and objects that access and/or manipulate database data. This will greatly help with achieving a predictable and consistent transactional scope.
  2. Make sure you have a clear notion of where transactions begin and end, and keep transactions short, meaning, they end at the series of a sequence of operations, instead of being held open indefinitely.

一般来说,你需要在访问数据库的时候创建Session,开启Transaction同时执行查询语句,在所有查询语句执行结束之后关闭TransactionSession。任何时候,只要一个Session被创建并建立数据库连接,则它必须被关闭从而将数据库连接释放到连接池中,否则连接将永远不会释放直到达到数据库连接的超时时间。

对于Web应用来说,由于一个请求的处理时间足够短,可以更简单地将Session的生命周期同请求的生命周期保持一致,即如果一个请求需要访问数据库,则创建Session,处理完这个请求则关闭Session

由于Session是必须被关闭的,为了方便的管理Session,保证一个请求内只有一个Session是更好的选择,你可以将创建的Session作为全局变量使用,或者更方便地,使用scoped_session,保证同一个线程内实例化的Session都为同一个对象。

session_factory = sessionmaker(bind=engine, expire_on_commit=True)
DBSession = scoped_session(session_factory)

这样,在请求结束时,你只需要关闭一个Session即可。例如你可以在Flask应用中的after_request函数中加入一行代码调用Session Factoryremove方法,就可以保证请求中的Session对象被关闭,它会调用Sessionclose方法,将数据库连接放回连接池并将未commitTransaction回滚。

DBSession.remove()

The scoped_session.remove() method first calls Session.close() on the current Session, which has the effect of releasing any connection/transactional resources owned by the Session first, then discarding the Session itself. “Releasing” here means that connections are returned to their connection pool and any transactional state is rolled back, ultimately using the rollback() method of the underlying DBAPI connection.

需要注意的是,scoped_session对在同一个线程的判断方式为thrending.local(),当使用多线程处理某个查询语句时,正确的使用方法是在当前线程中实例化一个Session并将其传入多线程函数中,否则其他线程中的Session将无法被关闭,数据库连接和Transaction也将一直不会被释放。

不同于close方法,写请求的commit则不是一定要执行的,个人不推荐和remove方法一样放在请求结束,而是自定义上下文或用装饰器来进行commit,保证需要执行commit的查询被commit。下面的代码是我个人推荐的写法:

class RoutingSession(Session):

    def __init__(self, *args, **kw):
        super(RoutingSession, self).__init__(*args, **kw)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            if exc_val is None:
                self.flush()
                self.commit()
            elif isinstance(exc_val, SQLAlchemyError):
                self.rollback()
        except SQLAlchemyError as e:
            self.rollback()
        finally:
            self.close()

def gen_commit_deco(session_factory):
    def decorated(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                with session_factory():
                    return func(*args, **kwargs)
            except SQLAlchemyError as e:
                raise
        return wrapper
    return decorated

session_factory = sessionmaker(class_=RoutingSession, bind=engine, expire_on_commit=False)
auto_commit = gen_commit_deco(DBSession)

Q3.Session与Transaction是一回事吗

回到开篇的问题,Session与数据库的Transaction能画等号吗?

答案是否定的,只有transactional状态下的Session对象才相当于数据库的Transaction,除此之外,当Session执行commitrollback方法后,与其对应的数据库Transaction也就结束了,之后这个Session再次执行queryflush方法,又会有一个新的数据库Transaction与其绑定,所以一个Session是可以对应不同的数据库Transaction的。

对于每一个Transaction,数据库都会记录它的回滚日志undo log来记录当前事务内数据库的某条记录的值read-viewundo log会一直保留,直到数据库中没有比它更老版本的read-view,因此Transaction的生命周期不能太长,否则数据库会浪费大量空间储存回滚日志。对于普通Web请求,可以用Q2中的方法来管理Session,但如果你在耗时较大的脚本或定时任务中用到Session,推荐将Session的生命周期控制在尽可能少的函数块内,以数据库避免长事务的产生。

可以用下面的命令判断数据库中存在的长事务:

select * from information_schema.innodb_trx where TIME_TO_SEC(timediff(now(),trx_started))>60

写在后面

SQLAlchemy对于每一位Python开发者来说都是宝藏,不但在于其强大且实用,其源码所蕴含的unit work思想和各类设计模式更是绝佳的教材。这里给自己挖个坑,下一篇SQLAlchemy博客将会从源码角度进行架构剖析。

(End)


Similar Posts

上一篇 Zookeeper全攻略

Comments