Published on

Celery와 AWS SQS를 사용한 작업 라우팅 및 처리방법과 옵션 기능으로 기본개념 알아보기

Authors

특정 로직을 수행하는 함수 A 가 있습니다. 이 함수가 여러작업을 공유하는 큐가 아닌, 하나의 큐에서 작업이 In and Out 되도록 하는 방법과 기록합니다.

튜토리얼 구현 환경

  • Django-DRF
  • Celery
  • AWS SQS

1. Celery 설정 (celery_app.py)

Celery 애플리케이션을 설정을 해두는 것으로 어떤 환경에서 작업을 수행할 것 인지 세팅됩니다.

이곳에서 많은 설정들이 이뤄지며, 스케쥴링을 할 수 있게 하는 등의 (celery beat 세팅) celery app config를 수행 합니다.

1.1 코드


import os
from celery import Celery
from kombu import Exchange, Queue

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings.local")

# 이곳에서 기본적인 환경을 구성합니다.
app = Celery("config")
app.config_from_object("django.conf:settings", namespace="CELERY")

app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="KST",
    enable_utc=True,
    result_backend=None,
    broker_url='sqs://<aws_access_key_id>:<aws_secret_access_key>@',
    broker_transport_options={
        'region': 'ap-northeast-2', # 각자의 환경으로 설정하면 됩니다.
        'polling_interval': 5,    # 메시지 폴링 간격
        'visibility_timeout': 60,  # 가시성 타임아웃
        'wait_time_seconds': 20,
        'queue_name_prefix': 'celery-',
    },
)

#SQS에서는 필요없음
app.conf.task_queues = (
    Queue("add_function", Exchange("add_function"), routing_key="add_function"),
    Queue("default", Exchange("default"), routing_key="default"),
)

#SQS에서는 필요없음
app.conf.task_routes = {
    "myapp.tasks.add_test": {"queue": "add_function"},
}

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

위의 Celery 애플리케이션을 설정을 하나씩 알아보겠습니다.

1.2 app.config 에 대해서

app.config 에 대한 파라미터 등을 정리했습니다.
  • Celery("config"): "config"라는 이름으로 Celery 애플리케이션을 생성합니다. 이 이름은 애플리케이션을 식별하는 데 사용됩니다.

  • config_from_object: Django 설정 파일에서 Celery 관련 설정을 로드합니다. namespace 파라미터는 Django 설정 파일에서 CELERY_ 접두사가 붙은 설정만 로드하도록 지정하는 것을 의미합니다.

  • task_serializer: "json" 으로 작업 메세지를 직렬화 한다는 의미입니다. 직렬화는 JSON을 사용하는 것이 가독성과 호환성이 뛰어납니다. YAML, Pickle, MessagePack 등의 옵션도 사용할 수 있지만, 여러 상황상 json이 범용성이 좋습니다. 다른 포맷을 사용하는 것도 각각 장단점이 존재하는데, 다음에 다뤄보겠습니다.

  • accept_content: Celery가 수락할 콘텐츠 유형을 지정합니다. 여기서는 JSON 형식만 허용합니다.

  • result_serializer: 작업 결과를 직렬화하는 데 사용할 포맷을 지정합니다. "json"은 JSON 형식을 사용합니다.

  • timezone: 사용할 시간대를 "KST"는 한국 표준시로 설정합니다. 애플리케이션의 운영 시간대를 설정하여 로컬 시간대를 기준으로 작업을 스케줄링하거나 시간을 표시할 때 지정합니다.

  • enable_utc: UTC 사용 여부를 설정합니다. True로 설정하면 UTC를 사용합니다. 시스템 간의 시간 일관성을 유지하고, 전역적으로 일관된 시간 기준을 제공하기 위해 UTC를 사용합니다.

  • result_backend: 작업 결과를 저장할 백엔드를 지정합니다. 저는 None으로 설정하여 결과 백엔드를 사용하지 않았습니다. result_backend를 사용하는 경우는 아래 케이스에서 유용할 것 같습니다.

    • 작업 상태 추적: 작업이 얼마나 진행되었는지, 성공했는지 실패했는지 등의 상태를 추적해야 할 경우.
    • 영구 저장: 작업 결과를 영구적으로 저장하여, 나중에 분석하거나 참조해야 하는 경우.
  • broker_url: 메시지 브로커 URL을 설정합니다. 어떤 메세지브로커를 사용할 것인지 지정하는 것과 같은 수준의 설정입니다.

  • broker_transport_options: SQS 브로커와 관련된 추가 옵션을 설정합니다.

    • region: SQS의 AWS 리전을 지정합니다.
    • polling_interval: SQS 큐를 폴링하는 간격(초)을 지정합니다. 여기서는 5초로 설정됩니다.
    • visibility_timeout: 메시지를 가져온 후 다른 소비자가 볼 수 없도록 하는 시간(초)을 지정합니다. 여기서는 60초로 설정됩니다. 전반적으로 소비자는 워커와 같은 개념으로, 왜 볼 수 없도록 하는지도 매우 중요한 개념이니..다음에 다뤄보겠습니다.
    • wait_time_seconds: SQS 롱 폴링 대기 시간을 지정합니다. 여기서는 20초로 설정됩니다.
    • queue_name_prefix: 큐 이름의 접두사를 지정합니다.

1.3 task queues

(이 부분은 SQS를 사용할 경우 필요하지 않은 부분입니다) Celery 작업 큐를 설정합니다. 각 큐는 이름, 교환(Exchange), 라우팅 키로 정의됩니다.

  • 이름: 큐를 식별하는 데 사용되며, 작업이 대기하는 장소를 나타냅니다.
  • 교환(Exchange): 메시지를 받아서 특정 큐로 라우팅하는 역할을 합니다.
  • 라우팅 키(Routing Key): 메시지가 교환에서 큐로 전달될 때 사용하는 키입니다.

