Помещение обработчиков в очередь
В примерах предыдущего раздела данные, помещаемые в очередь, всегда были строками. Этого вполне достаточно для простых приложений, где могут существовать производители только одного типа. Однако если в программе могут иметься потоки, выполняющие различные функции и производящие данные различных типов, это может осложнить обработку данных. Вероятно, вам придется сопровождать данные некоторой дополнительной информацией, которая поможет главному потоку графического интерфейса определить, как обрабатывать эти данные.
Представьте, например, почтовый клиент, где несколько операций отправки и приема почты могут выполняться одновременно. Если все потоки совместно используют одну и ту же очередь, информация, помещаемая в нее, должна иметь какие-то отличительные признаки, которые позволят определить, какое событие она представляет, — загруженное сообщение для отображения, информацию для индикатора хода выполнения операции, сообщение об успешной отправке или что-то еще. Это надуманный пример: мы столкнемся с этой проблемой в приложении PyMailGUI, представленном в главе 14.
К счастью, очереди поддерживают не только строки — в очередь можно помещать объекты Python любых типов. Наиболее универсальными из них, вероятно, являются вызываемые объекты: помещая в очередь функцию или другой вызываемый объект, поток-производитель может самым непосредственным способом сообщить графическому интерфейсу, как обрабатывать данные. Графическому интерфейсу остается просто извлечь объект из очереди и вызвать его. Поскольку все потоки выполняются в пределах одного и того же процесса, в очередь могут помещаться вызываемые объекты любых типов — простые функции, результаты lambda-выражений и даже связанные методы, объединяющие в себе функции и объекты, обеспечивающие доступ к их данным и методам. Любые изменения в объектах, выполняемые такими функциями обратного вызова, будут доступны всему процессу.
Благодаря возможности в языке Python универсальным способом обрабатывать функции и списки их аргументов, передача их через очередь выглядит гораздо проще, чем могло бы показаться. Так, в примере 10.20 демонстрируется один из способов передачи функций обратного вызова через очередь, который будет использоваться в приложении PyMailGUI в главе 14. Этот модуль содержит также ряд полезных инструментов. Класс ThreadCounter можно использовать как совместно используемый счетчик и логический флаг (например, для управления операциями, перекрывающимися во времени). Однако наиболее важным здесь является реализация интерфейса передачи функций через очередь — в двух словах, данная реализация позволяет клиентам запускать потоки выполнения, которые помещают в очередь свои функции завершения для передачи главному потоку.
До определенной степени этот пример можно считать лишь разновидностью примера из предыдущего раздела — здесь мы по-прежнему выполняем цикл событий от таймера, в котором главный поток извлекает данные из очереди. Для большей эффективности за одно событие от таймера из очереди извлекается уже не один (что может оказаться слишком долгим, при большом количестве элементов данных, или слишком накладным, при коротком интервале срабатывания таймера) и не все (что может привести к блокированию графического интерфейса, если данные поступают слишком быстро), а до N элементов данных. Мы дополним этот прием пакетной обработки данных в PyMailGUI возможностью выполнения множественных изменений в графическом интерфейсе без необходимости выделять ресурсы процессора для обработки коротких событий от таймера, в чем обычно нет необходимости.
Однако здесь главное отличие, на которое следует обратить внимание, заключается в том, что мы вызываем объекты, которые потоки- производители обобщенным способом помещают в очередь для обработки успешного или неудачного выполнения операции в ответ на благополучное завершение или возникшее исключение. Кроме того, функции, выполняемые в потоках-производителях, принимают функцию, определяющую информацию о протекании операции, которая при вызове просто помещает в очередь обработчик индикатора хода выполнения операции, предназначенный для вызова в контексте главного потока выполнения. Эту особенность можно использовать, например, чтобы показать в графическом интерфейсе ход выполнения загрузки данных по сети.
Пример 10.20. PP4E\Gui\Tools\threadtools.py
############################################################################## Общесистемные утилиты поддержки многопоточной модели выполнения для графических интерфейсов.
Реализует единую очередь обработчиков и цикл обработки событий от таймера для ее проверки, совместно используемые всеми окнами в программе; рабочие потоки помещают в очередь свои обработчики завершения и протекания операции для вызова в главном потоке; эта модель не блокирует графический интерфейс — он просто выполняет операции в порождаемых дочерних потоках и обрабатывает события завершения и продолжения операций; рабочие потоки могут перекрываться во времени с главным потоком и с другими рабочими потоками.
На практике передача функций-обработчиков с аргументами через очереди намного удобнее, чем передача простых данных, если в программе одновременно могут действовать разнотипные потоки выполнения, — каждый тип может подразумевать выполнение различных действий при завершении.
Библиотеки создания графических интерфейсов не полностью поддерживают многопоточную модель, поэтому, вместо того чтобы напрямую вызывать обработчики, производящие изменение графического интерфейса после выполнения основной операции в потоке, они помещаются в общую очередь и вызываются не в дочерних потоках, а в цикле обработки событий от таймера в главном потоке; это также обеспечивает регулярность и предсказуемость моментов обновления графического интерфейса; требуется, чтобы логика потока разбивалась на основную операцию, завершающие действия и операцию, возвращающую информацию о протекании процесса.
Предполагается, что в случае неудачи функция потока возбуждает исключение и принимает в аргументе ‘progress’ функцию обратного вызова, если поддерживает возможность передачи информации о ходе выполнения операции; предполагается также, что все обработчики выполняются очень быстро, либо производят обновление графического интерфейса в процессе работы, и эта очередь будет содержать функции обратного вызова (или другие вызываемые объекты) для использования в приложениях с графическим интерфейсом, — требуется наличие виджетов, чтобы обеспечить работу цикла на основе метода ‘after’; для использования данной модели в сценариях без графического интерфейса можно было бы использовать простой таймер.
##############################################################################
# запустить, даже если нет потоков try:
import _thread as thread
except ImportError:
import _dummy_thread as thread
# сейчас, если модуль threads
# недоступен в стандартной библиотеке,
# возбуждает исключение ImportError
# и блокирует графический интерфейс
# тот же интерфейс без потоков
# общая очередь
# в глобальной области видимости, совместно используется потоками
import queue, sys
threadQueue = queue.Queue(maxsize=0) # infinite size
##############################################################################
# ГЛАВНЫЙ ПОТОК — периодически проверяет очередь; выполняет действия,
# помещаемые в очередь, в контексте главного потока; один потребитель (GUI) и
# множество производителей (загрузка, удаление, отправка); простого списка
# было бы вполне достаточно, если бы операции list.append и list.pop были
# атомарными; 4 издание: в процессе обработки каждого события от таймера
# выполняет до N операций: обход в цикле всех обработчиков, помещенных в
# очередь, может заблокировать графический интерфейс, а при выполнении
# единственной операции вызов всех обработчиков может занять продолжительное
# время или привести к неэффективному расходованию ресурсов процессора на
# обработку событий от таймера (например, информирование о ходе выполнения
# операций); предполагается, что обработчики выполняются очень быстро или
# выполняют обновление графического интерфейса в процессе работы (вызывают
# метод update): после вызова обработчика планируется очередное событие от
# таймера и управление возвращается в цикл событий; поскольку этот цикл
# выполняется в главном потоке, он не препятствует завершению программы;
##############################################################################
def threadChecker(widget, delayMsecs=100, perEvent=1): # 10 раз/сек, 1/таймер for i in range(perEvent): # передайте другие значения,
try: # чтобы повысить скорость
(callback, args) = threadQueue.get(block=False) # выполнить до N
except queue.Empty: # обработчиков
break # очередь пуста?
else: callback(*args) # вызвать обраб.
widget.after(delayMsecs, # переустановить
lambda: threadChecker(widget, delayMsecs, perEvent))# таймер и
# вернуться в цикл # событий
##############################################################################
# НОВЫЙ ПОТОК — выполняет задание, помещает в очередь обработчик завершения и
# обработчик, возвращающий информацию о протекании процесса; вызывает функцию
# основной операции с аргументами, затем планирует вызов функций on* с
# контекстом; запланированные вызовы добавляются в очередь и выполняются в
# главном потоке, чтобы избежать параллельного обновления графического
# интерфейса; позволяет программировать основные операции вообще без учета
# того, что они будут выполняться в потоках; не вызывайте обработчики в
# потоках: они могут обновлять графический интерфейс в потоке, поскольку
# передаваемая функция будет вызвана в потоке; обработчик ‘progress’ просто
# должен добавлять в очередь функцию обратного вызова с передаваемыми ей
# аргументами; не обновляйте текущие счетчики здесь: обработчик завершения
# будет извлечен из очереди и выполнен функцией threadChecker в главном
# потоке;
############################################################################## def threaded(action, args, context, onExit, onFail, onProgress):
try:
if not onProgress: # ждать завершения этого потока
action(*args) # предполагается, что в случае неудачи будет
else: # возбуждено исключение
def progress(*any):
threadQueue.put((onProgress, any + context))
action(progress=progress, *args)
except
threadQueue.put((onFail, (sys.exc_info(), ) + context)) else:
threadQueue.put((onExit, context))
def startThread(action, args, context, onExit, onFail, onProgress=None): thread.start_new_thread(
threaded, (action, args, context, onExit, onFail, onProgress))
############################################################################## # счетчик или флаг с поддержкой многопоточной модели выполнения: удобно # использовать, чтобы избежать выполнения перекрывающихся во времени операций, # когда потоки изменяют другие общие данные, помимо тех, которые изменяются # обработчиками, помещаемыми в очередь ##############################################################################
class ThreadCounter:
def __init__(self): self.count = 0 self.mutex = thread.allocate_lock() # или Threading.semaphore def incr(self):
self.mutex.acquire() # или с помощью self.mutex:
self.count += 1 self.mutex.release()
def decr(self):
self.mutex.acquire() self.count -= 1 self.mutex.release() def __len__(self): return self.count # True/False, если используется, # как флаг
############################################################################## # реализация самотестирования: разбивает поток на основную операцию, # операцию завершения и операцию информирования о ходе выполнения задания ##############################################################################
if __name__ == ‘__main__’: # самотестирование при запуске в виде сценария
import time # или PP4E.Gui.Tour.scrolledtext
from tkinter.scrolledtext import ScrolledText
def onEvent(i): # реализация порождения потоков
myname = ‘thread-%s’ % i startThread( action = threadaction,
args = (i, 3),
context = (myname,),
onExit = threadexit,
onFail = threadfail,
onProgress = threadprogress)
# основная операция, выполняемая потоком
def threadaction(id, reps, progress): # то, что делает поток for i in range(reps):
time.sleep(1)
if progress: progress(i) # обработчик progress: в очередь if id % 2 == 1: raise Exception # ошибочный номер: неудача
# обработчики завершения/информирования о ходе выполнения задания: # передаются главному потоку через очередь def threadexit(myname):
text.insert(‘end’, ‘%s\texit\n’ % myname) text.see(‘end’)
def threadfail(exc_info, myname): text.insert(‘end’, ‘%s\tfail\t%s\n’ % (myname, exc_info[0])) text.see(‘end’)
def threadprogress(count, myname): text.insert(‘end’, ‘%s\tprog\t%s\n’ % (myname, count)) text.see(‘end’) text.update() # допустимо: выполняется в главном потоке
# создать графический интерфейс и запустить цикл обработки событий от
# таймера в главном потоке
# порождать группу рабочих потоков в ответ на каждый щелчок мышью: # выполнение их может перекрываться во времени
text = ScrolledText()
text.pack()
threadChecker(text) # запустить цикл обработки потоков
# ext.bind(‘<Button-1>’, # в 3.x функция list необходима для получения lambda event: list(map(onEvent, range(6))) ) # всех результатов map, # для range — нет text.mainloop() # вход в цикл событий
Реализация модуля описывается в комментариях, а программный код самотестирования демонстрирует порядок его использования. Обратите внимание, что реализация потока выполнения разбита на основные операции, операции, выполняемые при выходе, и необязательные операции информирования о ходе выполнения задания, — основные операции выполняются в новом потоке, но другие помещаются в очередь и выполняются в главном потоке. То есть чтобы воспользоваться этим модулем, вам, по сути, необходимо разделить реализацию функции потока на действия, выполняемые в самом потоке, выполняемые после его завершения и необязательные действия информирования о ходе выполнения задания. В целом, продолжительным может быть только этап, выполняемый в пределах потока.
Если пример 10.20 запустить, как самостоятельный сценарий, по каждому щелчку мышью на виджете ScrolledTest он будет запускать шесть новых потоков выполнения, каждый из которых будет выполнять функцию threadaction. Функция, выполняемая в потоке, вызывает передаваемую ей функцию информирования о ходе выполнения задания, помещающую обработчик обратного вызова в очередь, который вызывает функцию threadprogress в главном потоке. Когда функция потока завершается, интерфейсный уровень помещает в очередь обработчик, который вызовет threadfail или threadexit в главном потоке, в зависимости от того, возбудила функция потока исключение или нет. Так как все обработчики, помещаемые в очередь, извлекаются и выполняются в цикле обработки событий от таймера в главном потоке, это означает, что все изменения в графическом интерфейсе будут производиться только в главном потоке выполнения и не будут перекрываться во времени.
На рис. 10.12 приводится фрагмент вывода, сгенерированного щелчком мыши на окне. Все сообщения о завершении, неудаче и информирующие о ходе выполнения задания, создаются обработчиками, добавляемыми в очередь дочерними потоками и вызываемыми в цикле обработки событий от таймера в главном потоке.

Рис. 10.12. Сообщения, создаваемые обработчиками, извлекаемыми из очереди
Внимательно рассмотрите этот пример и попробуйте последить логику работы программного кода самотестирования. Это достаточно сложное задание, и вам, вероятно, придется не один раз пробежать по нему глазами, чтобы понять, как он действует. Однако как только вы уловите суть этой парадигмы, вы поймете общую схему работы с разнородными потоками выполнения обобщенным способом. В PyMailGUI, например, когда требуется запустить прием/передачу почты, выполняются операции, похожие на операции в функции onEvent в реализации самотестирования в данном примере.
Использованная литература:
Марк Лутц — Программирование на Python, 4-е издание, I том, 2011