Как python встраивать в тестирующую систему executor
Перейти к содержимому

Как python встраивать в тестирующую систему executor

  • автор:

Как запустить executor тг бота в асинхронной функции? Python, aiogram

Мне нужно запустить цикл, я знаю, что есть функция dp.start_polling(), но мне нужно скипать апдейты, которые могут приходить во время отключения бота. Это можно прописать в executor.start_polling(skip_updates=True), но у меня не получается запустить executor в асинхронной функции. Как можно это сделать? Помогите, пожалуйста

Отслеживать
задан 17 июл 2023 в 18:35
95 8 8 бронзовых знаков

1 ответ 1

Сортировка: Сброс на вариант по умолчанию

Можно использовать on_startup параметр в executor.start_polling() для запуска асинхронной функции при старте бота, а затем запустить вашу задачу с помощью метода create_task библиотеки asyncio

async def my_func(): while True: print('Task is running') await asyncio.sleep(1) async def on_startup(dp): asyncio.create_task(my_func()) if __name__ == '__main__': executor.start_polling(dp, skip_updates=True, on_startup=on_startup) 

В этом примере, мы создаем асинхронную функцию my_func() , которая будет выполняться в цикле, и используем on_startup параметр в executor.start_polling() для запуска этой функции при старте бота. Затем мы используем метод create_task библиотеки asyncio для запуска задачи. Надеюсь, это поможет вам запустить асинхронный цикл с пропуском обновлений в aiogram!

Эффективная многопоточность в Python

Хочу поделиться простым рецептом, как можно эффективно выполнять большое число http-запросов и других задач ввода-вывода из обычного Питона. Самое правильное, что можно было бы сделать — использовать асинхронные фреймворки вроде Торнадо или gevent. Но иногда этот вариант не подходит, потому что встроить event loop в уже существующий проект проблематично.

В моем случае уже существовало Django-приложение, из которого примерно раз в месяц нужно было выгрузить немного очень мелких файлов на AWS s3. Шло время, количество файлов стало приближаться к 50 тысячам, и выгружать их по очереди стало утомительным. Как известно, s3 не поддерживает множественное обновление за один PUT-запрос, а установленная опытным путем максимальная скорость запросов с сервера ec2 в том же датацентре не превышает 17 в секунду (что очень не мало, кстати). Таким образом, время обновления для 50 тысяч файлов стало приближаться к одному часу.

Питонисты с детства знают, что от использования потоков (тредов операционной системы) нет никакого толка из-за глобального лока интерпретатора. Но немногие догадываются, что как и любой лок, этот время от времени освобождается. В частности, это происходит при операциях ввода-вывода, в том числе и сетевых. А значит, потоки можно использовать для распараллеливания http-запросов — пока один поток ожидает ответа, другой спокойно обрабатывает результат предыдущего или готовит следующий.

Получается, всего-то нужен пул потоков, который будет выполнять запросы. К счастью, такой пул уже написан. Начиная с версии 3.2 для унификации всей асинхронной работы в Питоне появилась библиотека concurrent.futures . Для второй версии Питона есть бекпорт под именем futures. Код до безобразия прост:

from concurrent.futures import ThreadPoolExecutor with ThreadPoolExecutor(concurrency) as executor: for _ in executor.map(upload, queryset): pass 

Здесь concurrency — число рабочих потоков, upload — функция, выполняющую саму задачу, queryset — итератор объектов, которые по одному будут передаваться в задачу. Уже этот код при concurrency в 150 смог пропихнуть на сервера Амазона ≈450 запросов в секунду.

Тут необходимо замечание относительно задач: они должны быть потокобезопасны. Т.е. несколько паралельно выполняющихся задач не должны иметь общих ресурсов, либо должны ими правильно управлять. Глобальный лок интерпретатора тут плохой помощник — он не гарантирует, что выполнение потока не прервется в самом неподходящем месте. Если вы пользуетесь только urllib3, requests или boto, волноваться не о чем, они уже потокобезопасны. Про другие библиотеки нужно уточнять. Также потоконебезопасным может оказаться ваш собственный код.

Шло время, количество файлов стало приближаться к 200 тысячам. Как думаете, сколько памяти могут занимать 200 тысяч Django-моделей? А 200 тысяч фьючерсов? А 200 тысяч поставленных задач? Все вместе около гигабайта. Стало понятно, что посылать в экзекутор все сразу — не выход. Но почему бы не добавлять новые задачи по окончании предыдущих? В самом начале добавляем количество задач, равное количеству потоков, ведем учет сколько задач поставлено, сколько выполнено. Сами фьючерсы не храним, наружу не отдаем. Получается очень классная функция, которую можно использовать повторно (осторожно, это не окончательный вариант) :

from concurrent.futures import ThreadPoolExecutor, Future def task_queue(task, iterator, concurrency=10): def submit(): try: obj = next(iterator) except StopIteration: return stats['delayed'] += 1 future = executor.submit(task, obj) future.add_done_callback(upload_done) def upload_done(future): submit() stats['delayed'] -= 1 stats['done'] += 1 executor = ThreadPoolExecutor(concurrency) stats = for _ in range(concurrency): submit() return stats 

В ней всего три действия: функция submit , которая выбирает следующий объект из итератора и создает для него задачу, upload_done , которая вызывается по окончании выполнения задачи и ставит следующую, и цикл, в котором ставятся первые задачи. Пробуем запустить:

stats = task_queue(upload, queryset.iterator(), concurrency=5) while True: print '\rdone , in work: '.format(**stats), sys.stdout.flush() if stats['delayed'] == 0: break time.sleep(0.2) 

Отлично, работает! Тут уже используется метод iterator кверисета. Кажется, что его можно было бы использовать и в первом примере с функцией executor.map , но executor.map выбирает сразу весь итератор и делает его бесполезным. Тут же объекты действительно выбираются по одному на каждый работающий поток.

Правда, есть проблема: стоит увеличить кол-во потоков, как начинают сыпаться исключения «ValueError: generator already executing». Код использует один и тот же генератор из всех потоков, поэтому рано или поздно два потока пытаются выбрать значения одновременно (на самом деле это может произойти когда потоков всего два, но с меньшей вероятностью). Это же касается и счетчиков, рано или поздно два процесса одновременно считают одно значение, потом оба прибавят единицу и оба запишут «исходное число + 1», а не «исходное число + 2». Поэтому всю работу с разделяемыми объектами нужно обернуть в локи.

Есть и другие проблемы. Нет обработки ошибок, которые могут произойти во время выполнения задачи. Если прервать выполнение с помощью ctrl+c, в основном потоке будет выброшено исключение, а остальные продолжат выполнение до самого конца, поэтому нужен механизм принудительного завершения очереди. У экзекутора как раз есть метод shutdown для этих целей и можно было бы отдавать экзекутор наружу, чтобы останавливать его, когда пользователь нажимает ctrl+c. Но есть вариант получше: можно создать фьючерс, который будет резолвится по окончании всех работ и подчищать экзекутор, если кто-то извне его отменит. Вот версия, в которой учтены все эти ошибки:

def task_queue(task, iterator, concurrency=10, on_fail=lambda _: None): def submit(): try: obj = next(iterator) except StopIteration: return if result.cancelled(): return stats['delayed'] += 1 future = executor.submit(task, obj) future.obj = obj future.add_done_callback(upload_done) def upload_done(future): with io_lock: submit() stats['delayed'] -= 1 stats['done'] += 1 if future.exception(): on_fail(future.exception(), future.obj) if stats['delayed'] == 0: result.set_result(stats) def cleanup(_): with io_lock: executor.shutdown(wait=False) io_lock = threading.RLock() executor = ThreadPoolExecutor(concurrency) result = Future() result.stats = stats = result.add_done_callback(cleanup) with io_lock: for _ in range(concurrency): submit() return result 

Тут нужно использовать reentrant лок, потому что есть определенная вероятность, что очень короткая задача успеет выполнится до навешивания обработчика в add_done_callback , и тогда обработчик будет выполнен немедленно в том же потоке и попытается еще раз захватить лок. Получится дедлок. Reentrant лок позволит тому же потоку, что захватил его в первый раз, спокойно зайти еще раз, но не даст себя захватить из другого потока, пока первый поток не освободит его столько же раз, сколько захватывал. Немного меняется и код, который использует эту очередь задач:

from concurrent.futures import ThreadPoolExecutor, Future, TimeoutError result = task_queue(upload, queryset.iterator(), concurrency=5) try: while not result.done(): try: result.result(.2) except TimeoutError: pass print '\rdone , in work: '.format(**result.stats), sys.stdout.flush() except KeyboardInterrupt: result.cancel() raise 

Больше не нужно тупо засыпать каждые 200 миллисекунд, можно засыпать по умному, ожидая завершения очереди. А в случае прерывания останавливать очередь.

Смеркалось. Шло время, количество файлов стало приближаться к 1,5 миллионам. Несмотря на то, что все выглядело так, как будто все работает с фиксированным потреблением памяти (кол-во тредов, фьючерсов и Django-моделей на протяжении всего выполнения не должно меняться), потребление памяти все равно росло. Оказалось, что queryset.iterator() работает немного не так, как ожидалось. Объекты действительно создаются только тогда, когда явно выбираются из итератора, а вот сырой ответ базы данных все равно выгребается драйвером сразу. Получается около 500 мегабайт на миллион строк. Решение этой проблемы довольно очевидно: нужно делать запросы не на все объекты сразу, а разделять порции. При этом следует избегать выборки со смещением, потому что запрос вида LIMIT 100 OFFSET 200000 на самом деле означает, что СУБД нужно пробежаться по 200100 записям. Вместо смещения следует использовать выборку по полю с индексом.

def real_queryset_iterator(qs, pk='pk', chunk_size=5000): qs = qs.order_by(pk) chunk = list(qs[:chunk_size]) while chunk: for item in chunk: yield item last_pk = getattr(chunk[-1], pk) chunk = list(qs.filter(**)[:chunk_size]) 

Здесь pk — скорее pagination key, нежели primary. Впрочем, зачастую primary хорошо подходит на эту роль. Такой итератор действительно расходует фиксированное количество памяти и работает не медленнее выборки за один раз. Но если увеличить кол-во потоков, возникает еще одна проблема. В Джанге соединения с базой данных являются локальными для потоков, поэтому, когда очередной поток делает запрос, создается новое соединение. Рано или поздно количество соединений доходит до критического числа и возникает исключение, подобное этому:

OperationalError: FATAL: remaining connection slots are reserved for non-replication superuser connections 

Правильным решением было бы использовать для всех потоков одно и то же соединение, т.к. мы уже ограничили возможность одновременно делать запросы из разных потоков. Стандартных средств для этого в Джанге нет, но это можно сделать с помощью хака, заменив объект threading.local на обычный объект:

from django.db import connections, DEFAULT_DB_ALIAS connections._connections = type('empty', (object,), <>)() connections[DEFAULT_DB_ALIAS].allow_thread_sharing = True 

Но надо понимать, что это убъет потокобезопасность базы данных во всем остальном приложении, поэтому такой вариант годится только для команд, запускаемых из консоли. Более гуманный вариант — закрывать соединение после каждого запроса, или после каждого элемента, что дает не сильно большой оверхэд.

def close_connection_iterator(iterator, db=DEFAULT_DB_ALIAS): for item in iterator: connections[db].close() yield item result = task_queue( upload, close_connection_iterator(real_queryset_iterator(queryset)), concurrency=150 ) 

Есть и третье решение: использовать отдельный поток, который будет общаться с базой данных, отдавая объекты в остальные потоки. Этот вариант ничего не ломает в остальном приложении и не привносит накладных расходов на постоянное переоткрытие соединений. Но его реализация довольно сложна и тянет не меньше чем на отдельную статью.

Возможно, пройдет еще время, кол-во файлов возрастет до 10 миллионов и появятся новые проблемы. Но пока кажется, что основная проблема будет в том, что такое обновление займет около восьми часов и будет стоит $50 только за PUT-запросы по текущим ценам Амазона.

  1. Потоки для ввода-вывода на Питоне работают хорошо, но надо позаботиться об изоляции.
  2. Запускать десятки и сотни тысяч задач нужно очень аккуратно, следя за потреблением памяти.
  3. queryset.iterator() в Джанговской ORM работает не совсем так, как ожидается.

concurrent.futures в Python

Привет, Хабр! Сегодня мы взглянем на одну из самых интересных библиотек в Python для работы с параллельным выполнением задач — concurrent.futures.

Каждый разработчик сталкивается с ситуациями, когда необходимо выполнять задачи параллельно. Это может быть I/O-операции, которые блокируют основной поток, или вычисления, требующие большого объема процессорных ресурсов. Здесь на помощь приходит concurrent.futures — модуль, предоставляющий высокоуровневый интерфейс для асинхронного и параллельного выполнения задач.

Какие преимущества предоставляет этот модуль?

  1. Простота использования: Concurrent.futures предоставляет простой и интуитивно понятный API для запуска задач параллельно. Это позволяет сосредоточиться на решении задачи, а не на деталях многозадачности.
  2. Автоматическое масштабирование: Модуль позволяет легко масштабировать задачи, выполняемые в пулах потоков (ThreadPoolExecutor) и пулах процессов (ProcessPoolExecutor). Вы можете использовать их в зависимости от характера задачи и доступных ресурсов.
  3. Удобная обработка результатов: Concurrent.futures предоставляет Future объекты, которые позволяют отслеживать выполнение задач и получать результаты, когда они готовы.
  4. Отсутствие необходимости заботиться о GIL: В отличие от многих других способов параллельного выполнения в Python, concurrent.futures позволяет избежать проблем, связанных с Global Interpreter Lock (GIL), что делает его отличным выбором для многозадачных приложений.

Основы concurrent.futures

Асинхронность и параллелизм — два важных понятия, которые позволяют нам оптимизировать выполнение задач в наших программах:

  1. Асинхронность: Этот подход позволяет программе продолжать выполнение других задач, не блокируя основной поток выполнения. Он часто используется для обработки I/O-операций, которые могут занимать значительное время, например, чтение данных из сети или файловой системы. Асинхронный код обычно использует событийную модель и обратные вызовы (callbacks) для уведомления о завершении операции.
  2. Параллелизм: В то время как асинхронность позволяет выполнять задачи, не блокируя основной поток, параллелизм предоставляет возможность выполнения нескольких задач одновременно. При этом каждая задача может выполняться в собственном потоке или процессе, в зависимости от используемого механизма.

Инструменты для работы с concurrent.futures:

  1. ThreadPoolExecutor: ThreadPoolExecutor — это класс, предоставляющий пул потоков для выполнения задач. Этот инструмент особенно полезен, когда задачи связаны с I/O-операциями, которые блокируют поток выполнения. Пример использования:
from concurrent.futures import ThreadPoolExecutor def task_function(param): # Реализация задачи return result # Создание ThreadPoolExecutor с 4 рабочими потоками with ThreadPoolExecutor(max_workers=4) as executor: # Запуск задачи асинхронно future = executor.submit(task_function, param) # Ожидание результата и получение его result = future.result() 
  1. ProcessPoolExecutor: ProcessPoolExecutor предоставляет пул процессов для выполнения задач. Этот инструмент полезен, когда задачи могут выполняться независимо и имеют высокую вычислительную нагрузку:
from concurrent.futures import ProcessPoolExecutor def task_function(param): # Реализация задачи return result # Создание ProcessPoolExecutor с 4 рабочими процессами with ProcessPoolExecutor(max_workers=4) as executor: # Запуск задачи параллельно future = executor.submit(task_function, param) # Ожидание результата и получение его result = future.result() 

Для создания экземпляра ThreadPoolExecutor или ProcessPoolExecutor, вы можете установить максимальное количество рабочих потоков или процессов с помощью параметра max_workers . Это позволяет вам настроить параллельное выполнение под ваше конкретное приложение:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor # Создание ThreadPoolExecutor с 8 рабочими потоками thread_executor = ThreadPoolExecutor(max_workers=8) # Создание ProcessPoolExecutor с 4 рабочими процессами process_executor = ProcessPoolExecutor(max_workers=4) 

Future объекты представляют асинхронные задачи, которые были отправлены на выполнение в пуле потоков или процессов. Они предоставляют удобный способ отслеживания статуса и получения результатов задачи:

from concurrent.futures import ThreadPoolExecutor def task_function(param): # Реализация задачи return result with ThreadPoolExecutor(max_workers=4) as executor: future = executor.submit(task_function, param) # Ожидание результата result = future.result() 

Future объекты также позволяют обработать ошибки, которые могли возникнуть при выполнении задачи:

try: result = future.result() except Exception as e: print(f"Произошла ошибка: ") 

Задачи и Пулы потоков (Thread Pools)

Задачи — это базовые строительные блоки параллельного выполнения. Они могут быть представлены в виде функций, которые выполняют какую-либо работу. Например, задача может быть функцией, которая загружает данные из сети, анализирует текст или обрабатывает изображение:

def load_data_from_server(): # Загрузка данных из сети pass def process_text(data): # Анализ и обработка текста pass def process_image(image): # Обработка изображения pass 

ThreadPoolExecutor предоставляет пул потоков для выполнения задач. Он работает следующим образом: вы создаете экземпляр ThreadPoolExecutor, указывая количество рабочих потоков, и отправляете задачи на выполнение в пуле.

from concurrent.futures import ThreadPoolExecutor # Создание ThreadPoolExecutor с 4 рабочими потоками with ThreadPoolExecutor(max_workers=4) as executor: # Запуск задач асинхронно future1 = executor.submit(load_data_from_server) future2 = executor.submit(process_text, data) future3 = executor.submit(process_image, image) # Ожидание результата и получение его result1 = future1.result() result2 = future2.result() result3 = future3.result() 

ThreadPoolExecutor автоматически управляет пулом потоков и распределяет задачи между рабочими потоками, что делает его удобным инструментом для асинхронного выполнения задач.

ThreadPoolExecutor предоставляет методы для управления пулом потоков. Например, метод shutdown() позволяет корректно завершить работу пула после завершения всех задач:

from concurrent.futures import ThreadPoolExecutor with ThreadPoolExecutor(max_workers=4) as executor: # Запуск задач . # После завершения всех задач, пул потоков автоматически закроется 

Определение правильного количества рабочих потоков чрезвычайно важно. Слишком много потоков может привести к избыточным накладным расходам, а слишком мало — к неэффективности параллельного выполнения. ThreadPoolExecutor позволяет легко управлять этим параметром с помощью max_workers .

from concurrent.futures import ThreadPoolExecutor # Создание ThreadPoolExecutor с динамическим управлением рабочими потоками with ThreadPoolExecutor(max_workers=None) as executor: # Он автоматически адаптирует количество потоков к нагрузке . 

Рассмотрим различные сценарии использования ThreadPoolExecutor: параллельная загрузка данных из сети, обработка больших объемов текста и многозадачная обработка изображений:

Параллельная загрузка данных из сети

from concurrent.futures import ThreadPoolExecutor import requests def fetch_url(url): response = requests.get(url) return response.content urls = ["http://example.com", "http://example.org", "http://example.net"] with ThreadPoolExecutor(max_workers=4) as executor: results = list(executor.map(fetch_url, urls)) 

Обработка больших объемов текста

from concurrent.futures import ThreadPoolExecutor def process_text(text): # Реализация обработки текста pass text_data = [. ] with ThreadPoolExecutor(max_workers=4) as executor: results = list(executor.map(process_text, text_data)) 

Многозадачная обработка изображений

from concurrent.futures import ThreadPoolExecutor from PIL import Image def process_image(image_path): image = Image.open(image_path) # Реализация обработки изображения pass image_paths = [. ] with ThreadPoolExecutor(max_workers=4) as executor: results = list(executor.map(process_image, image_paths)) 

Многозадачность и Пулы процессов (Process Pools)

ProcessPoolExecutor предоставляет пул процессов, что позволяет выполнить задачи параллельно в отдельных процессах. Это особенно полезно, когда у вас есть задачи, которые могут выполняться независимо друг от друга, и вы хотите максимально использовать ресурсы многопроцессорных систем.

Пример использования ProcessPoolExecutor:

from concurrent.futures import ProcessPoolExecutor def task_function(param): # Реализация задачи return result # Создание ProcessPoolExecutor с 4 рабочими процессами with ProcessPoolExecutor(max_workers=4) as executor: # Запуск задачи параллельно future = executor.submit(task_function, param) # Ожидание результата и получение его result = future.result() 

В этом примере мы создаем пул процессов с помощью ProcessPoolExecutor, отправляем задачу на выполнение и затем ожидаем ее завершения. ProcessPoolExecutor автоматически управляет процессами, что делает его мощным инструментом для параллельного выполнения задач.

Различия между ThreadPoolExecutor и ProcessPoolExecutor:

  1. Потоки vs. Процессы: ThreadPoolExecutor использует потоки, работающие в пределах одного процесса, в то время как ProcessPoolExecutor использует отдельные процессы для каждой задачи. Это позволяет ProcessPoolExecutor избегать проблем с Global Interpreter Lock (GIL), которые могут возникнуть в ThreadPoolExecutor.
  2. Ресурсы и производительность: ThreadPoolExecutor обычно требует меньше системных ресурсов, так как потоки делят один и тот же адресное пространство процесса. Однако ProcessPoolExecutor может обеспечить более высокую производительность в случае многозадачных вычислений, так как каждая задача выполняется в отдельном процессе.
  3. Сериализация данных: В ProcessPoolExecutor данные, передаваемые между процессами, должны быть сериализованы и десериализованы. Это может потребовать дополнительных усилий и влиять на производительность. В ThreadPoolExecutor такой необходимости нет.

Передача данных между процессами в ProcessPoolExecutor требует сериализации и десериализации объектов. Это означает, что данные, передаваемые в задачу и возвращаемые из нее, должны быть сериализуемыми.

Пример передачи данных между процессами:

from concurrent.futures import ProcessPoolExecutor def process_data(data): # Реализация обработки данных pass data_list = [. ] with ProcessPoolExecutor(max_workers=4) as executor: results = list(executor.map(process_data, data_list)) 

Здесь data_list передается в задачу, и результаты обработки данных возвращаются в основной процесс.

Рассмотрим пару примеров применения вышесказанного:

Вычисление суммы квадратов чисел в параллельных процессах

from concurrent.futures import ProcessPoolExecutor def square_and_sum(numbers): return sum(x ** 2 for x in numbers) numbers_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] with ProcessPoolExecutor(max_workers=4) as executor: results = list(executor.map(square_and_sum, [numbers_list] * 4)) 

Обработка изображений в параллельных процессах

from concurrent.futures import ProcessPoolExecutor from PIL import Image def process_image(image_path): image = Image.open(image_path) # Реализация обработки изображения pass image_paths = [. ] with ProcessPoolExecutor(max_workers=4) as executor: results = list(executor.map(process_image, image_paths)) 

Параллельное вычисление матричного произведения

from concurrent.futures import ProcessPoolExecutor import numpy as np def matrix_multiply(matrices): A, B = matrices result = np.dot(A, B) return result matrix_A = np.random.rand(100, 100) matrix_B = np.random.rand(100, 100) with ProcessPoolExecutor(max_workers=4) as executor: results = list(executor.map(matrix_multiply, [(matrix_A, matrix_B)] * 4)) 

Очереди и многозадачность

Очереди позволяют распределять задачи между разными рабочими процессами или потоками, а многозадачность обеспечивает эффективное выполнение этих задач.

Основные преимущества применения очередей и многозадачности:

  1. Увеличение производительности: Многозадачность позволяет распараллеливать выполнение задач, что может значительно повысить производительность вашего приложения.
  2. Управление ресурсами: Вы можете эффективно управлять ресурсами вашей системы, распределяя задачи на выполнение в соответствии с их приоритетом и доступными ресурсами.
  3. Обработка больших объемов данных: Очереди и многозадачность идеально подходят для обработки больших объемов данных, так как они позволяют эффективно разделить задачи между несколькими потоками или процессами.

Producer-consumer паттерн — это популярный способ реализации очередей и многозадачности. В этом паттерне задачи генерируются «производителями» и помещаются в очередь, откуда их забирают «потребители» для выполнения. Concurrent.futures предоставляет удобные средства для реализации этого паттерна.

Рассмотрим три примера кода, демонстрирующих применение producer-consumer паттерна с использованием concurrent.futures:

Многозадачная загрузка изображений из сети

from concurrent.futures import ThreadPoolExecutor import requests def download_image(url): response = requests.get(url) return response.content image_urls = ["url1.jpg", "url2.jpg", "url3.jpg"] with ThreadPoolExecutor(max_workers=4) as executor: image_futures = for future in concurrent.futures.as_completed(image_futures): url = image_futures[future] image_data = future.result() # Обработка загруженного изображения 

В этом примере, задачи «производителями» являются запросы на загрузку изображений, а задачи «потребителями» — обработка полученных данных.

Обработка данных с использованием многозадачности

from concurrent.futures import ProcessPoolExecutor data_to_process = [. ] def process_data(data): # Обработка данных pass with ProcessPoolExecutor(max_workers=4) as executor: results = list(executor.map(process_data, data_to_process)) 

В этом примере, «производителем» является процесс, создающий данные для обработки, а «потребителем» — процессы, выполняющие фактическую обработку.

Параллельное выполнение вычислений

from concurrent.f utures import ThreadPoolExecutor def perform_computation(data): # Вычисления pass computation_data = [. ] with ThreadPoolExecutor(max_workers=4) as executor: results = list(executor.map(perform_computation, computation_data)) 

В этом примере, «производителем» могут быть данные, которые требуют вычислений, а «потребителем» — потоки, выполняющие эти вычисления.

Отслеживание выполнения и обработка результатов

Future объекты предоставляют способ отслеживать выполнение асинхронных задач и получать их результаты, позволяют вам мониторить статус и результаты задач.

Пример использования Future объектов:

from concurrent.futures import ThreadPoolExecutor def task_function(param): # Реализация задачи return result with ThreadPoolExecutor(max_workers=4) as executor: future = executor.submit(task_function, param) # Отслеживание выполнения задачи if future.done(): result = future.result() else: # Задача все еще выполняется 

Мы создаем Future объект, представляющий задачу, и затем отслеживаем его выполнение с помощью методов done() и result() . Мы можем узнать, завершилась ли задача, и получить ее результат, когда она завершится.

При разработке приложений важно обрабатывать исключения и ошибки, которые могут возникнуть во время выполнения задач. Future объекты предоставляют механизм для обработки исключений, возникающих в задачах:

from concurrent.futures import ThreadPoolExecutor def task_function(param): try: # Реализация задачи result = perform_task(param) except Exception as e: # Обработка исключения result = handle_exception(e) return result with ThreadPoolExecutor(max_workers=4) as executor: future = executor.submit(task_function, param) # Обработка исключения, если оно произошло try: result = future.result() except Exception as e: handle_exception(e) 

В этом примере, мы внутри задачи «ловим» исключение, если оно возникает, и затем обрабатываем его. Затем, при получении результата с Future объекта, мы также обрабатываем исключение, если оно было выброшено внутри задачи.

В некоторых сценариях вам может потребоваться собрать и агрегировать результаты выполнения нескольких задач. Concurrent.futures предоставляет удобные способы сделать это:

from concurrent.futures import ThreadPoolExecutor def task_function(param): # Реализация задачи return result params = [param1, param2, param3] with ThreadPoolExecutor(max_workers=4) as executor: futures = [executor.submit(task_function, param) for param in params] # Сбор результатов выполнения задач results = [future.result() for future in futures] # Агрегация результатов aggregated_result = aggregate_results(results) 

В этом примере мы создаем Future объекты для каждой задачи и затем собираем результаты выполнения задач в список. Затем эти результаты могут быть агрегированы по вашим потребностям.

Использование concurrent.futures позволяет создавать быстрые и эффективные приложения, эффективно управлять задачами и избегать проблем с Global Interpreter Lock (GIL).

Статья подготовлена в преддверии старта специализации Системный аналитик. Узнать подробнее о специализации можно тут.

Использование ThreadPoolExecutor в Python 3

Использование ThreadPoolExecutor в Python 3

Потоки в Python представляют собой форму параллельного программирования, позволяющую программе выполнять несколько процедур одновременно. Параллелизм в Python также можно реализовать посредством использования нескольких процессов, однако потоки особенно хорошо подходят для ускорения приложений, использующих существенные объемы ввода/вывода.

Например, операции ввода-вывода включают отправку веб-запросов и чтение данных из файлов. В отличие от операций ввода вывода, операции процессора (например, математические операции со стандартной библиотекой Python) не становятся намного эффективнее при использовании потоков Python.

В состав Python 3 входит утилита ThreadPoolExecutor для выполнения кода в потоке.

В этом обучающем модуле мы используем ThreadPoolExecutor для ускоренной отправки сетевых запросов. Мы определим функцию, хорошо подходящую для вызова в потоках, используем ThreadPoolExecutor для выполнения этой функции и обработаем результаты выполнения.

В этом обучающем модуле мы будем составлять сетевые запросы для проверки существования страниц на портале Wikipedia.

Примечание. Тот факт, что операции ввода-вывода получают больше выгод от потоков, чем операции процессора, связан с использованием в Python глобальной блокировки интерпретатора, которая позволяет только одному потоку сохранять контроль над интерпретатором Python. Если хотите, вы можете узнать больше о глобальном блокировке интерпретатора Python в официальной документации по Python.

Предварительные требования

Для наиболее эффективного прохождения этого обучающего модуля требуется знакомство с программированием на Python и локальной средой программирования Python с requests .

Необходимую информацию можно получить, пройдя следующие обучающие модули:

  • Программирование на Python 3
  • Установка Python 3 и настройка среды программирования в Ubuntu 18.04
  • Чтобы установить пакет requests в локальную среду программирования Python, запустите следующую команду:

Шаг 1 — Определение функции для выполнения в потоках

Для начала определим функцию, которую мы хотим выполнить с помощью потоков.

Откройте этот файл, используя nano или предпочитаемый текстовый редактор или среду разработки:

Для этого обучающего модуля мы напишем функцию, проверяющую существование страницы на портале Wikipedia:

wiki_page_function.py

import requests def get_wiki_page_existence(wiki_page_url, timeout=10): response = requests.get(url=wiki_page_url, timeout=timeout) page_status = "unknown" if response.status_code == 200: page_status = "exists" elif response.status_code == 404: page_status = "does not exist" return wiki_page_url + " - " + page_status 

