多进程 python_Python 多进程

多进程 python_Python 多进程进程前置知识点进程:一个程序运行起来后,代码+用到的资源称之为进程,它是操作系统分配资源的基本单元。并发:指的是任务数多余cpu核数,通过操作系统的各种任务调度算法,实现用多个任务“一起”执行

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE使用 1年只要46元 售后保障 童叟无欺

进程

 

前置知识点

  • 进程:一个程序运行起来后,代码+用到的资源 称之为进程,它是操作系统分配资源的基本单元。
  • 并发:指的是任务数多余cpu核数,通过操作系统的各种任务调度算法,实现用多个任务“一起”执行(实际上总有一些任务不在执行,因为切换任务的速度相当快,看上去一起执行而已)
  • 并行:指的是任务数小于等于cpu核数,即任务真的是一起执行的
     

进程的创建

multiprocessing模块就是跨平台版本的多进程模块,提供了一个Process类来代表一个进程对象,这个对象可以理解为是一个独立的进程,可以执行另外的事情

from multiprocessing import Process
import os


# 子进程要执行的代码
def run_proc(name):
    print('启动子进程{}{}'.format(name, os.getpid()))


if __name__ == '__main__':
    print('父进程{}'.format(os.getpid()))
    p = Process(target=run_proc, args=('test',))
    print('子进程将要启动')
    p.start()
    p.join()
    print('子进程结束')

 

进程pid

from multiprocessing import Process
import os
import time


def run_proc():
    """子进程要执行的代码"""
    print('子进程运行中,pid=%d...' % os.getpid())  # os.getpid获取当前进程的进程号
    print('子进程将要结束...')


if __name__ == '__main__':
    print('父进程pid: %d' % os.getpid())  # os.getpid获取当前进程的进程号
    p = Process(target=run_proc)
    p.start()

>>> 父进程pid: 3580
>>> 子进程运行中,pid=3581...
>>> 子进程将要结束...

 

Process语法结构

Process([group [, target [, name [, args [, kwargs]]]]])
  • target:如果传递了函数的引用,可以任务这个子进程就执行这里的代码(常用)
  • args:给target指定的函数传递的参数,以元组的方式传递(常用)
  • kwargs:给target指定的函数传递命名参数
  • name:给进程设定一个名字,可以不设定
  • group:指定进程组,大多数情况下用不到

Process创建的实例对象的常用方法:

  • start():启动子进程实例(创建子进程)
  • is_alive():判断进程子进程是否还在活着
  • join([timeout]):是否等待子进程执行结束,或等待多少秒
  • terminate():不管任务是否完成,立即终止子进程

Process创建的实例对象的常用属性:

  • name:当前进程的别名,默认为Process-N,N为从1开始递增的整数
  • pid:当前进程的pid(进程号)
     

给子进程指定的函数传递参数

from multiprocessing import Process
import os
from time import sleep


def run_proc(name, age, **kwargs):
    for i in range(10):
        print('子进程运行中,name= %s,age=%d ,pid=%d...' % (name, age, os.getpid()))
        print(kwargs)
        sleep(0.2)

if __name__=='__main__':
    p = Process(target=run_proc, args=('test',18), kwargs={"m":20})
    p.start()
    sleep(1)  # 1秒中之后,立即结束子进程
    p.terminate()
    p.join()

>>> 子进程运行中,name= test,age=18 ,pid=3593...
>>> {'m': 20}
>>> 子进程运行中,name= test,age=18 ,pid=3593...
>>> {'m': 20}
>>> 子进程运行中,name= test,age=18 ,pid=3593...
>>> {'m': 20}
>>> 子进程运行中,name= test,age=18 ,pid=3593...
>>> {'m': 20}
>>> 子进程运行中,name= test,age=18 ,pid=3593...
>>> {'m': 20}

 

进程和线程的区别

  • 进程是资源调度的基本单位,而线程是程序执行的基本单位
  • 不同进程的地址空间是独立的,而同一进程中的线程之间共享
  • 进程之间通信必须使用操作系统提供的进程间通信机制,同一进程中的各线程可以直接通信
  • 多进程之间可以并发执行,多线程之间也可以并发执行
  • 线程切换的开销要比进程切换的开销小
     

