Как правильно передавать данные между потоками? Списком вроде небезопасно
У меня есть несколько потоков-воркеров, которые обрабатывают задания, и один поток, который эти задания производит. Сейчас держу задания в обычном списке и беру через list.pop(), но боюсь гонок и непонятно, как воркеру дождаться нового задания, не крутя while True с sleep.
tasks = []
def producer():
for i in range(10):
tasks.append(i)
def worker():
while tasks:
item = tasks.pop() # а если два потока одновременно?
...
Как сделать это нормально, по-человечески?
2 ответа
Для обмена данными между потоками в стандартной библиотеке есть готовая потокобезопасная очередь — queue.Queue. Она сама внутри держит замок, так что несколько потоков могут безопасно класть и забирать, и вдобавок умеет блокироваться: get() сам уснёт, пока в очереди не появится элемент — никакого ручного while + sleep.
import queue
import threading
q = queue.Queue()
def producer():
for i in range(10):
q.put(i) # положить задание
for _ in range(3):
q.put(None) # «отравленные пилюли» — сигнал воркерам завершиться
def worker():
while True:
item = q.get() # блокируется, пока нет элемента
if item is None: # получили сигнал стоп
q.task_done()
break
print(f"обрабатываю {item}")
q.task_done() # отметили, что задание выполнено
threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
t.start()
producer()
for t in threads:
t.join()
Ключевые методы:
put(x)— положить (по умолчанию блокируется, если очередь с лимитом и забита);get()— забрать, блокируется до появления элемента;task_done()/join()— если хочешь дождаться, пока ВСЕ положенные задания обработаны, вызывайq.join(), а воркеры на каждый элемент дёргаютtask_done().
Приём с None («poison pill») — стандартный способ корректно остановить воркеров: producer кладёт по одному None на каждый поток, тот видит его и выходит.
Важно не путать queue.Queue (для потоков, потокобезопасная) и collections.deque (просто структура данных, быстрая, но без блокировки/ожидания между потоками). Для межпоточного обмена бери именно queue.Queue.
Ещё: если используешь multiprocessing, у потоковой queue.Queue ничего не выйдет между процессами — там своя multiprocessing.Queue. Очереди не взаимозаменяемы.