原文出处:Making an event queue in python: a concurrency and modeling tutorial
概念
事件队列(An event queuer)是一种软件模块,可以接收不同的事件(event)并按顺序执行(通常是先来先服务)。
在Python的语境下,我们以function的循序执行为目标,假设我们可以这样使用这个event queue:
enqueue(someclass.somefunction)
很简单,对吧?
特性
然后我们列出我们所需要的特性: 1. 使用起来尽可能地简单(就像上面那样)。 2. 应该是线程安全的。 3. 可以返回所需的结果,包括exception。 4. 事件执行的实现方式不应该是轮询的(Busy waiting)。
根据列出的特性,为了尽可能地实现简单,我们可以先写出如下的框架:
class EventQueue:
def enqueue(self, '''function call''' highPriority = False):
pass
def stop(self, highPriority = False):
pass
其中当highPriority
为True
时,event会加入到queue的头部。
从stop
方法也有highPriority
参数可以看出,我们打算将stop行为包装成event放进队列中执行(而不是外部强制中断queue中event的执行)。
函数的执行
接下来我们解决这样的问题:如何将function包装成event?
思考一下function的组成,我们可以列出以下几个需要包装的部分: 1. 函数的引用(A function reference)。 2. 未命名的参数列表(unnamed parameters)。 3. 指定名称的参数列表(named parameters)。
例如,当我们要做这样的一个函数调用:
telephone.callPeople(bob, "483", showNumber = False, ifDiverted = telephone.ABORT)
我们可以包装成这样:
def execute(func, args = [], kwargs = {}):
func(*args, **kwargs)
execute(telephone.callPeople, [bob, "483"], {'showNumber':False, 'ifDiverted': telephone.ABORT})
分离关注点
结合上文所述,我们可以对整个事件队列处理过程中的关注点进行抽象,得到三种internal objects:
- Queue 负责event的入队和出队,也就是一个存储者event队列的容器。
- Results 返回结果的包装类。
- Runner 负责从Queue取出一个event,执行它,并将结果包装并返回,然后取出下一个event。
EventQueue的实现
首先,我们在EventQueue中维护着一个内部类(internal class)Queue。这样做的原因是我们想只暴露出跟业务相关的方法,把一些成员变量和方法隐藏起来。
class EventQueue:
def __init__(self):
self._queue = self.Queue()
def enqueue(self, func, args=[], kwargs={}, highPriority = False):
element = self.packCall(func, args, kwargs)
return self._queue.enqueue(element, highPriority)
def packCall(self, func, args, kwargs):
return (func, args, kwargs)
class Queue:
def __init__(self):
self._list = []
def enqueue(self, element, highPriority):
if highPriority:
self._list.insert(0, element)
else:
self._list.append(element)
return
def hasMore(self):
return len(self._list) > 0
def dequeue(self):
return self._list.pop(0)
注意到,Queue内部类只知道自己入队的是一个element,而不知道这个element究竟是什么。而EventQueue的packCall方法将调用一个函数用到的所有东西(可以理解成context)包装成一个元组(tuple),作为一个element进行入队。
处理并发
到目前为止,上述代码没有考虑在多线程环境中的使用。我们必须要为EventQueue添加一个保证线程安全的机制。
我们打算用Lock来实现。在Python中,Lock的一个实现是threading.lock()
,需要引入threading
包。
于是,我们访问Queue的过程可以简单地如下所述: 1. 获得Lock 2. 访问Queue 3. 释放Lock
也就是这样使用:
import threading
lock = threading.Lock()
lock.acquire()
'''access the shared resource'''
lock.release()
或者更方便地,可以这样写:
with lock:
'''access the shared resource'''
然后我们改进一下已有的代码:
class Queue:
def __init__(self):
self._list = []
self._lock = threading.Lock()
def enqueue(self
, element, highPriority):
with self._lock:
if highPriority:
self._list.insert(0, element)
else:
self._list.append(element)
def hasMore(self):
with self._lock:
return len(self._list) > 0
def dequeue(self):
with self._lock:
return self._list.pop(0)
简单而有效。而且这里也可以看出内部类的优势(保证线程安全的代码和业务逻辑代码分离)。