Multiprocessing with Python
In a previous article for IBM® developerWorks®, I demonstrated a simple and effective pattern for implementing threaded programming in Python. One downside of this approach, though, is that it won't always speed up your application, because the GIL (global interpreter lock) effectively limits threads to one core. If you need to use all of the cores on your machine, then you typically need to fork processes to increase speed. Dealing with a flock of processes can be a challenge, because if the processes need to communicate with one another, coordinating all of those calls can quickly get complicated.
Fortunately, as of version 2.6, Python includes a module called "multiprocessing" to help you deal with processes. The multiprocessing API is similar in some ways to the threading API, but there are also a few differences to keep in mind. One of the main differences is that processes have subtle underlying behavior that a high-level API will never be able to completely abstract away. You can read more about this in the official documentation for the multiprocessing module (see the Resources section).
There are some very distinct differences between how concurrency works with processes and threads. You can read more about these differences in the threading article I wrote for developerWorks (see the Resources section). When a process forks, the operating system creates a new child process with a new process ID, duplicating the state of the parent process (memory, environment variables, and more). To start out, before getting to the multiprocessing module itself, take a look at a very basic fork in Python.
fork.py
#!/usr/bin/env python
"""A basic fork in action"""

import os

def my_fork():
    child_pid = os.fork()
    if child_pid == 0:
        print "Child Process: PID# %s" % os.getpid()
    else:
        print "Parent Process: PID# %s" % os.getpid()

if __name__ == "__main__":
    my_fork()
Now take a look at the output:
mac% python fork.py
Parent Process: PID# 5285
Child Process: PID# 5286
In the next example, enhance the original fork code and set an environmental variable that will then get copied into the child process. Here is the code:
Example 1. A fork in Python
#!/usr/bin/env python
"""A fork that demonstrates a copied environment"""

import os
from os import environ

def my_fork():
    environ['FOO'] = "baz"
    print "FOO environmental variable set to: %s" % environ['FOO']
    environ['FOO'] = "bar"
    print "FOO environmental variable changed to: %s" % environ['FOO']
    child_pid = os.fork()
    if child_pid == 0:
        print "Child Process: PID# %s" % os.getpid()
        print "Child FOO environmental variable == %s" % environ['FOO']
    else:
        print "Parent Process: PID# %s" % os.getpid()
        print "Parent FOO environmental variable == %s" % environ['FOO']

if __name__ == "__main__":
    my_fork()
Here is what the output looks like:
mac% python env_fork.py
FOO environmental variable set to: baz
FOO environmental variable changed to: bar
Parent Process: PID# 5333
Parent FOO environmental variable == bar
Child Process: PID# 5334
Child FOO environmental variable == bar
In the output, you can see that the "changed" environmental variable FOO stuck with the child process as well as the parent process. You could test this example even further by changing the environmental variable in the parent process again after the fork, and you would observe that the child is now completely separate, with a life of its own. Note that the subprocess module can also be used to fork processes, albeit in a less sophisticated way than the multiprocessing module.
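To illustrate that last point, here is a minimal sketch, not taken from the original article, of launching a child process with subprocess and collecting its output; the command being run is purely illustrative:

#!/usr/bin/env python
"""A minimal subprocess sketch; the command run here is only illustrative"""

import os
import subprocess

def run_child():
    # Popen forks and execs a child process running the given command,
    # capturing its standard output through a pipe
    proc = subprocess.Popen(["python", "-c", "import os; print os.getpid()"],
                            stdout=subprocess.PIPE)
    out, _ = proc.communicate()   # wait for the child and collect its stdout
    print "Child reported PID: %s" % out.strip()
    print "Parent Process: PID# %s" % os.getpid()

if __name__ == "__main__":
    run_child()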
Introduction to multiprocessing
Now that you have some of the basics out of the way with forking in Python, look at a simple example of how it works with the higher-level multiprocessing library. In this example, a fork still occurs, but much of the boilerplate work gets handled for us.
Example 2. Simple multiprocessing
#!/usr/bin/env python
from multiprocessing import Process
import os
import time

def sleeper(name, seconds):
    print 'starting child process with id: ', os.getpid()
    print 'parent process:', os.getppid()
    print 'sleeping for %s ' % seconds
    time.sleep(seconds)
    print "Done sleeping"

if __name__ == '__main__':
    print "in parent process (id %s)" % os.getpid()
    p = Process(target=sleeper, args=('bob', 5))
    p.start()
    print "in parent process after child process start"
    print "parent process about to join child process"
    p.join()
    print "in parent process after child process join"
    print "parent process exiting with id ", os.getpid()
    print "The parent's parent process:", os.getppid()
If you look at the output, you get this:
mac% python simple.py
in parent process (id 5245)
in parent process after child process start
parent process about to join child process
starting child process with id:  5246
parent process: 5245
sleeping for 5
Done sleeping
in parent process after child process join
parent process exiting with id  5245
The parent's parent process: 5231
You can see that the main process forks a child process that then sleeps for five seconds. The dispatch of the child process occurs when p.start() gets called. This basic program flow scales up into a bigger program, as you will observe in the next section.
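To give a feel for how that flow scales, here is a hedged sketch, not part of the article's code, that starts several child processes with the same start()/join() pattern; the sleeper function is a simplified version of the one above:

#!/usr/bin/env python
"""A sketch of the same start/join flow with several child processes"""

from multiprocessing import Process
import os
import time

def sleeper(name, seconds):
    # each child reports its own PID and then sleeps
    print 'child %s (pid %s) sleeping for %s seconds' % (name, os.getpid(), seconds)
    time.sleep(seconds)

if __name__ == '__main__':
    workers = [Process(target=sleeper, args=('worker-%d' % i, 2))
               for i in range(4)]
    for p in workers:
        p.start()            # dispatch each child process
    for p in workers:
        p.join()             # wait for every child to finish
    print "all children finished; parent pid %s exiting" % os.getpid()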
Building an asynchronous Net-SNMP engine
Up until now, you haven't built anything too useful. This next example solves a real-world problem by making the Python bindings for Net-SNMP asynchronous. By default, the Net-SNMP Python bindings block on every call. Using the multiprocessing library makes converting the Net-SNMP library to fully asynchronous operation relatively simple.
Before getting started, you need to check that you have a few things installed in order to use both the multiprocessing library with Python 2.6 and the Net-SNMP bindings:
- Download Python 2.6 and compile it for your operating system: Python 2.6 Download
- Adjust your shell path so that Python 2.6 launches when you type python. For example, if you compile Python to live in /usr/local/bin/, you will need to prepend that directory to your $PATH variable to make sure it comes before an older version of Python.
- Download and install setuptools: Setuptools
- Download Net-SNMP and configure it with the "--with-python-modules" flag, in addition to any other flags your operating system needs (see the appropriate README file):
  ./configure --with-python-modules
Here is what the Net-SNMP configuration summary looks like:
---------------------------------------------------------
            Net-SNMP configuration summary:
---------------------------------------------------------

  SNMP Versions Supported:    1 2c 3
  Net-SNMP Version:           5.4.2.1
  Building for:               darwin9
  Network transport support:  Callback Unix TCP UDP
  SNMPv3 Security Modules:    usm
  Agent MIB code:             default_modules =>  snmpv3mibs mibII ucd_snmp notification
                              notification-log-mib target agent_mibs agentx disman/event
                              disman/schedule utilities
  Embedded Perl support:      enabled
  SNMP Perl modules:          building -- embeddable
  SNMP Python modules:        building for /usr/local/bin//python
  Authentication support:     MD5 SHA1
  Encryption support:         DES AES
Look at the code in the following module, and then you will run it.
Example 3. Multiprocess wrapper for Net-SNMP
#!/usr/bin/env python2.6
"""
This is a multiprocessing wrapper for Net-SNMP.
This makes a synchronous API asynchronous by combining it
with Python2.6
"""

import netsnmp
from multiprocessing import Process, Queue, current_process

class HostRecord():
    """This creates a host record"""
    def __init__(self, hostname = None, query = None):
        self.hostname = hostname
        self.query = query

class SnmpSession():
    """A SNMP Session"""
    def __init__(self,
                 oid = "sysDescr",
                 Version = 2,
                 DestHost = "localhost",
                 Community = "public",
                 Verbose = True,
                 ):
        self.oid = oid
        self.Version = Version
        self.DestHost = DestHost
        self.Community = Community
        self.Verbose = Verbose
        self.var = netsnmp.Varbind(oid, 0)
        self.hostrec = HostRecord()
        self.hostrec.hostname = self.DestHost

    def query(self):
        """Creates SNMP query

        Fills out a Host Object and returns result
        """
        try:
            result = netsnmp.snmpget(self.var,
                                     Version = self.Version,
                                     DestHost = self.DestHost,
                                     Community = self.Community)
            self.hostrec.query = result
        except Exception, err:
            if self.Verbose:
                print err
            self.hostrec.query = None
        finally:
            return self.hostrec

def make_query(host):
    """This does the actual snmp query

    This is a bit fancy as it accepts both instances
    of SnmpSession and host/ip addresses.  This
    allows a user to customize mass queries with
    subsets of different hostnames and community strings
    """
    if isinstance(host, SnmpSession):
        return host.query()
    else:
        s = SnmpSession(DestHost=host)
        return s.query()

# Function run by worker processes
def worker(input, output):
    for func in iter(input.get, 'STOP'):
        result = make_query(func)
        output.put(result)

def main():
    """Runs everything"""

    #clients
    hosts = ["localhost", "localhost"]
    NUMBER_OF_PROCESSES = len(hosts)

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    #submit tasks
    for host in hosts:
        task_queue.put(host)

    #Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print 'Unordered results:'
    for i in range(len(hosts)):
        print '\t', done_queue.get().query

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')
        print "Stopping Process #%s" % i

if __name__ == "__main__":
    main()
There are two classes, a HostRecord class and an SnmpSession class. The SnmpSession class contains a method that actually performs a query using the SNMP library, Net-SNMP. Because that call would normally block, you import the multiprocessing library and run it in a separate Process. Additionally, you pass in a task_queue and a done_queue, which serve as a way to synchronize and protect data coming into and out of the process pool. If you are familiar with threading, you will notice this is very similar to what you would do with a threading API.
Pay special attention to the hosts list in the #clients section of the main function. Notice that you could potentially run asynchronous SNMP queries against 50 or 100 hosts, or more, depending on the hardware you are running on. The NUMBER_OF_PROCESSES variable is set quite simply by taking the number of hosts in the hosts list. Finally, the last two sections grab the results from the queue as they are processed, and then put a "STOP" message into the queue that signals the worker processes that they can exit.
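The task_queue/done_queue/"STOP" pattern is worth seeing in isolation. The following sketch is not part of the Net-SNMP example; it strips out the SNMP details and simply squares numbers, but the queue handling is the same:

#!/usr/bin/env python
"""A stripped-down sketch of the task_queue/done_queue/'STOP' pattern"""

from multiprocessing import Process, Queue

def worker(input, output):
    # iter() keeps pulling tasks until the 'STOP' sentinel appears
    for item in iter(input.get, 'STOP'):
        output.put(item * item)

if __name__ == '__main__':
    task_queue = Queue()
    done_queue = Queue()
    tasks = range(10)
    NUMBER_OF_PROCESSES = 4

    for t in tasks:
        task_queue.put(t)

    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    for i in range(len(tasks)):
        print done_queue.get()      # results arrive in whatever order they finish

    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')      # one sentinel per worker lets each one exit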
If you run the code on an OS X machine that has Net-SNMP listening, you get the following non-blocking output:
mac% time python multisnmp.py
Unordered results:
        ('Darwin mac.local 9.6.0 Darwin Kernel Version 9.6.0: Mon Nov 24 17:37:00 PST 2008; root:xnu-1228.9.59~1/RELEASE_I386 i386',)
        ('Darwin mac.local 9.6.0 Darwin Kernel Version 9.6.0: Mon Nov 24 17:37:00 PST 2008; root:xnu-1228.9.59~1/RELEASE_I386 i386',)
Stopping Process #0
Stopping Process #1
python multisnmp.py  0.18s user 0.08s system 107% cpu 0.236 total
If you want to configure OS X's SNMP daemon for testing for this article, you will need to do the following. First, rewrite the configuration file by using these three commands in the shell:
$ sudo cp /etc/snmp/snmpd.conf /etc/snmp/snmpd.conf.bak.testing
$ sudo echo "rocommunity public" > /etc/snmp/snmpd.conf
$ sudo snmpd
This effectively backs up your configuration, writes a new configuration, and then starts snmpd. The steps are similar on many UNIX platforms, except for the last step, which involves restarting snmpd or sending it a HUP signal. If you want OS X to permanently run snmpd upon startup, you can edit this plist file to look like this:
/System/Library/LaunchDaemons/org.net-snmp.snmpd.plist
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
        <key>Disabled</key>
        <false/>
        <key>KeepAlive</key>
        <true/>
        <key>Label</key>
        <string>org.net-snmp.snmpd</string>
        <key>OnDemand</key>
        <false/>
        <key>Program</key>
        <string>/usr/sbin/snmpd</string>
        <key>ProgramArguments</key>
        <array>
                <string>snmpd</string>
                <string>-f</string>
        </array>
        <key>RunAtLoad</key>
        <true/>
        <key>ServiceIPC</key>
        <false/>
</dict>
</plist>
If you want to test this out on a slew of machines you have sitting around, you could easily modify it by replacing the hosts line with something like this:
hosts = ["192.168.1.100",
         SnmpSession(DestHost="example.com", Community="mysecret"),
         "example.net",
         "example.org"]
The worker function that runs the job accepts both plain host name strings and full SnmpSession objects.
As useful as the multiprocessing library is, you should take special note of the following advice from the official documentation: avoid shared state; it is often a good idea to explicitly join the processes you create; try to avoid terminating processes that use shared resources; and make sure that all items in a queue have been removed before you join the process that put them there, or a deadlock could occur. There is a much more detailed list of best practices in the official documentation, and it is recommended that you read the programming guidelines referenced in the Resources section.
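That last point about queues and deadlocks is subtle enough to deserve a small sketch. The following example is an illustration rather than code from the article: it drains the result queue before joining the child process that filled it, which is the order the documentation recommends.

#!/usr/bin/env python
"""A sketch of the 'drain the queue before joining' guideline"""

from multiprocessing import Process, Queue

def produce(results):
    # a child that puts many items on a queue will not exit until
    # those items have been flushed to the underlying pipe
    for i in range(1000):
        results.put(i)

if __name__ == '__main__':
    results = Queue()
    p = Process(target=produce, args=(results,))
    p.start()
    collected = [results.get() for i in range(1000)]   # empty the queue first...
    p.join()                                           # ...then it is safe to join
    print "collected %d results" % len(collected)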
With those warnings out of the way, it is fair to say that multiprocessing is a very powerful new addition to the Python programming language. While the limitation of the GIL with threading was once thought to be a weakness, Python has more than made up for it by including an extremely powerful and flexible multiprocessing library. Thanks to David Goodger for doing a technical review of this article.