Multiprocessing Python

Содержание
Введение
Пример
Аргументы
Цикл
Более сложный пример
Доступ к переменным из процессов
Похожие статьи

Введение

В этой статье вы узнаете как запускать несколько процессов одновременно.

Официальная документация

multiprocessing - это пакет, который поддерживает запуск процессов с использованием API, аналогичного модулю threading. Пакет multiprocessing обеспечивает как локальный, так и удаленный параллелизм, эффективно обходя глобальную блокировку интерпретатора за счет использования подпроцессов вместо потоков. Благодаря этому модуль многопроцессорности позволяет программисту в полной мере использовать несколько процессоров на данном компьютере. Он работает как на POSIX, так и на Windows. Модуль многопроцессорной обработки также предоставляет API, не имеющие аналогов в модуле потоковой обработки. Ярким примером этого является объект Pool, который предлагает удобное средство распараллеливания выполнения функции с несколькими входными значениями, распределяя входные данные по процессам (параллелизм данных). Следующий пример демонстрирует общепринятую практику определения таких функций в модуле, чтобы дочерние процессы могли успешно импортировать этот модуль. Это базовый пример параллелизма данных с использованием Pool,

Простой пример

Рассмотрим функцию do_something()

def do_something(): print('Sleeping 1 second...') time.sleep(1) print('Done Sleeping...')

Допустим, нужно выполнить её два раза подряд. Сколько на это уйдёт времени?

Зависит от того как вызывать функцию. Если последовательно, то около двух секунд. Проверим.

import time def main(): start = time.perf_counter() do_something() do_something() finish = time.perf_counter() print(f'Finished in {round(finish-start, 2)} second(s)') def do_something(): print('Sleeping 1 second...') time.sleep(1) print('Done Sleeping...') if __name__ == "__main__": main()

python conseq.py

Sleeping 1 second... Done Sleeping... Sleeping 1 second... Done Sleeping... Finished in 2.0 second(s)

Действительно, ушло две секунды. Сократить это время поможет multiprocessing

import multiprocessing import time def main(): start = time.perf_counter() p1 = multiprocessing.Process(target=do_something) p2 = multiprocessing.Process(target=do_something) p1.start() p2.start() p1.join() p2.join() finish = time.perf_counter() print(f'Finished in {round(finish-start, 2)} second(s)') def do_something(): print('Sleeping 1 second...') time.sleep(1) print('Done Sleeping...') if __name__ == "__main__": main()

python multip.py

Sleeping 1 second... Sleeping 1 second... Done Sleeping... Done Sleeping... Finished in 1.01 second(s)

Потребовалось всего 1.01 секунда

Аргументы

Разберёмся как передавать аргументы. Пусть теперь функция do_something() спит не по одной секунде, а получает время сна как аргумент.

Сперва пример без мультипроцесса

import time def main(): start = time.perf_counter() a = do_something(3) print(a) b = do_something(2) print(b) finish = time.perf_counter() print(f'Finished in {round(finish-start, 2)} second(s)') def do_something(sleep_time=0) -> str: print(f'Sleeping {sleep_time} second...') time.sleep(sleep_time) print('Done Sleeping...') return f"Slept {sleep_time} second(s)" if __name__ == "__main__": main()

Sleeping 3 second... Done Sleeping... Slept 3 second(s) Sleeping 2 second... Done Sleeping... Slept 2 second(s) Finished in 5.0 second(s)

Передать аргументы в процесс можно с помощью args=(arg1, arg2…)

Обратите внимание на то что это кортеж - если аргумент один, он передаётся как (arg1,)

import multiprocessing import time def do_something(sleep_time=0) -> str: print(f'Sleeping {sleep_time} second(s)...') time.sleep(sleep_time) print(f'Done Sleeping {sleep_time} seconds(s)...') return f"Slept {sleep_time} second(s)" def main(): start = time.perf_counter() p1 = multiprocessing.Process(target=do_something, args=(3,)) p2 = multiprocessing.Process(target=do_something, args=(2,)) p1.start() p2.start() p1.join() p2.join() finish = time.perf_counter() print(f'Finished in {round(finish-start, 2)} second(s)') if __name__ == "__main__": main()

