文章详情

  • 游戏榜单
  • 软件榜单
关闭导航
热搜榜
热门下载
热门标签
php爱好者> php文档> Python multiprocessing 关于Queue

Python multiprocessing 关于Queue

时间:2010-05-04  来源:oychw

原文:http://blog.ftofficer.com/2009/12/python-multiprocessing-3-about-queue/


继续讨论Python multiprocessing,这次讨论的主要内容是mp库的核心组件之一的Queue。

Queue是mp库当中用来提供多进程对象交换的方式。对象交换和上一部分 当中提到的对象共享都是使多个进程访问同一个对象的方式,两者的区别就是,对象共享是多个进程访问同一 个对象,对象交换则是将对象从一个进程传输的另一个进程。

multiprocessing当中的Queue使用方式和Python内置的threading.Queue对象很像,它支持一个put操作,将 对象放入Queue,也支持一个get操作,将对象从Queue当中读出。和threading.Queue不同的是,mp.Queue默认不支持 join()和task_done操作,这两个支持需要使用mp.JoinableQueue对象。

由于Queue对象负责进程之间的对象传输,因此第一个问题就是如何在两个进程之间共享这个Queue对象本身。在上一部分所言的三种共享方式当 中,Queue对象只能使用继承(inheritance)的方式共享。这是因为Queue本身基于unix的Pipe对象实现,而Pipe对象的共享需 要通过继承。因此,在一个典型的应用实现模型当中,应该是父进程创建Queue,然后创建子进程共享该Queue,由父进程和子进程分别读写。例如下面的 这个例子:

view plaincopy to clipboardprint?
  1. import multiprocessing  
  2.    
  3. q = multiprocessing.Queue()  
  4.    
  5. def reader_proc():  
  6.     print q.get()  
  7.    
  8. reader = multiprocessing.Process(target=reader_proc)  
  9. reader.start()  
  10.    
  11. q.put(100)  
  12. reader.join()  
import multiprocessing q = multiprocessing.Queue() def reader_proc(): print q.get() reader = multiprocessing.Process(target=reader_proc) reader.start() q.put(100) reader.join()

另一种实现方式是父进程创建Queue,创建多个子进程,有的子进程读Queue,有的子进程写Queue,例如:

view plaincopy to clipboardprint?
  1. import multiprocessing  
  2.    
  3. q = multiprocessing.Queue()  
  4.    
  5. def writer_proc():  
  6.     q.put(100)  
  7.    
  8. def reader_proc():  
  9.     print q.get()  
  10.    
  11. reader = multiprocessing.Process(target=reader_proc)  
  12. reader.start()  
  13. writer = multiprocessing.Process(target=writer_proc)  
  14. writer.start()  
  15.    
  16. reader.join()  
  17. writer.join()  
import multiprocessing q = multiprocessing.Queue() def writer_proc(): q.put(100) def reader_proc(): print q.get() reader = multiprocessing.Process(target=reader_proc) reader.start() writer = multiprocessing.Process(target=writer_proc) writer.start() reader.join() writer.join()

由于使用继承的方式共享Queue,因此代码当中并没有明显的传输Queue对象本身的代码,看起来似乎只要将multiprocessing当中 的对象换成threading当中的对象,程序仍然能够工作。反之,拿到一个现有的多线程程序,是不是将threading改成 multiprocessing就可以工作呢?也许可以,但是更可能的情况是你会遇到很多问题。

第一个问题就是mp的Queue需要考虑多进程之间的对象传输,因此所传输的对象必须是可以pickle的。否则,在Queue的put操作上会抛 出PicklingError。

其他的一些差异表现在一些技术细节上,这些不是任何高层逻辑可以抽象掉的,不知道这些差异会导致一些潜在的错误,例如死锁。在总结这些潜在的犯错的 可能的同时,我们会简单看一下mp当中Queue的实现方式,以便能够方便的理解为什么会有这样的行为。这些实现问题仅仅针对Linux,Windows 上面的实现和出现的问题在这里不涉及。

mp.Queue建构在系统的Pipe之上,但是实际上进程并不是直接将对象写入到Pipe里面,而是先写入一个本地的buffer,再由一个专门 的feed线程将其放入Pipe当中。读取端则是直接从Pipe当中读出对象。之所以有这样一个feed线程,是为了能够提供Queue接口函数所需要的 put的超时控制。但是由于这个feed线程的存在,mp.Queue提供了几个额外的函数来控制它,一个函数close来停止该线程,以及 join_thread来join该线程。close同时负责把所有在buffer当中的对象刷新到Pipe当中。

