一、multiprocessing模块

python中的多线程无法利用多核优势,如果想要充分地使用多核cpu的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。python提供了multiprocessing

multiprocessing 模块用来开启子进程。并在子进程中执行我们定制的任务(例如函数)。与多线程threading类似

 

multiprocessing 模块支持很多功能 :子进程、通信、共享数据,执行不同的同步,提供Process、Queue、Pipe、Lock 等组件

ps:与线程不同,进程没有任何共享状态,修改的数据(改动等等)仅限于该进程内。

 

二、Process 类的介绍

 创建进程的类:

Procss([group [, target [,name [, args [, kwargs ] ] ] ] ] ) ,由该类实例化得到对象,表示一个子进程中的人物(尚未启动)

ps:1. 需要指定关键字的方式来制定参数

  2. args 指定为传给 target 函数位置,是一个元组形式,必须有逗号

参数介绍:

1. group 参数 未使用 ,值始终为 None

2. target 表示 调用对象 , 即子进程要执行的任务(不加括号)

3. args 表示调用对象的位置参数元组,args = (x , y ,)

4. kwargs 表示调用对象的字典,kwargs={'x':1 , 'y':99}

5. name 为子进程的名称

方法介绍:

1. p.start() :启动进程,并调用该子进程的 p.run()

2. p.run() :进程启动时运行的方法 ,它去调用target指定的函数,自定义的类一定要实现该方法。

3. p.terminate() :强制终止进程p ,不会进行任何清理操作,如果p创建了子进程,该子进程就变成僵尸进程了,使用该方法需要小心这个情况,如果p还保存了一个锁 那么也将不会被释放,进而导致死锁

4. p.is_alive() : 如果p仍然运行,返回True 

5. p.join([timeout]) :主进程等待p终止 (ps:是主线程处于等 的状态,而p是处于运行的状态),timeout是可选的超时时间,需要强调的是,p.join 只能join 住 start 开启的进程,而不能join住 run开启的进程

属性介绍:

1. p.daemon : 默认为False , 如果设为True ,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设置完后,p不能创建自己的新进程,必须在p.start() 之前设置。

2. p.name :进程的名称

3. p.pid :进程的pid

4. p.exitcode :进程运行时为None、如果为-N ,表示被信号N结束(了解)

5. p.authkey :进程的身份验证键,默认是由 os.urandom()随机生成的32位字符串,这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同身份验证键时才算成功(了解)

 

三、Process类的使用

在windows中Process()必须放到# if __name__ == '__main__':下

创建并开启子进程的两种方式

\"\"\"\"
# 开启子进程的方式1:

from multiprocessing import Process
import time

def task(name):
    print('%s is running' %name)
    time.sleep(3)
    print('%s is done' %name)

# 在windows系统上开启子进程的操作必须放到该行代码下
if __name__ == '__main__':
    p=Process(target=task,args=('子进程',)) # Process(target=task,kwargs={'name':'子进程'}) #
    p.start() # 仅仅只是向操作系统发送一个创造子进程的信号
    print('')
方式一
\"\"\"\"
# 开启子进程的方式2:
from multiprocessing import Process
import time