Sleeping 3 second(s)... Sleeping 2 second(s)... Done Sleeping 2 seconds(s)... Done Sleeping 3 seconds(s)... Finished in 3.1 second(s)

Цикл

Когда нужно запустить большое количество процессов одновременно, это можно сделать с помощью цикла. Например - for … in

В таргет можно передавать аргументы . Изменим функцию do_something() чтобы она принимала время и текст

import multiprocessing import time def main(): NUMBER_OF_TASKS = 10 p_list = [] start = time.perf_counter() for _ in range(0, NUMBER_OF_TASKS): sleep = 1 action = "Relaxing" p = multiprocessing.Process(target=do_something, args=(sleep, action)) p.start() p_list.append(p) for process_ in p_list: process_.join() finish = time.perf_counter() print(f'Finished in {round(finish-start, 2)} second(s)') def do_something(sleep: int, action: str) -> None: print(f'{action} {sleep} second...') time.sleep(sleep) print('Done Sleeping...')

python multip.py

Relaxing 1 second... Relaxing 1 second... Relaxing 1 second... Relaxing 1 second... Relaxing 1 second... Relaxing 1 second... Relaxing 1 second... Relaxing 1 second... Relaxing 1 second... Relaxing 1 second... Done Sleeping... Done Sleeping... Done Sleeping... Done Sleeping... Done Sleeping... Done Sleeping... Done Sleeping... Done Sleeping... Done Sleeping... Done Sleeping... Finished in 1.01 second(s)

На десять вызовов подряд потребовалось бы 10 секунд, а с multiprocessing по-прежнему хватает одной с небольшим.

Приближенный к жизни пример

import requests import multiprocessing import datetime import sys import urllib3 import time import subprocess urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) NUMBER_OF_TASKS = int(sys.argv[1]) p_list = [] def main(): main_start_time = datetime.datetime.utcnow() for _ in range(0, NUMBER_OF_TASKS): p = multiprocessing.Process(target=connect_to_url) p.start() p_list.append(p) for process_ in p_list: process_.join() main_finish_time = datetime.datetime.utcnow() tdelta = main_finish_time - main_start_time finish = time.perf_counter() with open("performance_data.txt", "a") as pf: pf.write( f"Started: {main_start_time}; " f"Nprocesses: {NUMBER_OF_TASKS }; " f"Elapsed time: {tdelta}; " f"Nerrors: {ERR_COUNT} \r\n" ) def connect_to_url(): url = 'https://192.168.0.2/authentication/api/oauth2/token' auth = ('secret_name','secret_code') payload = { 'grant_type': 'password', 'username': 'secret_username', 'password': 'secret_password', } r = requests.post(url, auth=auth, data=payload, verify=False) _status_code = r.status_code if _status_code != 200: print(_status_code) err_log_name = main_start_time + "_error_log.txt" with open("./logs/{err_log_name}", "a") as pf: pf.write(f"Error: {_status_code} \r\n") if __name__ == '__main__': main()

Возвращаемые значения

Доступ к переменным из каждого процесса можно организовать с помощью общей переменной.

В следующем примере будет создан список return_list в который смогут писать все процессы. Это делается с помощью multiprocessing.Manager()

Вместо использования return функция будет дописывать в этот список.

import multiprocessing import time def do_something(sleep_time, return_list): print(f'Sleeping {sleep_time} second...') time.sleep(sleep_time) print('Done Sleeping...') # return f"Slept {sleep_time} second(s)" return_list.append(f"Slept {sleep_time} second(s)") def main(): manager = multiprocessing.Manager() return_list = manager.list() start = time.perf_counter() p1 = multiprocessing.Process(target=do_something, args=(3, return_list)) p2 = multiprocessing.Process(target=do_something, args=(2, return_list)) p1.start() p2.start() p1.join() p2.join() finish = time.perf_counter() print(f'Finished in {round(finish-start, 2)} second(s)') print(return_list) print(return_list[0]) print(return_list[1]) if __name__ == "__main__": main()

Sleeping 3 second... Sleeping 2 second... Done Sleeping... Done Sleeping... Finished in 3.1 second(s) ['Slept 2 second(s)', 'Slept 3 second(s)'] Slept 2 second(s) Slept 3 second(s)

