Синхронизировать доступ потоков выполнения к совместно используемым ресурсам можно с помощью блокировок, но часто в этом нет необходимости. Как уже упоминалось выше, на практике многопоточные программы часто организуются, как набор потоков производителей и потребителей, которые взаимодействуют между собой, помещая данные в общую очередь и извлекая их оттуда. Если очередь синхронизирует доступ к самой себе, она автоматически будет синхронизировать взаимодействия потоков выполнения.
Как раз такое хранилище данных реализует модуль queue из стандартной библиотеки. Он предоставляет стандартную очередь данных — список объектов Python, построенный по принципу «первый пришел, первый ушел» (first—in first—out, fifo), в котором добавление элементов производится с одного конца, а удаление — с другого. Подобно обычным спискам, очереди, реализуемые этим модулем, могут содержать объекты любых типов, включая объекты простых типов (строки, списки, словари и так далее) и более экзотических типов (экземпляры классов, произвольные вызываемые объекты, такие как функции и связанные методы, и многие другие).
Однако, в отличие от обычных списков, объект очереди автоматически управляет операциями приобретения и освобождения блокировки, благодаря чему в каждый конкретный момент времени изменять очередь может только один поток. Вследствие этого программы, использующие очереди для организации взаимодействий между потоками, изначально обеспечивают поддержку многопоточной модели выполнения и обычно не используют свои собственные блокировки для доступа к данным из потоков выполнения.
Подобно другим инструментам из арсенала поддержки потоков выполнения в языке Python, очереди удивительно просты в использовании. Так, сценарий в примере 5.14 порождает два потока-потребителя, которые ожидают появления данных в общей очереди, и четыре потока- производителя, периодически, через определенные интервалы времени, помещающие данные в очередь (для каждого из них установлена своя продолжительность интервала, чтобы имитировать выполнение длительных операций). Другими словами, эта программа запускает семь потоков выполнения (включая главный поток), шесть из которых обращаются к общей очереди параллельно.
Пример 5.14. PP4E\System\Threads\queuetest.py
“взаимодействие потоков производителей и потребителей посредством очереди”
numconsumers = 2 # количество потоков-потребителей
numproducers = 4 # количество потоков-производителей
nummessages = 4 # количество сообщений, помещаемых производителем
import _thread as thread, queue, time
safeprint = thread.allocate_lock() # в противном случае вывод может
# перемешиваться
dataQueue = queue.Queue() # общая очередь неограниченного размера
def producer(idnum):
for msgnum in range(nummessages):
time.sleep(idnum)
dataQueue.put(‘[producer id=%d, count=%d]’ % (idnum, msgnum))
def consumer(idnum):
while True:
time.sleep(0.1)
try:
data = dataQueue.get(block=False)
except queue.Empty:
pass
else:
with safeprint: print(‘consumer’, idnum, ‘got =>’, data)
if __name__ == ‘__main__’:
for i in range(numconsumers):
thread.start_new_thread(consumer, (i,)) for i in range(numproducers):
thread.start_new_thread(producer, (i,)) time.sleep(((numproducers-1) * nummessages) + 1) print(‘Main thread exit.’)
Прежде чем я покажу вывод этого сценария, я хочу подчеркнуть некоторые важные моменты в этом программном коде.
Использованная литература:
Марк Лутц — Программирование на Python, 4-е издание, I том, 2011