[译]用Python自制Event Queue

原文出处:Making an event queue in python: a concurrency and modeling tutorial

概念

事件队列(An event queuer)是一种软件模块,可以接收不同的事件(event)并按顺序执行(通常是先来先服务)。

在Python的语境下,我们以function的循序执行为目标,假设我们可以这样使用这个event queue:

enqueue(someclass.somefunction)

很简单,对吧?

特性

然后我们列出我们所需要的特性: 1. 使用起来尽可能地简单(就像上面那样)。 2. 应该是线程安全的。 3. 可以返回所需的结果,包括exception。 4. 事件执行的实现方式不应该是轮询的(Busy waiting)。

根据列出的特性,为了尽可能地实现简单,我们可以先写出如下的框架:

class EventQueue:
def enqueue(self, '''function call''' highPriority = False):
    pass
def stop(self, highPriority = False):
    pass

其中当highPriorityTrue时,event会加入到queue的头部。

stop方法也有highPriority参数可以看出,我们打算将stop行为包装成event放进队列中执行(而不是外部强制中断queue中event的执行)。

函数的执行

接下来我们解决这样的问题:如何将function包装成event?

思考一下function的组成,我们可以列出以下几个需要包装的部分: 1. 函数的引用(A function reference)。 2. 未命名的参数列表(unnamed parameters)。 3. 指定名称的参数列表(named parameters)。

例如,当我们要做这样的一个函数调用:

telephone.callPeople(bob, "483", showNumber = False, ifDiverted = telephone.ABORT)

我们可以包装成这样:

def execute(func, args = [], kwargs = {}):
func(*args, **kwargs)

execute(telephone.callPeople, [bob, "483"], {'showNumber':False, 'ifDiverted': telephone.ABORT})

分离关注点

结合上文所述,我们可以对整个事件队列处理过程中的关注点进行抽象,得到三种internal objects:

  • Queue 负责event的入队和出队,也就是一个存储者event队列的容器。
  • Results 返回结果的包装类。
  • Runner 负责从Queue取出一个event,执行它,并将结果包装并返回,然后取出下一个event。

EventQueue的实现

首先,我们在EventQueue中维护着一个内部类(internal class)Queue。这样做的原因是我们想只暴露出跟业务相关的方法,把一些成员变量和方法隐藏起来。

class EventQueue:
    def __init__(self):
        self._queue = self.Queue()

    def enqueue(self, func, args=[], kwargs={}, highPriority = False):
        element = self.packCall(func, args, kwargs)
        return self._queue.enqueue(element, highPriority)

    def packCall(self, func, args, kwargs):
        return (func, args, kwargs)

    class Queue:
        def __init__(self):
            self._list = []

        def enqueue(self, element, highPriority):
            if highPriority:
                self._list.insert(0, element)
            else:
                self._list.append(element)
            return

        def hasMore(self):
            return len(self._list) > 0

        def dequeue(self):
            return self._list.pop(0)

注意到,Queue内部类只知道自己入队的是一个element,而不知道这个element究竟是什么。而EventQueue的packCall方法将调用一个函数用到的所有东西(可以理解成context)包装成一个元组(tuple),作为一个element进行入队。

处理并发

到目前为止,上述代码没有考虑在多线程环境中的使用。我们必须要为EventQueue添加一个保证线程安全的机制。

我们打算用Lock来实现。在Python中,Lock的一个实现是threading.lock(),需要引入threading包。

于是,我们访问Queue的过程可以简单地如下所述: 1. 获得Lock 2. 访问Queue 3. 释放Lock

也就是这样使用:

import threading
lock = threading.Lock()
lock.acquire()
'''access the shared resource'''
lock.release()

或者更方便地,可以这样写:

with lock:
    '''access the shared resource'''

然后我们改进一下已有的代码:

class Queue:
    def __init__(self):
        self._list = []
        self._lock = threading.Lock()

    def enqueue(self, element, highPriority):
        with self._lock:
            if highPriority:
                self._list.insert(0, element)
            else:
                self._list.append(element)

    def hasMore(self):
        with self._lock:
            return len(self._list) > 0

    def dequeue(self):
        with self._lock:
            return self._list.pop(0)

简单而有效。而且这里也可以看出内部类的优势(保证线程安全的代码和业务逻辑代码分离)。

Result的实现