- Published on
bind 옵션으로 Celery 작업 인스턴스 가져오기
- Authors
- Name
- hongreat
- ✉️hongreat95@gmail.com
celery 를 이용한 작업을 정의할 때 데코레이터 함수를 사용하고, 이 내부에서 bind=True
옵션을 지정해 알아올 수 있습니다.
이를 통해서 작업을 수행할 함수안에서 작업 Id(pk) 를 가져오고 싶은 경우, 작업요청에 대한 메타데이터를 가져오고 싶은 경우에 활용 할 수 있습니다.
작업 함수에 self 인수가 전달되고, 이 self 인수는 작업 인스턴스를 참조하며, 이를 통해 작업의 메타데이터와 다양한 유틸리티 메서드에 접근할 수 있는 것 입니다.
1. 기본 작업 함수와 바인딩된 작업 함수의 차이
1.1 기본 작업 함수
기본 작업 함수는 단순히 작업의 본체만을 포함합니다.
@app.task
def add(x, y):
return x + y
위 작업 함수는 x와 y를 더한 결과를 반환합니다. 이 함수는 Celery 작업의 메타데이터나 유틸리티 메서드에 접근할 수 없습니다.
1.2 바인딩된 작업 함수 (bind=True 사용)
바인딩된 작업 함수는 self 인수를 통해 작업 인스턴스에 접근할 수 있습니다.
@app.task(bind=True)
def add(self, x, y):
return x + y
여기서 self는 작업 인스턴스에 대한 참조를 나타내며, 이를 통해 작업의 다양한 속성과 메서드에 접근할 수 있습니다.
2. 바인딩된 작업 함수의 장점
- 작업 메타데이터 접근: 작업의 ID, 실행 시간, 재시도 횟수 등 작업과 관련된 메타데이터에 접근할 수 있습니다.
- 유틸리티 메서드 사용: 작업 인스턴스에 내장된 다양한 유틸리티 메서드를 사용할 수 있습니다. 예를 들어, 작업을 재시도하거나 태를 업데이트하는 메서드에 활용될 수 있습니다.
- 에러 처리 및 재시도: 작업 중에 예외가 발생했을 때, self.retry 메서드를 사용하여 작업을 재시도할 수 있습니다.
2.1 바인딩된 작업 함수에서 작업 메타데이터 사용
@app.task(bind=True)
def add(self, x, y):
print(f"Task ID: {self.request.id}") # 작업 ID 출력
return x + y
에러 발생 시 작업 재시도
@app.task(bind=True, max_retries=3)
def add(self, x, y):
try:
if x < 0 or y < 0:
raise ValueError("Negative values not allowed")
return x + y
except Exception as exc:
print(f"Retrying due to error: {exc}")
self.retry(exc=exc, countdown=5) # 5초 후 재시도
3. self 에 대해
self
를 통해 Celery 인스턴스의 일부 속성과 메서드에 접근할 수 있지만, 이 self
는 Celery 인스턴스 자체가 아니라, Celery 작업 인스턴스를 나타냅니다.
3.1 self.request의 주요 속성
- self.request: 작업 요청에 대한 정보를 포함합니다.
- self.retry(): 작업을 재시도합니다.
- self.update_state(): 작업의 상태를 수동으로 업데이트합니다.
- self.name: 작업의 이름을 반환합니다.
- self.app: 현재 작업이 속한 Celery 애플리케이션 인스턴스를 반환합니다.
- self.AsyncResult: 작업 결과를 비동기적으로 가져올 수 있는 객체를 반환합니다.
- self.backend: 현재 사용 중인 백엔드에 접근할 수 있습니다.
3.2 코드 예시
이제 몇 가지 예시를 통해 bind=True
를 사용하여 유용한 정보를 얻는 방법을 보여드리겠습니다
작업 이름 출력
@app.task(bind=True)
def print_task_name(self):
print(f"task_name: {self.name}")
작업 상태 업데이트
@app.task(bind=True)
def long_running_task(self):
self.update_state(state='PROGRESS', meta={'progress': 50})
# 작업 수행
return '완료'
작업 재시도
@app.task(bind=True, max_retries=3)
def add(self, x, y):
try:
if x < 0 or y < 0:
raise ValueError("음수에러")
return x + y
except Exception as exc:
print(f"재시도 횟수: {self.request.retries}")
self.retry(exc=exc, countdown=5)
Celery 애플리케이션 이름 출력
@app.task(bind=True)
def print_app_name(self):
print(f"App Name: {self.app.main}")