当多个线程同时操作共享的内存数据时,为了解决线程安全问题,各语言都引入并发锁来确保同一时间只有一个线程会操作共享内存。而现在的大规模互联网应用一般采用分布式和集群来提高后端服务性能,同一个服务可能在不同的机器或容器中都存在实例,多线程中的并发锁也失去了意义,这时就引入了分布式锁的概念,当然它们的目的是相同的,都是为了限制同一时间内一个资源只能被一个请求操作,或者说某个方法只能被运行一次。
目前业内实现分布式锁的方式主要有下面三种:
-
基于数据库的悲观锁乐观锁
-
基于redis的分布式事务锁
-
基于Zookeeper的节点锁
其中redis简单易用,应用最广也较为稳定,本文主要介绍这种方式下的分布式锁。
应用场景
一般而言,下面几个条件满足其中之一的业务场景就需要考虑引入分布式情况。
-
多个用户操作同一资源。如抢单,抢红包等场景。
-
单用户操作的不可重复的敏感资源。如缴费,扣款等。
-
不能保证幂等的更新操作。如各类活动积分。
在引入分布式事务锁时需要考虑清楚业务场景是否需要和适合上锁,虽说上锁成本和难度都不高,但盲目上锁会占用redis资源,一定程度上也会降低接口性能,所以还是三思而后行。
python实现
目前分布式锁的设计思路,大家应该没有什么疑问,都是按照下图的流程来进行业务操作。
一般实现
# ...
SOME_LOCK_KEY = 'project.some_lock_key:{}'
def test_lock(args1)
lock_key = SOME_LOCK_KEY.format(args1) # 设定加锁参数
if redis_client.set(lock_key, 1, ex=10, nx=True):
try:
# ... 执行业务代码
except Exception as ex:
logger.error(traceback.format_exc())
raise e
finally:
redis_client.delete(lock_key)
else:
raise_user_exc()
需要注意,加锁时一般要设置过期时间(为了防止异常原因导致死锁),但这两个操作需要是原子操作,即需要保证只要上锁,必定会过期。
这里python中redis的set
方法可满足此条件。
装饰器实现
这里引入一个开源项目,来源未知,可对指定位置的参数上锁,简单易用,附上源码。
# -*- coding: utf-8 -*-
__all__ = ['RedisLocker']
import gevent
import md5
import functools
from gevent import Timeout
from uuid import uuid1
from zeus_core.redis_client import create_redis_client
class LockerExc(Exception):
def __init__(self, message):
super(LockerExc, self).__init__(message)
class _LockerContext(object):
"""
_LockerContext.
Inside a `_LockerContext`, `_acquire` will be called when enter the context,
i.e. before your code execute, and `_release` will be called when exit.
:param locker: RedisLocker object wraps public access APIs.
:param key: Key of redis lock.
:param token: Token of redis lock.
:param expr: Expiration time before the lock auto release.
:param nx: NX flag of redis, `nx=True` means `set` when not exists.
Returns an :py:class:`~LockerContext`.
"""
def __init__(
self,
locker,
key,
token,
expr,
nx=True):
self._locker = locker
self._key = key
self._token = token
self._expr = expr
self._nx = nx
def __enter__(self):
self._acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._release()
def _acquire(self):
with Timeout(
self._locker.timeout,
LockerExc('acquire lock timeout')
):
retry_count = 1
while True:
if retry_count > self._locker.max_retry:
raise LockerExc('acquire lock exceed max retry')
if self._locker.redis.set(
self._key, self._token, ex=self._expr, nx=self._nx):
return 1
gevent.sleep(self._locker.retry_delay)
retry_count += 1
def _release(self):
token = self._locker.redis.get(self._key)
if token == self._token:
self._locker.redis.expire(self._key, 0)
return 1
class RedisLocker(object):
"""
RedisLocker.
:param redis_dsn: Redis DSN.
:param max_retry: Max tries until an redis lock acquired.
:param retry_delay: Intervals between requests, if the former request fails
to acquire the lock.
:param timeout: Timeout until the redis lock acquired.
Returns an :py:class:`~RedisLocker`.
RedisLocker is designed to acquire redis lock on function and function's
specific arguments.
Inside the code snippet below, the function `function_tobe_locked` will be
locked by its first two(index 0 and 1) arguments `bank_card_id` and
`withdraw`, until the function returns. Otherwise, any call with the same
function name, same bank_card_id and same withdraw will be blocked.
.. code:: python
>>> locker = RedisLocker('redis_dsn')
>>> @locker.lock_on_argument(3, 0, 1)
>>> def function_tobe_locked(bank_card_id, withdraw, *args):
>>> # biz logic here
>>>
"""
def __init__(
self,
redis_dsn,
max_retry=3,
retry_delay=2,
timeout=10):
self._redis = create_redis_client(redis_dsn)
self._max_retry = max_retry
self._retry_delay = retry_delay
self._timeout = timeout
def lock_on_argument(self, expr, *positions):
"""
:param expr: Expiration time for the lock.
:param positions: Indexes specifying the arguments which would be
assigned to create the arugments' signature for the locker.
Superficially `expr` could be assigned by any legal digits, however,
it's highly recommended to set `expr` greater than `self._timeout`
to avoid the situation that the function blocked, and redis lock
released automatically before the function return. Meanwhile, another
request could acquire the lock before its timeout, and the race
condition appears again.
"""
def _lock(func):
@functools.wraps(func)
def wraps(*args, **kwargs):
include_args = [args[pos] for pos in positions]
key, token = RedisLocker.init_key_token_pair(
func, *include_args)
with _LockerContext(self, key, token, expr):
return func(*args, **kwargs)
return wraps
return _lock
@staticmethod
def init_key_token_pair(func, *args):
args_signature = md5.new(
':'.join([str(ar) for ar in args])).hexdigest()
module_routes = func.__module__.split('.')
module_routes.append(func.__name__)
func_signature = md5.new(':'.join(module_routes)).hexdigest()
key = '{}:lock|{}'.format(
func_signature, args_signature)
# token = PkGen().pkgen().pk
token = str(uuid1()).replace('-', '')
return key, token
@property
def redis(self):
return self._redis
@property
def max_retry(self):
return int(self._max_retry)
@property
def retry_delay(self):
return int(self._retry_delay)
@property
def timeout(self):
return int(self._timeout)
(End)