← Все вопросы

Как правильно передавать данные между потоками? Списком вроде небезопасно

Задан 15 месяцев назад1.2к просмотров2 ответа
5

У меня есть несколько потоков-воркеров, которые обрабатывают задания, и один поток, который эти задания производит. Сейчас держу задания в обычном списке и беру через list.pop(), но боюсь гонок и непонятно, как воркеру дождаться нового задания, не крутя while True с sleep.

tasks = []

def producer():
    for i in range(10):
        tasks.append(i)

def worker():
    while tasks:
        item = tasks.pop()  # а если два потока одновременно?
        ...

Как сделать это нормально, по-человечески?

2 ответа

10
✓ Принятый ответ — помог автору

Для обмена данными между потоками в стандартной библиотеке есть готовая потокобезопасная очередь — queue.Queue. Она сама внутри держит замок, так что несколько потоков могут безопасно класть и забирать, и вдобавок умеет блокироваться: get() сам уснёт, пока в очереди не появится элемент — никакого ручного while + sleep.

import queue
import threading

q = queue.Queue()

def producer():
    for i in range(10):
        q.put(i)          # положить задание
    for _ in range(3):
        q.put(None)       # «отравленные пилюли» — сигнал воркерам завершиться

def worker():
    while True:
        item = q.get()    # блокируется, пока нет элемента
        if item is None:  # получили сигнал стоп
            q.task_done()
            break
        print(f"обрабатываю {item}")
        q.task_done()     # отметили, что задание выполнено

threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()

producer()
for t in threads:
    t.join()

Ключевые методы:

  • put(x) — положить (по умолчанию блокируется, если очередь с лимитом и забита);
  • get() — забрать, блокируется до появления элемента;
  • task_done() / join() — если хочешь дождаться, пока ВСЕ положенные задания обработаны, вызывай q.join(), а воркеры на каждый элемент дёргают task_done().

Приём с None («poison pill») — стандартный способ корректно остановить воркеров: producer кладёт по одному None на каждый поток, тот видит его и выходит.

3

Важно не путать queue.Queue (для потоков, потокобезопасная) и collections.deque (просто структура данных, быстрая, но без блокировки/ожидания между потоками). Для межпоточного обмена бери именно queue.Queue.

Ещё: если используешь multiprocessing, у потоковой queue.Queue ничего не выйдет между процессами — там своя multiprocessing.Queue. Очереди не взаимозаменяемы.

Ваш ответ

Войдите, чтобы ответить на вопрос.
Поддержать проект