Передача сообщений: send и receive
Процессы оживают, когда начинают общаться. Отправка — send, приём — receive с паттерн-матчингом. Сообщения копятся в почтовом ящике (mailbox) процесса.
Каждый процесс — это маленький сервер: он ждёт сообщение, матчит его и реагирует. Mailbox гарантирует, что ни одно сообщение не потеряется.
Отправляем сообщение по PID через send, принимаем через receive, разбирая его шаблонами:
pid = spawn(fn ->
receive do
{:greet, name} -> IO.puts("Привет, #{name}!")
{:bye, name} -> IO.puts("Пока, #{name}.")
_ -> IO.puts("Не понял сообщение")
end
end)
send(pid, {:greet, "Ann"}) # процесс выведет: Привет, Ann!
Чтобы процесс жил и обрабатывал много сообщений, receive заворачивают в рекурсивный цикл:
defmodule Echo do
def loop do
receive do
{:say, msg, from} ->
send(from, {:echo, msg})
loop() # хвостовая рекурсия — цикл сервера
end
end
end
Как работает под капотом (BEAM)
У каждого процесса есть mailbox — очередь входящих сообщений. send асинхронен: он кладёт копию сообщения в mailbox получателя и тут же возвращается, не дожидаясь обработки. receive перебирает сообщения в mailbox и берёт первое, совпавшее с одним из шаблонов; остальные остаются ждать. Если совпадений нет, процесс блокируется (засыпает, не тратя CPU), пока не придёт подходящее сообщение или не истечёт after-таймаут. Цикл сервера — это хвостовая рекурсия, поэтому он крутится бесконечно в константной памяти.
send(pid, msg) --копия msg--> [ mailbox: msg3 msg2 msg1 ]
|
receive: матчит первое
подходящее, остальные ждут
Та же идея на Python ▶
Имитируем mailbox очередью и «receive» через разбор по шаблону сообщения.
from collections import deque
mailbox = deque() # как mailbox процесса
def send(msg): # асинхронно кладём в ящик
mailbox.append(msg)
def receive_one(): # как receive с матчингом
if not mailbox:
return "ящик пуст — процесс бы заснул"
msg = mailbox.popleft()
match msg:
case ("greet", name): return f"Привет, {name}!"
case ("bye", name): return f"Пока, {name}."
case _: return "Не понял сообщение"
send(("greet", "Ann"))
send(("bye", "Bob"))
print(receive_one()) # Привет, Ann!
print(receive_one()) # Пока, Bob.
Частые ошибки
- Забыть зациклить receive. Один
receiveобработает ровно одно сообщение, и процесс завершится — для сервера нужен рекурсивный цикл. - Переполнить mailbox. Если сообщения приходят быстрее, чем обрабатываются, ящик растёт и память течёт.
- Selective receive в горячем цикле. Когда нужное сообщение «глубоко» в ящике, перебор может быть дорогим — учитывайте порядок.
Best practices
- Тегируйте сообщения атомами (
{:greet, ...}) — это делает матчинг ясным и расширяемым. - Всегда замыкайте сервер хвостовым
loop()и держите состояние в аргументах цикла. - Добавляйте
after-таймаут вreceiveтам, где ожидание не должно длиться вечно.
Итог. send/receive — это азбука общения процессов. Но писать цикл, состояние и матчинг руками каждый раз утомительно и опасно. Эту рутину забирает OTP с его GenServer — финал нашего курса.
Синхронный запрос поверх асинхронных сообщений
Сам по себе send асинхронен и не возвращает ответа — это «отправил и забыл». Но часто нужен именно ответ: спросил процесс и жду результат. Реализуют это поверх асинхронных примитивов так: отправитель кладёт в сообщение свой собственный PID (self()) и затем входит в receive, ожидая ответ; получатель обрабатывает запрос и шлёт результат обратно по этому PID. Получается синхронный «запрос-ответ» из двух асинхронных сообщений — ровно этот паттерн позже инкапсулирует GenServer.call.
Чтобы такое ожидание не зависало навсегда, в receive добавляют ветку after с таймаутом: after 5000 -> {:error, :timeout}. Это страховка от ситуации, когда ответ так и не пришёл (процесс упал, потерялось сообщение). Привычка ставить таймаут на ожидание — признак зрелого кода: висящий в receive процесс молчалив и незаметен, пока однажды не обернётся утечкой. Когда мы дойдём до GenServer, вы увидите, что таймаут call'а встроен по умолчанию именно из этих соображений.