简介

第一二部分在这里

Python asyncio 入门实战-1

Python asyncio 入门实战-2

之前我们已经学会了 Python asyncio 等基本概念和基本使用方法。

这一部分我们将着重学习 multiprocessing库的基本使用,进程池的作用,以及如何与 asyncio配合处理 CPU 密集型任务,还有如何在多进程任务间加锁以避免竟态条件等。

多进程

multiprocessing 入门

使用多进程处理一个 CPU 密集型任务

import time
from multiprocessing import Process


def count(count_to: int) -> int:
    start = time.time()
    counter = 0
    while counter < count_to:
        counter = counter + 1
    end = time.time()
    print(f'Finished counting to {count_to} in {end-start}')
    return counter


if __name__ == "__main__":
    start_time = time.time()

    to_one_hundred_million = Process(target=count, args=(100000000,))
    to_two_hundred_million = Process(target=count, args=(200000000,))

    to_one_hundred_million.start()
    to_two_hundred_million.start()

    to_one_hundred_million.join() # 等待主进程
    to_two_hundred_million.join() # 等待主进程

    end_time = time.time()
    print(f'Completed in {end_time-start_time}')

使用进程池

上面的示例,我们得不到函数的返回结果,还需要使用 start 和 join 来辅助多进程执行。

使用进程池方法,不需要使用这些

from multiprocessing import Pool


def say_hello(name: str) -> str:
    return f'Hi there, {name}'


if __name__ == "__main__":
    with Pool() as process_pool:  # A
        hi_jeff = process_pool.apply(say_hello, args=('Jeff',))  # B
        hi_john = process_pool.apply(say_hello, args=('John',))
        print(hi_jeff)
        print(hi_john)

但是这个方法也有一个很大的缺点就是,每个任务执行的时候是阻塞的,即如果每个任务如果都是耗时 10s,那么两个就是 20s ,并没有效率的提升,接下来我们使用 apply_async来解决这个问题。

from multiprocessing import Pool


def say_hello(name: str) -> str:
    return f'Hi there, {name}'


if __name__ == "__main__":
    with Pool() as process_pool:
        hi_jeff = process_pool.apply_async(say_hello, args=('Jeff',))
        hi_john = process_pool.apply_async(say_hello, args=('John',))
        print(hi_jeff.get())
        print(hi_john.get())

虽然上面的示例可以提升多任务效率,但是还是有个小问题,那就是如果 hi_jeff执行花费 10s 而 hi_john执行花费 1 s,那么 get方法会阻塞 10s 然后几乎同一时间返回两个结果。

没事,我们有办法解决这个问题,那就是进程池执行器。

进程池执行器

Python 的 concurrent.futures 里封装了进程池执行器和线程池执行器,让我们先学习进程池执行器。

简单应用

import time
from concurrent.futures import ProcessPoolExecutor


def count(count_to: int) -> int:
    start = time.time()
    counter = 0
    while counter < count_to:
        counter = counter + 1
    end = time.time()
    print(f'Finished counting to {count_to} in {end - start}')
    return counter


if __name__ == "__main__":
    with ProcessPoolExecutor() as process_pool:
        numbers = [1, 3, 5, 22, 100000000]
        for result in process_pool.map(count, numbers):
            print(result)

实例化 ProcessPoolExecutor对象会默认创建与系统相同核心数量的进程,map 方法类似 asyncio.as_completed方法

执行结果

Finished counting to 1 in 1.1920928955078125e-06
Finished counting to 3 in 9.5367431640625e-07
Finished counting to 5 in 7.152557373046875e-07
Finished counting to 22 in 1.1920928955078125e-06
1
3
5
22
Finished counting to 100000000 in 3.007368803024292
100000000

缺点是,它还是按顺序返回结果的,也就是说,如果高用时任务在前面,后面的任务返回结果还是阻塞的,在高用时任务返回后才会一起返回。

带异步事件循环的执行器

import asyncio
from asyncio.events import AbstractEventLoop
from concurrent.futures import ProcessPoolExecutor
from functools import partial
from typing import List


