Published on

celery 의 Exchange와 Routing Key

Authors

celery sqs 라우팅 글 위 글에서 celery를 사용할 때 작업을 큐에 라우팅 하는 방법을 알아봤습니다.

celery 에서는 필요없는 값들도 있었는데요.

Celery에서 작업 큐를 설정할 때 task_queues 에 담겨지는 객체을 통해서 교환과 라우팅키에 대해 추가적으로 알아보겠습니다.

1. Queue(큐)

  • 큐의 이름으로 작업이 대기하는 장소를 식별하는 데 사용됩니다.
  • 큐는 메시지가 대기하는 버퍼와 같은 역할을 하고, 첫번째 파라미터가 이름 그 자체가 되겠습니다.

2. Exchange(교환)

  • 메시지 브로커(예: RabbitMQ) 내에 위치합니다. 메시지 브로커는 메시지를 처리하고 라우팅하는 시스템으로, Exchange는 이 시스템의 핵심 구성 요소 중 하나입니다. ( SQS는 제공안함 )
  • 메시지를 큐로 라우팅하는 역할을 하는 구성 요소입니다. 교환은 메시지를 받아서 특정 큐로 전달하는 규칙을 정의합니다.
  • 유형
    • direct, topic, fanout 등의 유형으로 작업을 진행합니다.

2.1 Direct Exchange

라우팅 키와 큐의 바인딩 키가 정확히 일치할 때 메시지를 특정 큐로 직접 라우팅합니다.

메시지와 큐 사이의 일대일 매핑을 통해 라우팅됩니다.

  1. 메시지 생성: 메시지가 특정 라우팅 키와 함께 전송됩니다.
  2. 큐 바인딩: 큐가 특정 바인딩 키와 함께 교환에 바인딩됩니다.
  3. 메시지 라우팅: 메시지의 라우팅 키와 큐의 바인딩 키가 일치하면, 메시지가 해당 큐로 전달됩니다.

Direct Exchange는 특정 메시지를 특정 큐로 정확히(일대일) 라우팅해야 할 때 유용합니다.

(예를 들어, 특정 타입의 작업을 전담하는 워커가 있을 때 사용됩니다.)

  • 예시: 로그 레벨에 따라 다른 큐로 메시지를 보내는 경우.
    • 라우팅 키가 "info"인 메시지는 "info" 큐로.
    • 라우팅 키가 "error"인 메시지는 "error" 큐로.

from kombu import Exchange, Queue

exchange = Exchange('direct_logs', type='direct') # 미리 direct 선언
queue_info = Queue('info', exchange, routing_key='info')
queue_error = Queue('error', exchange, routing_key='error')

2.2 Topic Exchange

주제(?)를 기반으로 메시지를 라우팅합니다.

Topic Exchange는 라우팅 키의 패턴을 기반으로 메시지를 큐로 라우팅합니다.

참고) 라우팅 키는 점(.)으로 구분된 단어들로 구성 / 는 와일드카드(*, #)를 사용하여 패턴으로 파악해 메시지를 받을 수 있습니다.

  • * : 한 단어와 일치할때
  • # : 0개 이상의 단어와 일치할때(깡패)
  1. 메시지 생성: 메시지가 특정 라우팅 키와 함께 전송됩니다.
  2. 큐 바인딩: 큐가 특정 패턴과 함께 교환에 바인딩됩니다.
  3. 메시지 라우팅: 메시지의 라우팅 키가 큐의 바인딩 패턴과 일치하면, 메시지가 해당 큐로 전달됩니다.

Topic Exchange는 라우팅 규칙을 직접생각하는 구조 로직에 적용하기 유용합니다. 예를 들어, 내가 특정 뉴스의 주제 기반으로 라우팅을 해야하는 경우가 있다고 가정합니다.

  • 라우팅 키가 "news.coin.high"인 메시지는 "news.coin.*" 또는 "news.#" 패턴과 일치하는 큐로 전달됩니다.
pythonCopy code
from kombu import Exchange, Queue

exchange = Exchange('topic_logs', type='topic')
queue_sports = Queue('coin', exchange, routing_key='news.coin.*')
queue_all_news = Queue('all_news', exchange, routing_key='news.#')

2.3 Fanout Exchange

메시지를 모든 연결된 큐로 보내는 방식(브로드캐스트)입니다. 메시지가 Exchange(교환)에 도착하면, 라우팅 키와 상관없이 해당 교환에 연결된 모든 큐에 동일한 메시지를 복사하여 전달합니다.

이 방식은 특정 큐가 아닌 모든 큐에 동일한 메시지를 전달하고자 할 때 유용합니다. 예를 들어, 여러 큐가 동일한 작업을 병렬로 처리하거나, 다양한 서비스가 동일한 이벤트를 동시에 처리해야 하는 경우에 사용됩니다.

  1. 메시지 수신: 메시지가 Fanout Exchange로 도착합니다.
  2. 큐로 브로드캐스트: 이 메시지는 라우팅 키와 무관하게 Exchange에 연결된 모든 큐에 복사되어 전달됩니다.
  3. 큐에서 작업 소비: 각 큐에 도착한 메시지는 해당 큐를 소비하는 워커에 의해 처리됩니다.

Exchange에 queue1, queue2, queue3가 연결되어 있다고 가정해보겠습니다.

이때 메시지 A가 Fanout Exchange로 도착하면, 이 메시지는 queue1, queue2, queue3에 각각 복사되어 전달됩니다.


               +------------+
               |   Message A|
               +-----+------+
                     |
               +-----v-----+
               |  Exchange  |
               |   (fanout) |
               +-----+------+
                     |
   +-----------------+-----------------+
   |                 |                 |
+--v--+           +--v--+           +--v--+
|queue1|           |queue2|           |queue3|
+-----+           +-----+           +-----+

이런 방식은 각 큐에서 동일한 메시지를 독립적으로, 동시에 처리해야 할 때 유용합니다.

로그 메시지를 여러 로깅 서비스로 동시에 보내거나, 알림을 여러 서비스로 동시에 전송할 때 사용됩니다.

3. Routing Key(라우팅 키)

Exchange(교환) 가 메시지를 특정 큐로 라우팅할 때 사용하는 키입니다.

메시지가 교환에 도착할 때, 라우팅 키를 기반으로 어느 큐로 전달할지를 결정합니다.

즉, high_priority(높은 우선 순위)의 라우팅 키가 있을때, 높은 우선 순위를 처리하는 Exchange가 높은 우선 순위의 에 라우팅 해줍니다.


app.conf.task_queues = (
    Queue("high_priority", Exchange("high_priority"), routing_key="high_priority"),
    Queue("default", Exchange("default"), routing_key="default"),
)

4. 정리

Message → Exchage(라우팅 키) → Queue : 이 과정에서 celery 에는 Exchange 기능이 없기에 라우팅키 등의 설정이 필요없는 것이다.

  • 작업 생성 (Message)
    • Celery 애플리케이션에서 작업을 생성합니다. 이 작업은 메시지 형태로 큐에 전달될 것입니다.
  • 교환으로 라우팅 (Exchange with routing key)
  • 큐로 전달 (Queue)
    • Celery는 작업을 Amazon SQS 큐에 직접 보냅니다. 큐의 이름을 사용하여 명시적으로 큐를 지정합니다.
  • 워커가 큐에서 작업을 소비 (Worker processes the task)
    • Celery 워커는 Amazon SQS 큐에서 작업을 소비하여 처리합니다. SQS 큐에 쌓인 작업은 Celery 워커에 의해 처리되고 결과가 반환됩니다.

hongreat 블로그의 글을 봐주셔서 감사합니다!