class Myprocess(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name

    def run(self):
        print('%s is running' %self.name)
        time.sleep(3)
        print('%s is done' %self.name)

# 在windows系统上开启子进程的操作必须放到该行代码下
if __name__ == '__main__':
    p=Myprocess('子进程')
    p.start() # 仅仅只是向操作系统发送一个创造子进程的信号,start会调用run方法
    p.join()
    print('')
方式二

 

进程之间的内存空间是隔离的

\"\"\"\"
from multiprocessing import Process
n=100 #在windows系统中应该把全局变量定义在if __name__ == '__main__'之上就可以了
def work():
    global n
    n=0
    print('子进程内: ',n)


if __name__ == '__main__':
    p=Process(target=work)
    p.start()
    print('主进程内: ',n)
View Code

 

将之前的基于TCP的套接字通信改为支持并发

\"\"\"\"
from socket import *
from multiprocessing import Process

FTPS=socket(AF_INET,SOCK_STREAM)
FTPS.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
FTPS.bind(('127.0.0.1',9999))
FTPS.listen(5)

def talk(conn):
    while True:
        try:
            msg=conn.recv(1024)
            if not msg :break
            conn.sendall(msg.upper())
        except Exception:
            break

if __name__ == '__main__':
    while True:
        conn,client_addr=FTPS.accept()
        p=Process(target=talk,args=(conn,))
        p.start()
服务端
\"\"\"\"
from socket import *

FTPC=socket(AF_INET,SOCK_STREAM)
FTPC.connect(('127.0.0.1',9999))

while True:
    cmd=input('>>:').strip()
    if not cmd:continue

    FTPC.sendall(cmd.encode('utf-8'))
    msg=FTPC.recv(1024)
    print(msg.decode('utf-8'))
客户端

这样做虽然可以实现并发的效果  但是有一个问题

每来一个客户端,都在服务端开启一个进程,如果并发来十万个客户端,要开启十万个进程吗
解决方法:进程池

 Process对象的join方法

\"\"\"\"
from multiprocessing import Process
import time
import random

class Task(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name


    def run(self):
        print('%s is running' %self.name)
        time.sleep(random.randint(1,3))
        print('%s is ending' %self.name)

if __name__ == '__main__':
    p=Task('子进程')
    p.start()
    p.join(0.5) #等待p停止,等0.5秒就不再等了
    print('开始')
View Code
\"\"\"\"
from multiprocessing import Process
import time
import random
class Task(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name


    def run(self):
        print('%s is running' %self.name)
        time.sleep(random.randint(1,3))
        print('%s is ending' %self.name)
if __name__ == '__main__':
    p_msg = []
    for p in range(4):
        p=Task('子进程%s'%p)
        p_msg.append(p)
        p.start()
    for p in p_msg:
        p.join()
#当然不是了,必须明确:p.join()是让谁等?
#很明显p.join()是让主线程等待p的结束,卡住的是主线程而绝非进程p,

#详细解析如下:
#进程只要start就会在开始运行了,所以p1-p4.start()时,系统中已经有四个并发的进程了
#而我们p1.join()是在等p1结束,没错p1只要不结束主线程就会一直卡在原地,这也是问题的关键
#join是让主线程等,而p1-p4仍然是并发执行的,p1.join的时候,其余p2,p3,p4仍然在运行,等#p1.join结束,可能p2,p3,p4早已经结束了,这样p2.join,p3.join.p4.join直接通过检测,无需等待
# 所以4个join花费的总时间仍然是耗费时间最长的那个进程运行的时间


    print('主线程')


#上述启动进程与join进程可以简写为
# p_l=[p1,p2,p3,p4]
#
# for p in p_l:
#     p.start()
#
# for p in p_l:
#     p.join()
p.join详细说明

process对象的其他方法或属性

\"\"\"\"
from multiprocessing import Process
import time
import random
class Myprocess(Process):
    def __init__(self,name):
        # self.name=name
        # super().__init__() #Process的__init__方法会执行self.name=Myprocess-1,
        #                    #所以加到这里,会覆盖我们的self.name=name

        #为我们开启的进程设置名字的做法
        super().__init__()
        self.name=name

    def run(self):
        print('%s is running' %self.name)
        time.sleep(random.randrange(1,3))
        print('%s is  ending' %self.name)
if __name__ == '__main__':

    p=Myprocess('子进程')
    p.start()
    print('开始')
    print(p.pid) #查看pid
name和pid
\"\"\"\"
#进程对象的其他方法一:terminate,is_alive
from multiprocessing import Process
import time
import random
class Myprocess(Process):
    def __init__(self,name):
        # self.name=name
        # super().__init__() #Process的__init__方法会执行self.name=Myprocess-1,
        #                    #所以加到这里,会覆盖我们的self.name=name

        #为我们开启的进程设置名字的做法
        super().__init__()
        self.name=name

    def run(self):
        print('%s is running' %self.name)
        time.sleep(random.randrange(1,3))
        print('%s is  ending' %self.name)
if __name__ == '__main__':

    p=Myprocess('子进程')
    p.start()

    p.terminate()#关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活
    print(p.is_alive()) #结果为True

    print('开始')
    print(p.is_alive()) #结果为False
terminate和is_alive

 

僵尸进程与孤儿进程

 参考博客:http://www.cnblogs.com/Anker/p/3271773.html

\"\"\"\"
#总结僵尸进程和孤儿进程

#僵尸进程:有害
例如有个进程,它定期的产 生一个子进程,这个子进程需要做的事情很少,做完它该做的事情之后就退出了,因此这个子进程的生命周期很短,但是,父进程只管生成新的子进程,至于子进程 退出之后的事情,则一概不闻不问,这样,系统运行上一段时间之后,系统中就会存在很多的僵死进程,倘若用ps命令查看的话,就会看到很多状态为Z的进程。 严格地来说,僵死进程并不是问题的根源,罪魁祸首是产生出大量僵死进程的那个父进程。因此,当我们寻求如何消灭系统中大量的僵死进程时,答案就是把产生大 量僵死进程的那个元凶枪毙掉(也就是通过kill发送SIGTERM或者SIGKILL信号啦)。枪毙了元凶进程之后,它产生的僵死进程就变成了孤儿进 程,这些孤儿进程会被init进程接管,init进程会wait()这些孤儿进程,释放它们占用的系统进程表中的资源,这样,这些已经僵死的孤儿进程 就能瞑目而去了。

#孤儿进程无害:会被系统接管
总结孤儿和僵尸进程

 

四、守护进程

主进程创建守护进程

  其一:守护进程会在主进程代码执行结束后就终止

  其二:守护进程内无法再开启子进程,否则抛出异常:Asserti : daemonic processes are not allowed to have children

注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

\"\"\"\"
from multiprocessing import Process
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")
if __name__ == '__main__':
    p1=Process(target=foo)
    p1.daemon = True
    p1.start()
    print('主进程')
View Code
\"\"\"\"
#主进程代码运行完毕,守护进程就会结束
from multiprocessing import Process
from threading import Thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

if __name__ == '__main__':

    p1=Process(target=foo)
    p2=Process(target=bar)

    p1.daemon=True
    p1.start()
    p2.start()
    print("main-------") #打印该行则主进程代码结束,则守护进程p1应该被终止,可能会有p1任务执行的打印信息123,因为主进程打印main----时,p1也执行了,但是随即被终止
特殊的例子

 

五、互斥锁

进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,

而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理

\"\"\"\"
from multiprocessing import Process,Lock
import os,time

#互斥锁 可理解为生活中卫生间的锁

def work(mutex):
    # mutex.acquire() 抢锁操作
    print('%s is running' %os.getpid())
    time.sleep(2)
    print('%s is done' %os.getpid())
    # mutex.release()  #释放锁操作

if __name__ == '__main__':
    # mutex=Lock()    #多进程需加在main后
    for i in range(3):
        p=Process(target=work)#args=(mutex,)#传mutex
        p.start()
互斥锁mutex

加锁:由并发变成‘串行’,牺牲了运行效率,但保障了数据安全

例子:模拟12306抢票

\"\"\"\"
from multiprocessing import Process,Lock
import json
import time,random

def search(i):
    with open('db.json','rt',encoding='utf-8') as f: #db.json 文件 {"count": 1},表示余一张票
        dic=json.load(f)
        time.sleep(1)
        print('路人%s查看到剩余票数:%s' %(i,dic['count']))

def get(i):
    with open('db.json','rt',encoding='utf-8') as f:
        dic=json.load(f)
    if dic['count'] > 0:
        # 有票
        dic['count']-=1
        time.sleep(random.randint(1,3)) #关键在于此处 睡的同时所有子进程都读取到1张票
        with open('db.json','wt',encoding='utf-8') as f:
            json.dump(dic,f)
            print('路人%s抢票成功' % i)

    else:
        print('路人%s抢票失败' %i)


def task(i,mutex):
    search(i)
    # mutex.acquire()
    get(i)
    # mutex.release()

if __name__ == '__main__':
    mutex = Lock()

    for i in range(1,11):
        p=Process(target=task,args=(i,mutex))
        p.start()
        # p.join()

    print('主进程')
模拟12306抢票:互斥锁的应用

 

总结

#加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
虽然可以用文件共享数据实现进程间通信,但问题是:
1.效率低(共享数据基于文件,而文件是硬盘上的数据)
2.需要自己加锁处理



#因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
1 队列和管道都是将数据存放于内存中
2 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

 

六、IPC机制

进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

1、队列

创建队列的类(底层就是以管道和锁定的方式实现的)

  Queue(maxsize):创建共享的进程队列,Queue是多进程安全的队列

可以使用Queue实现多进程之间的数据传递。参数:maxsize 是队列中允许最大数,不写则无限制
from multiprocessing import Queue

 

主要方法:

\"\"\"\"
'''
multiprocessing模块支持进程间通信的两种主要形式:管道和队列
都是基于消息传递实现的,但是队列接口
'''

from multiprocessing import Process,Queue
import time
q=Queue(3)


#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
print(q.full()) #满了

print(q.get())
print(q.get())
print(q.get())
print(q.empty()) #空了
Queue 队列
\"\"\"\"
1 q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
2 q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
3  
4 q.get_nowait():同q.get(False)
5 q.put_nowait():同q.put(False)
6 
7 q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
8 q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
9 q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
详细

2、管道(了解)

创建管道的类

Pipe(duplex)在进程之间创建一条管道,并返回元组(coon1,coon2)表示管道的两端

  ps:必须在产生process对象之前产生管道

参数:dumplex:默认管道是全双工的,如果将duplex设成False,conn1只能用于接收,conn2只能用于发送。

主要方法:

\"\"\"\"
    conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
    conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象
 #其他方法:
conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
conn1.fileno():返回连接使用的整数文件描述符
conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。
 
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收    
 
conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。
Pipe 管道
\"\"\"\"
from multiprocessing import Process,Pipe

import time,os
def consumer(p,name):
    left,right=p
    left.close()
    while True:
        try:
            baozi=right.recv()
            print('%s 收到包子:%s' %(name,baozi))
        except EOFError:
            right.close()
            break
def producer(seq,p):
    left,right=p
    right.close()
    for i in seq:
        left.send(i)
        # time.sleep(1)
    else:
        left.close()
if __name__ == '__main__':
    left,right=Pipe()

    c1=Process(target=consumer,args=((left,right),'c1'))
    c1.start()


    seq=(i for i in range(10))
    producer(seq,(left,right))

    right.close()
    left.close()

    c1.join()
    print('主进程')
Pipe 示例

七、生产者消费者模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

 特点:

  • 实现了 生产者 与 消费者 解耦合
  • 平衡了生产者的生产能力和消费者的消费能力

 为什么使用此模式:

当程序中存在明显的两类任务,一类负责造数据,另外一类负责处理数据,就可以用生产者消费者模型来实现
解耦和从而提升效率

如何实现

\"\"\"\"
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

基于队列实现生产者消费者模型
实现方法
生产者进程-------->queue<----------消费者进程
\"\"\"\"
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=(q,))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    c1.start()
    print('')
收藏 打印