Общая переменная не обязана быть списком. Для доступа к элементам по ключу можно создать словарь , в этом случае потребуется ввести дополнительную переменную - ключ (proc_id)

import multiprocessing import time def do_something(proc_id, sleep_time, return_dict): time.sleep(sleep_time) return_dict[proc_id] = f"Process '{proc_id}' slept for {sleep_time} second(s)" def main(): manager = multiprocessing.Manager() return_dict = manager.dict() start = time.perf_counter() p1 = multiprocessing.Process(target=do_something, args=("first", 3, return_dict)) p2 = multiprocessing.Process(target=do_something, args=("middle", 1, return_dict)) p3 = multiprocessing.Process(target=do_something, args=("last", 2, return_dict)) p1.start() p2.start() p3.start() p1.join() p2.join() p3.join() finish = time.perf_counter() print(f'Finished in {round(finish-start, 2)} second(s)') print(return_dict.values()) print(return_dict["middle"]) if __name__ == "__main__": main()

Finished in 3.1 second(s) ["Process 'middle' slept for 1 second(s)", "Process 'last' slept for 2 second(s)", "Process 'first' slept for 3 second(s)"] Process 'middle' slept for 1 second(s)

Обратите внимание на то, что первая запись в словаре - это запись от процесса middle так как он спал меньше всех.

Вернёмся к примеру с циклом - с помощью random сделаем времена сна не одной секундой а псевдослучайными величинами и соберём их в список

import multiprocessing import time from random import randrange def main(): manager = multiprocessing.Manager() return_list = manager.list() NUMBER_OF_TASKS = 10 p_list = [] start = time.perf_counter() for _ in range(NUMBER_OF_TASKS): sleep = randrange(5) action = "Relaxing" p = multiprocessing.Process(target=do_something, args=(sleep, action, return_list)) p.start() p_list.append(p) for process_ in p_list: process_.join() finish = time.perf_counter() print(f'Finished in {round(finish-start, 2)} second(s)') print(return_list) def do_something(sleep: int, action: str, return_list: list) -> None: print(f'{action} {sleep} second(s)...') time.sleep(sleep) return_list.append(sleep) print(f'Done {action} {sleep} second(s)...') if __name__ == '__main__': main()

Relaxing 0 second(s)... Done Relaxing 0 second(s)... Relaxing 2 second(s)... Relaxing 4 second(s)... Relaxing 0 second(s)... Done Relaxing 0 second(s)... Relaxing 1 second(s)... Relaxing 2 second(s)... Relaxing 1 second(s)... Relaxing 0 second(s)... Done Relaxing 0 second(s)... Relaxing 2 second(s)... Relaxing 1 second(s)... Done Relaxing 1 second(s)... Done Relaxing 1 second(s)... Done Relaxing 1 second(s)... Done Relaxing 2 second(s)... Done Relaxing 2 second(s)... Done Relaxing 2 second(s)... Done Relaxing 4 second(s)... Finished in 4.14 second(s) [0, 0, 0, 1, 1, 1, 2, 2, 2, 4]

Обратите внимание на порядок сообщений и элементов списка.

Похожие статьи
C
C++
Go
Groovy
Java
JavaScript
PHP
Python
Ruby
.NET/C#
Thrift
Теория Программирования
asyncio

Поиск по сайту

Подпишитесь на Telegram канал @aofeed чтобы следить за выходом новых статей и обновлением старых

Перейти на канал

@aofeed

Задать вопрос в Телеграм-группе

@aofeedchat

Контакты и сотрудничество:
Рекомендую наш хостинг beget.ru
Пишите на info@urn.su если Вы:
1. Хотите написать статью для нашего сайта или перевести статью на свой родной язык.
2. Хотите разместить на сайте рекламу, подходящую по тематике.
3. Реклама на моём сайте имеет максимальный уровень цензуры. Если Вы увидели рекламный блок недопустимый для просмотра детьми школьного возраста, вызывающий шок или вводящий в заблуждение - пожалуйста свяжитесь с нами по электронной почте
4. Нашли на сайте ошибку, неточности, баг и т.д. ... .......
5. Статьи можно расшарить в соцсетях, нажав на иконку сети: