Published on

bind 옵션으로 Celery 작업 인스턴스 가져오기

Authors

celery 를 이용한 작업을 정의할 때 데코레이터 함수를 사용하고, 이 내부에서 bind=True 옵션을 지정해 알아올 수 있습니다.

이를 통해서 작업을 수행할 함수안에서 작업 Id(pk) 를 가져오고 싶은 경우, 작업요청에 대한 메타데이터를 가져오고 싶은 경우에 활용 할 수 있습니다.

작업 함수에 self 인수가 전달되고, 이 self 인수는 작업 인스턴스를 참조하며, 이를 통해 작업의 메타데이터와 다양한 유틸리티 메서드에 접근할 수 있는 것 입니다.

1. 기본 작업 함수와 바인딩된 작업 함수의 차이

1.1 기본 작업 함수

기본 작업 함수는 단순히 작업의 본체만을 포함합니다.


@app.task
def add(x, y):
    return x + y

위 작업 함수는 x와 y를 더한 결과를 반환합니다. 이 함수는 Celery 작업의 메타데이터나 유틸리티 메서드에 접근할 수 없습니다.

1.2 바인딩된 작업 함수 (bind=True 사용)

바인딩된 작업 함수는 self 인수를 통해 작업 인스턴스에 접근할 수 있습니다.


@app.task(bind=True)
def add(self, x, y):
    return x + y

여기서 self는 작업 인스턴스에 대한 참조를 나타내며, 이를 통해 작업의 다양한 속성과 메서드에 접근할 수 있습니다.

2. 바인딩된 작업 함수의 장점

  • 작업 메타데이터 접근: 작업의 ID, 실행 시간, 재시도 횟수 등 작업과 관련된 메타데이터에 접근할 수 있습니다.
  • 유틸리티 메서드 사용: 작업 인스턴스에 내장된 다양한 유틸리티 메서드를 사용할 수 있습니다. 예를 들어, 작업을 재시도하거나 태를 업데이트하는 메서드에 활용될 수 있습니다.
  • 에러 처리 및 재시도: 작업 중에 예외가 발생했을 때, self.retry 메서드를 사용하여 작업을 재시도할 수 있습니다.

2.1 바인딩된 작업 함수에서 작업 메타데이터 사용


@app.task(bind=True)
def add(self, x, y):
    print(f"Task ID: {self.request.id}")  # 작업 ID 출력
    return x + y

에러 발생 시 작업 재시도


@app.task(bind=True, max_retries=3)
def add(self, x, y):
    try:
        if x < 0 or y < 0:
            raise ValueError("Negative values not allowed")
        return x + y
    except Exception as exc:
        print(f"Retrying due to error: {exc}")
        self.retry(exc=exc, countdown=5)  # 5초 후 재시도

3. self 에 대해

self를 통해 Celery 인스턴스의 일부 속성과 메서드에 접근할 수 있지만, 이 self는 Celery 인스턴스 자체가 아니라, Celery 작업 인스턴스를 나타냅니다.

3.1 self.request의 주요 속성

  1. self.request: 작업 요청에 대한 정보를 포함합니다.
  2. self.retry(): 작업을 재시도합니다.
  3. self.update_state(): 작업의 상태를 수동으로 업데이트합니다.
  4. self.name: 작업의 이름을 반환합니다.
  5. self.app: 현재 작업이 속한 Celery 애플리케이션 인스턴스를 반환합니다.
  6. self.AsyncResult: 작업 결과를 비동기적으로 가져올 수 있는 객체를 반환합니다.
  7. self.backend: 현재 사용 중인 백엔드에 접근할 수 있습니다.

3.2 코드 예시

이제 몇 가지 예시를 통해 bind=True를 사용하여 유용한 정보를 얻는 방법을 보여드리겠습니다

작업 이름 출력


@app.task(bind=True)
def print_task_name(self):
    print(f"task_name: {self.name}")

작업 상태 업데이트


@app.task(bind=True)
def long_running_task(self):
    self.update_state(state='PROGRESS', meta={'progress': 50})
    # 작업 수행
    return '완료'

작업 재시도


@app.task(bind=True, max_retries=3)
def add(self, x, y):
    try:
        if x < 0 or y < 0:
            raise ValueError("음수에러")
        return x + y
    except Exception as exc:
        print(f"재시도 횟수: {self.request.retries}")
        self.retry(exc=exc, countdown=5)

Celery 애플리케이션 이름 출력


@app.task(bind=True)
def print_app_name(self):
    print(f"App Name: {self.app.main}")

hongreat 블로그의 글을 봐주셔서 감사합니다!