解决windows下python3使用multiprocessing.Pool出现的问题

例如:

from multiprocessing import Pool

def f(x):
return x*x
pool = Pool(processes=4)
r=pool.map(f, range(100)) 
pool.close() 
pool.join() 

在spyder里运行直接没反应;在shell窗口里,直接报错,如下:

Process SpawnPoolWorker-15:
Traceback (most recent call last):
File "C:\Anaconda3\lib\multiprocessing\process.py", line 254, in _bootstr
self.run()
File "C:\Anaconda3\lib\multiprocessing\process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "C:\Anaconda3\lib\multiprocessing\pool.py", line 108, in worker
task = get()
File "C:\Anaconda3\lib\multiprocessing\queues.py", line 357, in get
return ForkingPickler.loads(res)
AttributeError: Can't get attribute 'f' on <module '__main__' (built-in)>

解决:

Windows下面的multiprocessing跟Linux下面略有不同,Linux下面基于fork,fork之后所有的本地变量都复制一份,因此可以使用任意的全局变量;在Windows下面,多进程是通过启动新进程完成的,所有的全局变量都是重新初始化的,在运行过程中动态生成、修改过的全局变量是不能使用的。

multiprocessing内部使用pickling传递map的参数到不同的进程,当传递一个函数或类时,pickling将函数或者类用所在模块+函数/类名的方式表示,如果对端的Python进程无法在对应的模块中找到相应的函数或者类,就会出错。

当你在Interactive Console当中创建函数的时候,这个函数是动态添加到__main__模块中的,在重新启动的新进程当中不存在,所以会出错。

当不在Console中,而是在独立Python文件中运行时,你会遇到另一个问题:由于你下面调用multiprocessing的代码没有保护,在新进程加载这个模块的时候会重新执行这段代码,创建出新的multiprocessing池,无限调用下去。

解决这个问题的方法是永远把实际执行功能的代码加入到带保护的区域中:if __name__ == '__mian__':

补充知识:multiprocessing Pool的异常处理问题

multiprocessing.Pool开发多进程程序时,在某个子进程执行函数使用了mysql-python连接数据库,

由于程序设计问题,没有捕获到所有异常,导致某个异常错误直接抛到Pool中,导致整个Pool挂了,其异常错误如下所示:

Exception in thread Thread-3:
Traceback (most recent call last):
 File "/usr/lib64/python2.7/threading.py", line 812, in __bootstrap_inner
 self.run()
 File "/usr/lib64/python2.7/threading.py", line 765, in run
 self.__target(*self.__args, **self.__kwargs)
 File "/usr/lib64/python2.7/multiprocessing/pool.py", line 376, in _handle_results
 task = get()
 File "/usr/lib/python2.7/site-packages/mysql/connector/errors.py", line 194, in __init__
 'msg': self.msg.encode('utf8') if PY2 else self.msg
AttributeError: ("'int' object has no attribute 'encode'", <class 'mysql.connector.errors.Error'>, 
(2055, "2055: Lost Connection to MySQL '192.169.36.189:3306', system error: timed out", None))

本文档基于以上问题对multiprocessing.Pool以及python-mysql-connector的源码实现进行分析,以定位具体的错误原因。解决方法其实很简单,不要让异常抛到Pool里就行。

问题产生场景

python 版本centos7.3自带的2.7.5版本,或者最新的python-2.7.14

mysql-connector库,版本是2.0及以上,可到官网下载最新版:mysql-connector

问题发生的code其实可以简化为如下所示:

from multiprocessing import Pool, log_to_stderr
import logging
import mysql.connector

# open multiprocessing lib log
log_to_stderr(level=logging.DEBUG)

def func():
 raise mysql.connector.Error("demo test", 100)

if __name__ == "__main__":
 p = Pool(3)
 res = p.apply_async(func)
 res.get()

所以解决问题很简单,在func里加个try-except就可以了。但是如果你好奇为什么为出现AttributeError的异常,那么可以继续往下看。

Multiprocessing.Pool的实现

通过查看源码,大致上multiprocess.Pool的实现如下图所示:

AttributeError: 'int' object has no attribute 'encode'

从上述错误日志中可以看到,表明在重构时msg参数传入了int类型变量。就是说在unpickle阶段,Mysql Error重新实例化时执行了__init__()方法,但是传参错误了。为了验证这一现象,我将MySql Error的__init__()进行简化,最终确认到self.args的赋值上,即Exception及其子类在unpickle时会调用__init__()方法,并将self.args作为参数列表传递给__init__()。

通过以下代码可以简单的验证问题:

import os
from multiprocessing import Pipe

class DemoError(Exception):

 def __init__(msg, errno):
  print "msg: %s, errno: %s" % (msg, errno)
  self.args = ("aa", "bb")

def func():
 raise DemoError("demo test", 100)

r, w = Pipe(duplex=False)
try:
 result = (True, func(1))
except Exception, e:
 result = (False, e)

print "send result"
w.send(result)
print "get result"
res = r.recv()
print "finished."

日志会在recv调用时打印 msg: aa, errno: bb,表明recv异常类Exception时会将self.args作为参数传入init()函数中。而Mysql的Error类重写self.args变量,而且顺序不对,导致msg在执行编码时出错。MySql Error的实现简化如下:

class Error(Exception):
 def __init__(self, msg=None, errno=None, values=None, sqlstate=None):
  super(Error, self).__init__()
  ...
  if self.msg and self.errno != -1:
   fields = {
    'errno': self.errno,
    'msg': self.msg.encode('utf-8') if PY2 else self.msg
   }
  ...
  self.args = (self.errno, self._full_msg, self.sqlstate)

可以看到,mysql Error中的self.args与__init__(msg, errno, values, sqlstate)的顺序不一,因此self.args第一个参数errno传给了msg,导致AttributeError。至于self.args是什么,简单查了下,是Exception类中定义的,一般用__str__或者__repr__方法的输出,python官方文档不建议overwrite。

总结

好吧,说了这么多,通过问题的追踪,我们也基本上了解清楚multiprocessing.Pool库的实现了。事实上,也很难说是谁的bug,是两者共同作用下出现的。不管如何,希望在用到multiprocessing库时,特别与Pipe相关时,谨慎点使用,最好的不要让异常跑到multiprocess中处理,应该在func中将所有的异常处理掉,如果有自己定于的异常类,请最好保证self.args的顺序与__init__()的顺序一致。同时,网上好像也听说使用multprocessing和subprocess库出现问题,或许也是这个异常抛出的问题,毕竟suprocessError定义与Exception好像有些区别。

以上这篇解决windows下python3使用multiprocessing.Pool出现的问题就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持来客网。