1.4 task routes

(이 부분은 SQS를 사용할 경우 필요하지 않은 부분입니다) 특정 작업이 어느 큐로 라우팅될지를 설정합니다.

  • "myapp.tasks.add_test": {"queue": "add_function"}: add_test 작업이 add_function 큐로 라우팅되도록 설정합니다.

1.5 autodiscover_tasks

Django의 INSTALLED_APPS 설정에 정의된 애플리케이션에서 자동으로 작업을 검색합니다. 이를 통해 각 Django 앱에 정의된 Celery 작업이 자동으로 로드됩니다.

2. 작업 정의 (tasks.py)

위에서 celery 와 queue등을 어렵게 설정했고, 그렇게 설정되어 어떤 작업을 수행할 것인지 이곳에서 정의하면 됩니다. 작업이란 결국 하나의 함수로 실행하는 것이고, 그 내부에 어떤 함수들과 작업들이 있을지는 개개인마다 달라질 것 입니다.

2.1 작업 정의 코드

import time
from celery_app import app
from django.utils import timezone
from .models import TaskResult

@app.task(bind=True)
def add_test(self, x, y):
    start_time = time.time()

    # 작업 로직
    result = x + y
    task_id = self.request.id

	# 사실상 이것 때문에 result backend가 대체된것.
    TaskResult.objects.create(
        task_id=task_id,
        status="SUCCESS",
        result=result,
        date_done=timezone.now(),
    )
    return result

x, y 를 더해주는 간단한 작업입니다.

여기서 주목해야할 것은 bind 옵션인데, 이것을 True 로 설정하면, 작업인스턴스를 self 형태로 받아올 수 있습니다.

이를 통해 작업 id도 받아올 수 있고, 그를 TaskResult에 넣어두는 식으로 Result Backend를 내부적으로 구현할 수도 있겠습니다.

3. Celery 실행

위의 정의된 celery 를 드디어 실행해 볼 때가 왔습니다.

celery -A {작업 정의 모듈} worker -l info -Q {큐이름}

위에서 task.py 로 했기 때문에 경로에 따라 다르겠지만 root경로라면 task로 하고, add_function 큐를 지정해서 실행할 수 있습니다.

celery -A task worker -l info -Q add_function
옵션설명값 예시기본값
-A, --appCelery 애플리케이션을 지정합니다.-A myproject.celery_app
-l, --loglevel로그 레벨을 설정합니다.DEBUG, INFO, WARNING, ERROR, CRITICALINFO
-Q, --queues지정된 큐에서 작업을 소비합니다.-Q add_function,default

4. 작업 호출 (run_task.py or anything.py)

작업을 비동기로 호출하고 결과를 확인하는 방법입니다.

확인을 어디서 하느냐는 프로젝트마다 다르겠지만 클라이언트 단에서 하는 경우도 있고 상황마다 매우 다릅니다.

지금은 1인(?)이 작업을 만들고 확인하는 과정이므로 Self 확인 방법을 코드로 구현했습니다. (약간 all hit 북&장구 메타.)

4.1 작업 호출 코드

from myapp.tasks import add_test

def run_celery():
    # 작업을 비동기로 호출
    result = add_test.apply_async((4, 6), queue="add_function")

    # 작업 ID 확인
    task_id = result.id
    print(f"Task ID: {task_id}")

    # 작업의 결과를 기다림
    try:
        result_value = result.wait(timeout=30)  # 최대 10초 동안 대기
        print(f"Task Result: {result_value}")
    except TimeoutError:
        print("The task is still running after 30 seconds.")

# 실행
run_celery()

apply_async로 add_test 함수를 실행합니다. 여기서 add_test 의 self 인자는 무시됩니다.

실행할 때도 queue를 지정했습니다.

5. 추가 설정 및 최적화

작업을 최적화하고 문제가 생겼을때 파악(?)하거나 재시도하여 해결하는 방법도 있습니다.

5.1 가시성 타임아웃 및 재시도 설정

작업이 실패했을 때 자동으로 재시도하도록 설정합니다.

pythonCopy code
@app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 5, 'countdown': 10})
def add_test(self, x, y):
    try:
        result = x + y
        task_id = self.request.id

        # 작업 결과를 데이터베이스에 저장
        TaskResult.objects.create(
            task_id=task_id,
            status="SUCCESS",
            result=result,
            date_done=timezone.now(),
        )

        return result
    except Exception as exc:
        raise self.retry(exc=exc)

5.2 SQS 죽은 편지 큐(DLQ) 설정

메시지가 여러 번 실패한 후에는 DLQ로 이동시켜 추가적인 처리를 할 수 있습니다.

AWS SQS 콘솔에서 DLQ를 설정하고, 주 큐에서 "Redrive policy"를 통해 최대 재시도 횟수를 설정할 수 있습니다.

6. 마치며

이번에 celery 와 sqs를 이용한 까다로운 조건의 프로젝트를 작업하면서 알게된 내용을 간단하게 정리해봤습니다.

내용이 길어져서 큰 개요처럼 다뤄봤는데, celery 와 비동기 워커의 개념들을 차차 상세하게 다뤄보겠습니다.

  • hongreat 블로그의 글을 봐주셔서 감사합니다!^^
  • 내용에 잘못된 부분이나 의문점이 있으시다면 댓글 부탁 & 환영 합니다~!
  • (하단의 버튼을 누르시면 댓글을 보거나 작성할 수 있습니다.)
Buy Me A Coffee