Узел-подписчик и обработка данных

Вторая половина publish/subscribe — узел-подписчик, который слушает топик и реагирует на данные.

Подписчик (subscriber) — объект узла, созданный методом create_subscription; при каждом новом сообщении в топике он вызывает заданный колбэк.

Что мы строим

Сделаем узел безопасности: он подписывается на лидар /scan и, если впереди ближе 0.3 метра появилось препятствие, печатает предупреждение. Это прообраз модуля экстренной остановки.

Код подписчика

Снова language-text — нужен ROS-рантайм.

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan

class SafetyWatch(Node):
    def __init__(self):
        super().__init__('safety_watch')
        self.sub = self.create_subscription(
            LaserScan, '/scan', self.on_scan, 10)

    def on_scan(self, msg):
        # ranges — список расстояний по кругу
        front = msg.ranges[len(msg.ranges) // 2]   # луч прямо вперёд
        if front < 0.3:
            self.get_logger().warn(f'Препятствие! {front:.2f} м')

def main():
    rclpy.init()
    node = SafetyWatch()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Как это работает

create_subscription(LaserScan, '/scan', self.on_scan, 10) говорит: «подпишись на топик /scan с типом LaserScan, и при каждом сообщении вызывай on_scan». Колбэк on_scan получает готовый объект сообщения. Поле ranges — это список расстояний по кругу; средний элемент примерно соответствует направлению «прямо вперёд». Сравниваем с порогом 0.3 м и при необходимости предупреждаем.

Чистая логика отдельно — её можно запустить

Сам узел не исполним в браузере, но алгоритм поиска ближайшего препятствия — это обычный Python. Вынесем его и проверим прямо здесь кнопкой «Запустить».

import math

# имитация одного скана лидара: 8 лучей по кругу, метры
ranges = [2.5, 1.8, 0.9, 0.25, 0.4, 1.2, 3.0, 2.1]
THRESHOLD = 0.3

nearest = min(ranges)
idx = ranges.index(nearest)
angle_deg = idx * (360 / len(ranges))

print(f'Ближайшее препятствие: {nearest} м под углом {angle_deg:.0f}')
if nearest < THRESHOLD:
    print('СТОП: слишком близко!')
else:
    print('Путь свободен')

Вывод:

Ближайшее препятствие: 0.25 м под углом 135
СТОП: слишком близко!

Издатель и подписчик в одном узле

На практике узлы часто и слушают, и говорят. Узел объезда подписывается на /scan и публикует в /cmd_vel: получил данные о препятствии — тут же выдал команду «повернуть». Создавать и подписку, и издателя в одном __init__ — совершенно нормально.

Как работает под капотом

Когда приходит сообщение, DDS кладёт его в очередь подписчика. Executor в spin замечает это и вызывает ваш колбэк с десериализованным объектом. Если сообщения приходят быстрее, чем колбэк успевает их обрабатывать, лишние накапливаются в очереди глубиной из QoS (здесь 10); при переполнении старые отбрасываются. Поэтому колбэк должен быть быстрым — тяжёлую работу выносят в отдельный поток или узел.

Частые ошибки

  • Долгая работа в колбэке. Очередь переполняется, сообщения теряются, реакция запаздывает.
  • Несовпадение типа. Подписка на /scan с типом Image вместо LaserScan не соединится.
  • Обращение к ranges без проверки. Список бывает пустым или короче ожидаемого — индекс может выйти за границы.

Итоги

  • Подписчика создают через create_subscription с колбэком.
  • Колбэк получает готовый объект сообщения и должен быть быстрым.
  • Узел может одновременно и подписываться, и публиковать.
  • Глубина очереди QoS определяет, сколько сообщений буферизуется.
Проверьте себя
1. Что делает create_subscription?
AСоздаёт топик
BПодписывает узел на топик и задаёт колбэк для каждого сообщения
CПубликует сообщение
DУдаляет узел
2. Почему колбэк подписчика должен быть быстрым?
AИначе узел не запустится
BДолгая обработка переполняет очередь и сообщения теряются
CЭто требование синтаксиса Python
DЧтобы экономить заряд
3. Может ли один узел одновременно подписываться и публиковать?
AНет, только что-то одно
BДа, это обычная практика (например, объезд препятствий)
CТолько в ROS 1
DТолько на C++