进程间通信

如果两个进程之间需要通信,则需要用到Queue类,相当于队列
 

初始化Queue()对象

q = Queue()

括号中可以指定最大可接受的消息数量,若不指定,则默认代表消息数量没有上限
 

Queue()类的方法

Queue有多个方法,下面介绍几个常用的方法
 

Queue.qsize()

返回当前队列包含的消息数量;
 

Queue.empty()

判断队列是否为空,如果队列为空,返回True,反之False
 

Queue.full()

判断队列是否满了,如果队列满了,返回True,反之False
 

Queue.get([block[, timeout]])

获取队列中的一条消息,然后将其从列队中移除,block默认值为True;
block=True的情况
如果block=True,且没有设置timeout(单位秒),消息列队如果为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止
如果设置了timeout,则会等待timeout秒,若还没读取到任何消息,则抛出Queue.Empty异常;
block=False的情况
如果block=False,消息列队如果为空,则会立刻抛出Queue.Empty异常;
 

Queue.get_nowait()

相当Queue.get(False)
 

Queue.put(item,[block[, timeout]])

将item消息写入队列,block默认值为True
block=True的情况
如果block=True,且没有设置timeout(单位秒),消息列队如果已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息列队腾出空间为止
如果设置了timeout,则会等待timeout秒,若还没空间,则抛出Queue.Full异常;
block=False的情况
如果block=False,消息列队如果没有空间可写入,则会立刻抛出Queue.Full异常;
 

Queue.put_nowait(item)

相当Queue.put(item, False)
 

我们以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

from multiprocessing import Process, Queue
import os, time, random


# 写数据进程执行的代码:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())


# 读数据进程执行的代码:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)


if __name__ == '__main__':
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    pr.terminate()

 

进程池

当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态成生多个进程,但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocessing模块提供的Pool方法。

初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务,请看下面的实例:

"""
如果要启动大量的子进程,可以用进程池的方式批量创建子进程:
"""
from multiprocessing import Pool
import os, time, random


