并发模式:主动对象(并发.主动.对象.模式...)

wufei1232025-01-08python11
介绍

主动对象模式是一种并发设计模式,它将方法执行方法调用解耦。此模式的主要目标是通过在单独的线程中执行操作来引入异步行为,同时向客户端提供同步接口。这是通过消息传递、请求队列和调度机制的组合来实现的。

关键部件
  1. proxy:代表客户端的公共接口。更简单地说,这就是客户端将要交互的内容。它将方法调用转换为对活动对象的请求。
  2. 调度器:管理请求队列并确定请求执行的顺序。
  3. servant:包含被调用方法的实际实现。这就是实际计算逻辑的所在。
  4. 激活队列:存储来自代理的请求,直到调度程序处理它们。
  5. future/callback:异步计算结果的占位符。
工作流程
  1. 客户端调用代理上的方法。
  2. 代理创建请求并将其放入激活队列中。
  3. 调度程序接收请求并将其转发给servant执行。
  4. 结果通过 future 对象返回给客户端。
使用案例
  • 需要可预测执行模式的实时系统。
  • gui 应用程序保持主线程响应。
  • 用于处理异步请求的分布式系统。
执行

假设我们需要进行计算,可能是 api 调用、数据库查询等。我不会实现任何异常处理,因为我太懒了。

def compute(x, y):
    time.sleep(2)  # some time taking task
    return x + y
没有活动对象模式

下面是我们如何在不使用主动对象模式的情况下处理并发请求的示例。

import threading
import time


def main():
    # start threads directly
    results = {}

    def worker(task_id, x, y):
        results[task_id] = compute(x, y)

    print("submitting tasks...")
    thread1 = threading.thread(target=worker, args=(1, 5, 10))
    thread2 = threading.thread(target=worker, args=(2, 15, 20))

    thread1.start()
    thread2.start()

    print("doing other work...")

    thread1.join()
    thread2.join()

    # retrieve results
    print("result 1:", results[1])
    print("result 2:", results[2])


if __name__ == "__main__":
    main()
上述方法的缺点
  • 线程管理:直接管理线程会增加复杂性,尤其是随着任务数量的增加。

  • 缺乏抽象:客户端负责管理线程的生命周期,将任务管理与业务逻辑耦合。

  • 可扩展性问题:如果没有适当的队列或调度机制,就无法控制任务执行顺序。

  • 响应能力有限:客户端必须等待线程加入才能访问结果。

使用主动对象模式实现

下面是主动对象模式的 python 实现,使用线程和队列来执行与上面相同的操作。我们将一一介绍每个部分:

methodrequest: 封装方法、参数和用于存储结果的 future。

class methodrequest:
    def __init__(self, method, args, kwargs, future):
        self.method = method
        self.args = args
        self.kwargs = kwargs
        self.future = future

    def execute(self):
        try:
            result = self.method(*self.args, **self.kwargs)
            self.future.set_result(result)
        except exception as e:
            self.future.set_exception(e)

调度程序:在单独的线程中持续处理来自activation_queue的请求。

import threading
import queue


class scheduler(threading.thread):
    def __init__(self):
        super().__init__()
        self.activation_queue = queue.queue()
        self._stop_event = threading.event()

    def enqueue(self, request):
        self.activation_queue.put(request)

    def run(self):
        while not self._stop_event.is_set():
            try:
                request = self.activation_queue.get(timeout=0.1)
                request.execute()
            except queue.empty:
                continue

    def stop(self):
        self._stop_event.set()
        self.join()

servant:实现实际逻辑(例如,计算方法)。

import time


class servant:
    def compute(self, x, y):
        time.sleep(2)
        return x + y

proxy:将方法调用转换为请求并返回结果的 future。

from concurrent.futures import future


class proxy:
    def __init__(self, servant, scheduler):
        self.servant = servant
        self.scheduler = scheduler

    def compute(self, x, y):
        future = future()
        request = methodrequest(self.servant.compute, (x, y), {}, future)
        self.scheduler.enqueue(request)
        return future

客户端:异步提交任务并在需要时检索结果。

def main():
    # Initialize components
    scheduler = Scheduler()
    scheduler.start()

    servant = Servant()
    proxy = Proxy(servant, scheduler)

    # Client makes an asynchronous call
    print("Submitting tasks...")
    future1 = proxy.compute(5, 10)
    future2 = proxy.compute(15, 20)

    # Perform other tasks while computation is ongoing
    print("Doing other work...")

    # Retrieve results
    print("Result 1:", future1.result())
    print("Result 2:", future2.result())

    # Shutdown scheduler
    scheduler.stop()

if __name__ == "__main__":
    main()
优点
  • 解耦接口:客户端可以调用方法而无需担心执行细节。
  • 响应性:异步执行确保客户端保持响应。
  • 可扩展性:支持多个并发请求。
缺点
  • 复杂性:增加架构复杂性。
  • 开销:需要额外的资源来管理线程和队列。
  • 延迟:异步处理可能会引入额外的延迟。
结论

主动对象模式是用于管理多线程环境中的异步操作的强大工具。通过将方法调用与执行分离,它可以确保更好的响应能力、可扩展性和更干净的代码库。虽然它具有一定的复杂性和潜在的性能开销,但它的好处使其成为需要高并发和可预测执行的场景的绝佳选择。然而,它的使用取决于当前的具体问题。与大多数模式和算法一样,不存在一刀切的解决方案。

参考

维基百科 - 活动对象

以上就是并发模式:主动对象的详细内容,更多请关注知识资源分享宝库其它相关文章!

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。