Функция get_wiki_page_existence принимает два аргумента: URL страницы Wikipedia ( wiki_page_url ) и timeout — количество секунд ожидания ответа от этого URL.

get_wiki_page_existence использует пакет requests для отправки веб-запроса на этот URL. В зависимости от кода состояния ответа HTTP функция возвращает строку, описывающую наличие или отсутствие страницы. Разные коды состояния соответствуют разным результатам выполнения запроса HTTP. Эта процедура предполагает, что код состояния 200 (успех) означает, что страница Wikipedia существует, а код состояния 404 (не найдено) означает, что страница Wikipedia не существует.

Как указывалось в разделе «Предварительные требования», для запуска этой функции должен быть установлен пакет requests .

Попробуем запустить функцию, добавив url и вызов функции после функции get_wiki_page_existence :

wiki_page_function.py

. . . url = "https://en.wikipedia.org/wiki/Ocean" print(get_wiki_page_existence(wiki_page_url=url)) 

После добавления кода сохраните и закройте файл.

Если мы запустим этот код:

Результат будет выглядеть примерно следующим образом:

Output
https://en.wikipedia.org/wiki/Ocean - exists

Вызов функции get_wiki_page_existence для существующей страницы Wikipedia возвращает строку, подтверждающую фактическое существование страницы.

Предупреждение. Обычно небезопасно делать объекты или состояния Python доступными для всех потоков, не приняв особых мер для предотвращения ошибок параллельной обработки. При определении функции для выполнения в потоке лучше всего определить функцию, которая выполняет одну задачу и не делится своим состоянием с другими потоками. get_wiki_page_existence — хороший пример такой функции.

Шаг 2 — Использование ThreadPoolExecutor для выполнения функции в потоках

Теперь у нас есть функция, подходящая для вызова в потоках, и мы можем использовать ThreadPoolExecutor для многократного ускоренного вызова этой функции.

Добавьте следующий выделенный код в свою программу в файле wiki_page_function.py :

wiki_page_function.py

import requests import concurrent.futures def get_wiki_page_existence(wiki_page_url, timeout=10): response = requests.get(url=wiki_page_url, timeout=timeout) page_status = "unknown" if response.status_code == 200: page_status = "exists" elif response.status_code == 404: page_status = "does not exist" return wiki_page_url + " - " + page_status wiki_page_urls = [ "https://en.wikipedia.org/wiki/Ocean", "https://en.wikipedia.org/wiki/Island", "https://en.wikipedia.org/wiki/this_page_does_not_exist", "https://en.wikipedia.org/wiki/Shark", ] with concurrent.futures.ThreadPoolExecutor() as executor: futures = [] for url in wiki_page_urls: futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url)) for future in concurrent.futures.as_completed(futures): print(future.result()) 

Посмотрим, как работает этот код:

  • concurrent.futures импортируется, чтобы предоставить нам доступ к ThreadPoolExecutor .
  • Выражение with используется для создания исполнительного блока экземпляра ThreadPoolExecutor , который будет быстро очищать потоки после выполнения.
  • Четыре задания отправляются в исполнительный блок : по одному для каждого URL из списка wiki_page_urls .
  • Каждый вызов submit возвращает экземпляр Future , хранящийся в списке futures .
  • Функция as_completed ожидает каждого вызова Future get_wiki_page_existence для выполнения, чтобы дать нам возможность распечатать результат.

Если мы снова запустим эту программу с помощью следующей команды:

Результат будет выглядеть примерно следующим образом:

Output
https://en.wikipedia.org/wiki/Island - exists https://en.wikipedia.org/wiki/Ocean - exists https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist https://en.wikipedia.org/wiki/Shark - exists

Этот вывод имеет смысл: 3 адреса URL указывают на существующие страницы Wikipedia, а один из них this_page_does_not_exist не существует. Обратите внимание. что вывод может иметь другой порядок, отличающийся от показанного здесь. Функция concurrent.futures.as_completed в этом примере возвращает результаты сразу же, как только они становятся доступными, вне зависимости от порядка отправки заданий.

Шаг 3 — Обработка исключений функций, выполняемых в потоках

На предыдущем шаге функция get_wiki_page_existence успешно вернула значения во всех случаях вызова. На этом шаге мы увидим, что ThreadPoolExecutor также может выводить исключения при вызове функций в потоках.

Рассмотрим в качестве примера следующий блок кода:

wiki_page_function.py

import requests import concurrent.futures def get_wiki_page_existence(wiki_page_url, timeout=10): response = requests.get(url=wiki_page_url, timeout=timeout) page_status = "unknown" if response.status_code == 200: page_status = "exists" elif response.status_code == 404: page_status = "does not exist" return wiki_page_url + " - " + page_status wiki_page_urls = [ "https://en.wikipedia.org/wiki/Ocean", "https://en.wikipedia.org/wiki/Island", "https://en.wikipedia.org/wiki/this_page_does_not_exist", "https://en.wikipedia.org/wiki/Shark", ] with concurrent.futures.ThreadPoolExecutor() as executor: futures = [] for url in wiki_page_urls: futures.append( executor.submit( get_wiki_page_existence, wiki_page_url=url, timeout=0.00001 ) ) for future in concurrent.futures.as_completed(futures): try: print(future.result()) except requests.ConnectTimeout: print("ConnectTimeout.") 