但是这个feed线程也是个麻烦制造者,为了保证所有被放入Queue的东西最终都能够到达另外一端的进程,mp库注册了一个atexit的处理函 数,用来在进程退出的时候自动close并且join该feed线程。这个join动作带来了很多问题,比如潜在的死锁。考虑下面一种状况:一个父进程创 建了两个子进程,一个子进程读,另一个子进程写。当需要停止这些进程的时候,父进程如果先把读进程结束,但是同时写进程已经将太多的对象写入Queue, 导致后继的对象等待在buffer当中,则这个进程将无法终止,因为atexit的处理函数等待把所有buffer当中的对象放入Pipe,但是Pipe 已经满了,然后陷入了死锁。

有人可能会问,那只要保证总是按照数据流的顺序来停止进程不就行。问题是在很多复杂的系统流程当中,可能存在一个环形的数据流,这种情况下,无论按 照什么顺序停止进程,终究有一个进程可能陷入这种情景当中。

幸运的是,Queue对象还提供了一个成员函数cancel_join_thread,这个函数可以使得在进程停止的时候不进行join操作,这样 可以避免死锁,代价就是这个时候尚未刷新到Pipe当中的对象都会丢失。鉴于即使调用了join_thread,残留在Pipe当中的对象仍然可能丢失, 所以一旦选择使用mp的Queue对象,就不要假设不会在流程当中丢对象了。

另外一个可能的方案是使用mp库当中的SimpleQueue对象。这个对象在文档当中没有提及,但是在 multiprocessing.queue模块当中有定义。这个对象就是去掉了buffer的Queue对象,因此可能 能 够避免上面说的问题的。但是SimpleQueue没有提供put和get的超时处理,两个动作都是阻塞的。

除了使用multiprocessing.Queue,还可以使用multiprocessing.Pipe进行通信。mp.Pipe是Queue 的底层结构,但是没有feed线程和put/get的超时控制。一定程度上和SimpleQueue很像。需要注意的是Pipe带有一个参数 duplex,当设置为True(默认)的时候,Pipe并不是使用系统的pipe来实现,而是通过socketpair,即Unix Domain Socket来实现。这个和pipe相比有些微的性能差异。

另外一个使用Queue的方式不是mp库内置的。这种方式使用上一篇文章当中提到的server process的方式来共享一个Queue对象。这个Queue对象实际上在server process当中,所有的子进程通过socket连接到server process获取该Queue的代理对象进行操作。说到这有人会想起来mp库有一个内置的SyncManager对象 ,可以通过multiprocess.Manager函数获取到,通过该对象的 Queue方法可以获取一个Queue的代理对象。不幸的是,这个方法不是正确的获取Queue的方式,原因正如上一篇文章所 说,SyncManager.Queue方法的每次调用获取到的是一个新建对象的代理对象,而不是一个共享对象。正确的使用server process当中的Queue的方式是:

共同部分:

view plaincopy to clipboardprint?
  1. import multiprocessing.managers as mpm  
  2. import Queue  
  3.    
  4. class SharedQueueManager(mpm.BaseManager): pass  
  5. q = Queue.Queue()  
  6. SharedQueueManager.register('Queue', lambda: q)  
import multiprocessing.managers as mpm import Queue class SharedQueueManager(mpm.BaseManager): pass q = Queue.Queue() SharedQueueManager.register('Queue', lambda: q)

服务进程:

view plaincopy to clipboardprint?
  1. mgr = SharedQueueManager(address=('', 12345))  
  2. server = mgr.get_server()  
  3. server.serve_forever()  
mgr = SharedQueueManager(address=('', 12345)) server = mgr.get_server() server.serve_forever()

客户进程:

view plaincopy to clipboardprint?
  1. mgr = SharedQueueManager(address=('localhost', 12345))  
  2. mgr.connect()  
  3. q = mgr.Queue() # 这里q就是共享的Queue对象的代理对象  
mgr = SharedQueueManager(address=('localhost', 12345)) mgr.connect() q = mgr.Queue() # 这里q就是共享的Queue对象的代理对象

这种方式比起mp库内置的Queue,有一些性能上的影响,因为毕竟牵涉到多次网络通讯,但是带来的好处是没有feed线程带来的一系列问题,而且 理论上不会存在丢数据的问题,除非server process崩溃。但是正如上一篇所说,server process本身就不是很靠谱的,因此这里也只是“理论上”不会丢数据而已。

说到性能,这里就列两个性能数据,以前在twitter上面提 到 过 的 (这两个连接无法访问的请联系我):

操作对象为 pickle后512字节的对象,通过proxy操作Queue的性能大约是7000次/秒(本机)或1100次/秒(多机),如果使用 multiprocessing.Queue,效率可达54000次/秒。

相关阅读 更多 +
排行榜 更多 +
辰域智控app

辰域智控app

系统工具 下载
网医联盟app

网医联盟app

运动健身 下载
汇丰汇选App

汇丰汇选App

金融理财 下载