def countdown(count_from: int) -> int:
    counter = 0
    while counter < count_from:
        counter = counter + 1
    return counter


async def main():
    with ProcessPoolExecutor() as process_pool:
        loop: AbstractEventLoop = asyncio.get_running_loop()
        nums = [1, 3, 5, 22, 100000000]
        calls: List[partial[int]] = [partial(countdown, num) for num in nums]
        call_coros = []

        for call in calls:
            call_coros.append(loop.run_in_executor(process_pool, call))

        results = await asyncio.gather(*call_coros)

        for result in results:
            print(result)


if __name__ == "__main__":
    asyncio.run(main())

run_in_executor接收一个可调用对象,并且不能带参数。然后返回一个 awaitable 对象,可以被 asyncio 接管使用。例如上面的 gather 方法。

多进程中的锁

共享数据

在多进程编程中,每个进程通常有自己的内存空间,普通变量不能在进程之间共享。如果你想让多个进程共享某个变量(比如一个计数器、布尔值、状态标志等),就可以用 Value 来实现。

竞态条件

当一组操作的结果取决于哪个操作先完成时,就会出现竞态条件,例如

from multiprocessing import Process, Value

def increment_value(shared_value):
    # Increment the shared value by 1
    shared_value.value = shared_value.value + 1

if __name__ == "__main__":
    for _ in range(100):
        integer = Value('i', 0)
        procs = [
            Process(target=increment_value, args=(integer,)),
            Process(target=increment_value, args=(integer,))
        ]

        [p.start() for p in procs]
        [p.join() for p in procs]
        print(integer.value)
        print(integer.value == 2)

预期的结果是 2,但是最终的结果并不总是 2,这种就是竞态条件。那么为什么不总是 2 呢?原因是,进程 1 和进程 2,读取 value 值的时间是不一定的,可能 value 已经完成了+1 那么读取就是 1,如果两个进程读取 value 都是 0,那么最终结果就是 1。为了解决这个问题,我们需要加锁

使用锁进行同步

为了避免进程间的竞态条件,我们需要在进程间加锁,也就是说,当一个进程先读取到 value 的值的时候,加锁,这样其他的进程就会等待锁释放才能读取到 value 的值,避免了静态条件。

from multiprocessing import Process, Value

def increment_value(shared_value):
    # Increment the shared value by 1
    with shared_value.get_lock():  # Ensure thread-safe access
        shared_value.value += 1

if __name__ == "__main__":
    for _ in range(100):
        integer = Value('i', 0)
        procs = [
            Process(target=increment_value, args=(integer,)),
            Process(target=increment_value, args=(integer,))
        ]

        [p.start() for p in procs]
        [p.join() for p in procs]
        print(integer.value)
        print(integer.value == 2)

这可以修复净态条件,但可能降低应用程序性能。

进程池共享数据

from concurrent.futures import ProcessPoolExecutor
import asyncio
from multiprocessing import Value

shared_counter: Value

def init(counter: Value):
    """Initialize the shared counter."""
    global shared_counter
    shared_counter = counter

def increment_value():
    # Increment the shared value by 1
    with shared_counter.get_lock():  # Ensure thread-safe access
        shared_counter.value += 1

async def main():
    counter = Value('i', 0)  # Shared counter initialized to 0
    with ProcessPoolExecutor(initializer=init, initargs=(counter,)) as pool:
        await asyncio.get_running_loop().run_in_executor(pool, increment_value)
        print(counter.value)

我们不能直接在 init 函数中初始化 shared_value 的值为 Value(‘i’, 0) 那样会创建多个线程池,同时初始化多个 shared_value 值为 0 导致错误结果。

多进程,多时间循环

虽然 multiprocessing 模块主要用来解决 CPU 密集型任务,但是也可以为 IO 密集型任务带来性能提升,因为一些 IO 密集型任务中也存在大量的 CPU 运算。

感兴趣的可自行 google 或者 chatGPT 和 Deepseek😅

或者看《Python asyncio 并发编程》这本书的第 6 章节。