Модуль queue

modul queue Системные инструменты параллельного выполнения

Синхронизировать доступ потоков выполнения к совместно используемым ресурсам можно с помощью блокировок, но часто в этом нет необходимости. Как уже упоминалось выше, на практике многопоточные программы часто организуются, как набор потоков производителей и потребителей, которые взаимодействуют между собой, помещая данные в общую очередь и извлекая их оттуда. Если очередь синхронизирует доступ к самой себе, она автоматически будет синхронизировать взаимодействия потоков выполнения.

Как раз такое хранилище данных реализует модуль queue из стандартной библиотеки. Он предоставляет стандартную очередь данных — список объектов Python, построенный по принципу «первый пришел, первый ушел» (firstin firstout, fifo), в котором добавление элементов производится с одного конца, а удаление — с другого. Подобно обычным спискам, очереди, реализуемые этим модулем, могут содержать объекты любых типов, включая объекты простых типов (строки, списки, словари и так далее) и более экзотических типов (экземпляры классов, произвольные вызываемые объекты, такие как функции и связанные методы, и многие другие).

Однако, в отличие от обычных списков, объект очереди автоматически управляет операциями приобретения и освобождения блокировки, благодаря чему в каждый конкретный момент времени изменять очередь может только один поток. Вследствие этого программы, использующие очереди для организации взаимодействий между потоками, изначально обеспечивают поддержку многопоточной модели выполнения и обычно не используют свои собственные блокировки для доступа к данным из потоков выполнения.

Подобно другим инструментам из арсенала поддержки потоков выполнения в языке Python, очереди удивительно просты в использовании. Так, сценарий в примере 5.14 порождает два потока-потребителя, которые ожидают появления данных в общей очереди, и четыре потока- производителя, периодически, через определенные интервалы времени, помещающие данные в очередь (для каждого из них установлена своя продолжительность интервала, чтобы имитировать выполнение длительных операций). Другими словами, эта программа запускает семь потоков выполнения (включая главный поток), шесть из которых обращаются к общей очереди параллельно.

Пример 5.14. PP4E\System\Threads\queuetest.py

“взаимодействие потоков производителей и потребителей посредством очереди”

numconsumers = 2 # количество потоков-потребителей

numproducers = 4 # количество потоков-производителей

nummessages = 4 # количество сообщений, помещаемых производителем

import _thread as thread, queue, time

safeprint = thread.allocate_lock() # в противном случае вывод может

# перемешиваться

dataQueue = queue.Queue() # общая очередь неограниченного размера

def producer(idnum):

for msgnum in range(nummessages):

time.sleep(idnum)

dataQueue.put(‘[producer id=%d, count=%d]’ % (idnum, msgnum))

def consumer(idnum):

while True:

time.sleep(0.1)

try:

data = dataQueue.get(block=False)

except queue.Empty:

pass

else:

with safeprint: print(‘consumer’, idnum, ‘got =>’, data)

if __name__ == ‘__main__’:

for i in range(numconsumers):

thread.start_new_thread(consumer, (i,)) for i in range(numproducers):

thread.start_new_thread(producer, (i,)) time.sleep(((numproducers-1) * nummessages) + 1) print(‘Main thread exit.’)

Прежде чем я покажу вывод этого сценария, я хочу подчеркнуть некоторые важные моменты в этом программном коде.

Использованная литература:
Марк Лутц — Программирование на Python, 4-е издание, I том, 2011

Оцените статью
Секреты программирования
Добавить комментарий