def long_time_task(name):
    print('运行任务 %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('任务 %s 运行 %0.2f 秒' % (name, (end - start)))


if __name__ == '__main__':
    print('父进程 %s.' % os.getpid())
    p = Pool(4)  # 创建进程池中最多存4个子进程
    for i in range(5):
          # Pool().apply_async(要调用的目标,(传递给目标的参数元祖,))
          # 每次循环将会用空闲出来的子进程去调用目标
        p.apply_async(long_time_task, args=(i,))
    print('等待所有子进程完成...')
    p.close()  # 关闭进程池,关闭后po不再接收新的请求
    p.join()  # 等待po中所有子进程执行完成,必须放在close语句之后
    print('所有子进程完成.')
# 运行结果
>>> 等待所有子进程完成...
>>> 运行任务 0 (3722)...
>>> 运行任务 1 (3723)...
>>> 运行任务 2 (3724)...
>>> 运行任务 3 (3725)...
>>> 任务 3 运行 0.67 秒
>>> 运行任务 4 (3725)...
>>> 任务 2 运行 1.29 秒
>>> 任务 0 运行 2.00 秒
>>> 任务 1 运行 2.77 秒
>>> 任务 4 运行 2.31 秒
>>> 所有子进程完成.

 

multiprocessing.Pool常用函数解析:

  • apply_async(func[, args[, kwds]]) :使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args为传递给func的参数列表,kwds为传递给func的关键字参数列表;
  • close():关闭Pool,使其不再接受新的任务;
  • terminate():不管任务是否完成,立即终止;
  • join():主进程阻塞,等待子进程的退出, 必须在close或terminate之后使用;
     

代码解读:

Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。
请注意输出的结果,task 0,1,2,3是立刻执行的,而task 4要等待前面某个task完成后才执行,这是因为Pool的默认大小在我的电脑上是4,因此,最多同时执行4个进程。这是Pool有意设计的限制,并不是操作系统的限制。如果改成:

p = Pool(5)

就可以同时跑5个进程。
由于Pool的默认大小是CPU的核数,如果你不幸拥有8核CPU,你要提交至少9个子进程才能看到上面的等待效果。
 

进程池中的Queue

如果要使用Pool创建进程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否则会得到一条如下的错误信息:

RuntimeError: Queue objects should only be shared between processes through inheritance.

# 修改import中的Queue为Manager
from multiprocessing import Manager,Pool
import os,time,random

def reader(q):
    print("reader启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
    for i in range(q.qsize()):
        print("reader从Queue获取到消息:%s" % q.get(True))

def writer(q):
    print("writer启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
    for i in "itcast":
        q.put(i)

if __name__=="__main__":
    print("(%s) start" % os.getpid())
    q = Manager().Queue()  # 使用Manager中的Queue
    po = Pool()
    po.apply_async(writer, (q,))

    time.sleep(1)  # 先让上面的任务向Queue存入数据,然后再让下面的任务开始从中取数据

    po.apply_async(reader, (q,))
    po.close()
    po.join()
    print("(%s) End" % os.getpid())
>>> (4157) start
>>> writer启动(4159),父进程为(4157)
>>> reader启动(4160),父进程为(4157)
>>> reader从Queue获取到消息:i  
>>> reader从Queue获取到消息:t
>>> reader从Queue获取到消息:c
>>> reader从Queue获取到消息:a
>>> reader从Queue获取到消息:s
>>> reader从Queue获取到消息:t
>>> (4157) End
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/165494.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • ideaIU-2021.12.13 激活码【中文破解版】

    (ideaIU-2021.12.13 激活码)最近有小伙伴私信我,问我这边有没有免费的intellijIdea的激活码,然后我将全栈君台教程分享给他了。激活成功之后他一直表示感谢,哈哈~IntelliJ2021最新激活注册码,破解教程可免费永久激活,亲测有效,下面是详细链接哦~https://javaforall.net/100143.html…

    2022年3月30日
    54
  • CSDN社区内容创作规范

    CSDN长久以来秉持初心,致力于为广大用户提供良好的创作环境,打造健康有序的技术生态!但良好的社区环境,需各位创作者与CSDN共同维护建立!【CSDN内容创作规范】请在发文前认真阅读:如你发布的内容存在以下问题,文章将无法通过审核,违规情节严重的,将对帐号进行封号处理。请各位创作者严格遵守社区的内容创作规范,共同守护我们的社区环境!目录一、在平台发布以下相关内容审核将不予通过1、违反法律法规和相关政策2、无资质发布专业领域内容3、流量作弊4、营销/推广引流5、不文明用语6、

    2022年4月8日
    105
  • 数据仓库(五)元数据管理

    数据仓库(五)元数据管理概述 元数据通常定义为 关于数据的数据 在数据仓库中是定义和描述 DW BI 系统的结构 操作和内容的所有信息 元数据贯穿了数据仓库的整个生命周期 使用元数据驱动数据仓库的开发 使数据仓库自动化 可视化 nbsp 元数据类型 nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp 1 业务元数据 nbsp 业务元数据指从业务角度描述业务

    2025年10月19日
    2
  • 修改apache服务器端口号_apache修改端口号

    修改apache服务器端口号_apache修改端口号一、修改/etc/httpd/conf/httpd.conf文件中的监听端口号Listen80把80修改成需要的号,如8000,即Listen8000二、查看SELinux下http相关端口#semanageport-l|grephttphttp_cache_port_ttcp3128,8080,8118,10001-10010http_cache_po

    2025年11月13日
    7
  • 对话

    对话

    2021年5月4日
    110
  • phpMyAdmin安装教程[通俗易懂]

    phpMyAdmin安装教程[通俗易懂]phpmyadmin是一款mysql数据库管理工具,是由php编写的,可以通过互联网控制和操作mysql,通过phpmyadmin可以完全对数据库进行操作,例如建立、复制/删除数据等等。可以管理整个MySQL服务器(需要超级用户),也可以管理单个数据库,为了实现后一种,你将需要合理设置MySQL用户,他只能对允许的数据库进行读/写,那要等到你看过MySQL手册中相关的部分。

    2022年6月1日
    50

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号