Этот блок кода практически идентичен использованному нами на шаге 2, но имеет два важных отличия:

  • Теперь мы передаем аргумент timeout=0.00001 для функции get_wiki_page_existence . Поскольку пакет requests не может выполнить веб-запрос сайта Wikipedia за 0,00001 секунды, он выдаст исключение ConnectTimeout .
  • Мы собираем исключения ConnectTimeout , выдаваемые future.result() , и выводим строку в каждом таком случае.

Если мы запустим программу снова, мы получим следующий результат:

Output
ConnectTimeout. ConnectTimeout. ConnectTimeout. ConnectTimeout.

Выведено четыре сообщения ConnectTimeout , по одному для каждого из четырех значений wiki_page_urls , поскольку ни один запрос не мог быть выполнен за 0,00001 секунды, и каждый из четырех вызовов get_wiki_page_existence завершился исключением ConnectTimeout .

Мы увидели, что если вызов функции, отправленный в ThreadPoolExecutor , завершается исключением, это исключение может быть выведено обычным образом посредством вызова Future.result . Вызов Future.result для всех вызванных функций гарантирует, что ваша программа не пропустит никаких исключений при выполнении функции в потоке.

Шаг 4 — Сравнение времени исполнения с потоками и без потоков

Убедимся, что использование ThreadPoolExecutor действительно ускоряет нашу программу.

Вначале определим время выполнения функции get_wiki_page_existence при ее запуске без потоков:

wiki_page_function.py

import time import requests import concurrent.futures def get_wiki_page_existence(wiki_page_url, timeout=10): response = requests.get(url=wiki_page_url, timeout=timeout) page_status = "unknown" if response.status_code == 200: page_status = "exists" elif response.status_code == 404: page_status = "does not exist" return wiki_page_url + " - " + page_status wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)] print("Running without threads:") without_threads_start = time.time() for url in wiki_page_urls: print(get_wiki_page_existence(wiki_page_url=url)) print("Without threads time:", time.time() - without_threads_start) 

В этом пример кода мы вызываем функцию get_wiki_page_existence с пятьюдесятью разными URL страниц Wikipedia по одной. Мы используем функцию time.time() для вывода количества секунд выполнения нашей программы.

Если мы запустим этот код снова, как и раньше, мы увидим следующий результат:

Output
Running without threads: https://en.wikipedia.org/wiki/0 - exists https://en.wikipedia.org/wiki/1 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Without threads time: 5.803015232086182

Записи 2–47 в выводимых результатах пропущены для краткости.

Количество секунд, выводимое после Without threads time , будет отличаться для вашего компьютера, и это нормально, ведь это просто базовое число для сравнения с получаемым при использовании ThreadPoolExecutor . В данном случае мы получили результат ~5,803 секунды .

Теперь снова пропустим те же пятьдесят URL страниц Wikipedia через функцию get_wiki_page_existence , но в этот раз с использованием ThreadPoolExecutor :

wiki_page_function.py

import time import requests import concurrent.futures def get_wiki_page_existence(wiki_page_url, timeout=10): response = requests.get(url=wiki_page_url, timeout=timeout) page_status = "unknown" if response.status_code == 200: page_status = "exists" elif response.status_code == 404: page_status = "does not exist" return wiki_page_url + " - " + page_status wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)] print("Running threaded:") threaded_start = time.time() with concurrent.futures.ThreadPoolExecutor() as executor: futures = [] for url in wiki_page_urls: futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url)) for future in concurrent.futures.as_completed(futures): print(future.result()) print("Threaded time:", time.time() - threaded_start) 

Это тот же самый код, который мы создали на шаге 2, только в него добавлены выражения print, показывающие время выполнения нашего кода в секундах.

Если мы снова запустим программу, мы увидим следующий результат:

Output
Running threaded: https://en.wikipedia.org/wiki/1 - exists https://en.wikipedia.org/wiki/0 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Threaded time: 1.2201685905456543

Количество секунд после Threaded time на вашем компьютере будет отличаться (как и порядок вывода).

Теперь вы можете сравнить время выполнения при доставке пятидесяти URL страниц Wikipedia с потоками и без потоков.

На компьютере, использованном для этого обучающего модуля, выполнение операций без потоков заняло ~5,803 секунды, а с потоками — ~1,220 секунды. С потоками наша программа работала значительно быстрее.

Заключение

В этом обучающем модуле мы научились использовать утилиту ThreadPoolExecutor в Python 3 для эффективного выполнения кода, связанного с операциями ввода-вывода. Вы создали функцию, хорошо подходящую для вызова в потоках, научились получать результаты и исключения при выполнении этой фукнции в потоках и оценили прирост производительности, достигаемый за счет использования потоков.

Далее вас могут заинтересовать другие функции параллельной обработки, доступные в модуле concurrent.futures .

Thanks for learning with the DigitalOcean Community. Check out our offerings for compute, storage, networking, and managed databases.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *