파이썬(Python) - Queue 서비스 필요성
오랜만에 포스팅을 작성하게 되었습니다. 최근 정규직으로 이직한지 얼마되지 않아 블로그 관리에
소홀했던 것 같습니다.
최근 업무에서 파이썬 플라스크(Flask) + Redis Queue(RQ)기반 MSA 프로젝트를 진행하게 되어서
과거에 생각했던 주제와 부합하게 되어서 관련 내용을 간단하게 정리해보겠습니다.
많은 웹 서비스 기반 어플리케이션들은 동기적(Synchronous)으로 처리하는 프로세스가 많은 것이
사실입니다.
이는 한 개의 요청에 대해서 비즈니스 로직, DB 관련 작업 등을 하나의 프로세스 안에서 처리 후 요청자
에게 반환해주는 패턴이 많습니다.
요약하면 복잡한 연산 등 수행시간이 현저하게 오래 걸리는 요청을 받았을 경우 사용자 입장에서
대기시간의 증가로 이어질 수 있다는 뜻입니다.
사용자 입장에서 대기 시간의 증가는 결국 사이트 방문 감소 효과로 귀결 되는 것 같습니다.
다시 말하면 요청이 오래 걸리는 작업 요청 후 응답 대기를 하지 않고 다른 작업으로 연속성있게
전달되도록 구현해야할 필요성이 요구 되고 있습니다.
결론은 사용자 측면의 속도 개선과 서비스 안정화 등의 많은 장점을 얻기 위해 비동기(Asynchronous)
태스크 큐 작업이 필요다고 생각합니다.
관리 티어가 늘어나는 단점도 있지만, 다수의 요청을 만족하는 대용량 서비스를 위해서는 꼭 한 번 쯤
도입을 고려해야 하는 태스크라고 볼 수 있습니다.
보통은 Django, Flask 에서 Task Worker인 Celery 등을 많이 사용하는데 본 포스팅에서는 Worker는
다음 자료에서 다루기로 하고 Message Broker 중에서 RQ(Redis-Queue)와 연동하는 예제를
설명드리겠습니다.
그러면 하단에서 간단하게 예제를 작성해보겠습니다.
Flask 관련 포스팅 : Flask 관련
Flask 관련 포스팅 : Flask 관련
파이썬(Python) - ML &DL + RQ 서비스 Architecture
현재 제가 진행 중인 기계 학습 관련 웹 서비스 아키텍쳐 입니다.
중요한 부분은 Flask에서 넘어오는 많은 요청들을 Message Broker 인 RQ가 받아서 적재 후 순서대로
Worker에게 전달 후 처리한다는 것입니다.
여기서 Worker 역할을 하는 서비스(Celery 등)는 각각 플랫폼 마다 다양한 장단점을 포함하고 있어서
프로젝트 도입시에 꼼꼼히 레퍼런스 및 관련 자료를 검토 후 결정하셔야 될 것 같습니다.
그 후 사용자는 대기 시간없이 연속된 작업을 진행할 수 있게 되어 인터렉티브한 서비스를 제공받을 수
있습니다.
아키텍처 다이어그램(참고)
파이썬(Python) - ML &DL + RQ 서비스 소스코드
다운 받으신 소스코드를 실행하기 위해서는 기본적으로 개발 및 실행 환경 설정은 가상환경을
이용해서 진행해 주시면 됩니다.
예제를 실행하기전에 반드시 아래 필수 설치 패키지 목록을 참조해서 사전 설치 부탁드립니다.
그럼 아래 소스를 살펴보겠습니다.
필수 설치 패키지 목록 : Redis-Server, Redis, RQ
1. Redis-Server 설치 방법 : Windows, Linux -> 링크
2. Redis , RQ 설치 -> pip install redis rq
3. RQ Dashboard 설치 -> pip install rq-dashboard
필수 설치 패키지 목록 : Redis-Server, Redis, RQ
1. Redis-Server 설치 방법 : Windows, Linux -> 링크
2. Redis , RQ 설치 -> pip install redis rq
3. RQ Dashboard 설치 -> pip install rq-dashboard
소스 코드
rq_test.py
12345678910111213141516171819202122232425262728293031323334353637383940414243 import osimport timefrom rq import Connection, Queuefrom rq_task import test_fib def main(): # 범위 지정 fib_range = range(30, 40) # 큐 선언 및 작업 적재 q = Queue() async_results = [q.enqueue(test_fib, x) for x in fib_range] start_time = time.time() # 종료 플래그 done = False # 모든 큐 작업 종료시 까지 루프 while not done: os.system('clear') # 시작 시간 print('Asynchronously: (now = %.2f)' % (time.time() - start_time,)) done = True # 모든 큐 실행 for i, x in enumerate(async_results): # 결과 반환 result = x.return_value if result is None: done = False result = '(calculating)' # 결과 콘솔 출력 print('fib(%d) = %s' % (i, result)) print('') # CLI에서 실행 추천 print('To start the actual in the background, run a worker:') print(' python examples/run_worker.py') # 잠시 대기 time.sleep(0.2) print('Done') if __name__ == '__main__': with Connection(): main() cs
- 3번 라인 : Redis Server 연결 및 Message Broker 서비스에 필요한 Connection, rq 임포트
- 4번 라인 : 사용자 요청으로 가정해서 미리 작성해 놓은 연산 작업이 많이 필요한 재귀 함수 관련 메소드 임포트
- 12번 라인 : enqueue 메소드를 통해서 Task를 큐에 적재
- 18번 ~ 39번 라인 : 차근차근 보시면 어렵지 않은 부분입니다. 큐 작업이 Worker에 종료 될 때 까지 대기합니다.
- 42번 라인 : Redis Server 연결 및 프로그램 시작 구문
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 | import os import time from rq import Connection, Queue from rq_task import test_fib def main(): # 범위 지정 fib_range = range(30, 40) # 큐 선언 및 작업 적재 q = Queue() async_results = [q.enqueue(test_fib, x) for x in fib_range] start_time = time.time() # 종료 플래그 done = False # 모든 큐 작업 종료시 까지 루프 while not done: os.system('clear') # 시작 시간 print('Asynchronously: (now = %.2f)' % (time.time() - start_time,)) done = True # 모든 큐 실행 for i, x in enumerate(async_results): # 결과 반환 result = x.return_value if result is None: done = False result = '(calculating)' # 결과 콘솔 출력 print('fib(%d) = %s' % (i, result)) print('') # CLI에서 실행 추천 print('To start the actual in the background, run a worker:') print(' python examples/run_worker.py') # 잠시 대기 time.sleep(0.2) print('Done') if __name__ == '__main__': with Connection(): main() | cs |
- 3번 라인 : Redis Server 연결 및 Message Broker 서비스에 필요한 Connection, rq 임포트
- 4번 라인 : 사용자 요청으로 가정해서 미리 작성해 놓은 연산 작업이 많이 필요한 재귀 함수 관련 메소드 임포트
- 12번 라인 : enqueue 메소드를 통해서 Task를 큐에 적재
- 18번 ~ 39번 라인 : 차근차근 보시면 어렵지 않은 부분입니다. 큐 작업이 Worker에 종료 될 때 까지 대기합니다.
- 42번 라인 : Redis Server 연결 및 프로그램 시작 구문
rq_task.py
rq_task.py
123456 # 테스트 함수 def test_fib(n): if n <= 1: return 1 else: return test_fib(n-1) + test_fib(n-2) cs
- 2번 ~ 6번 라인 : 흔한 팩토리얼 공식 예제입니다.
N이 커질 수록 연산량 및 시간이 늘어나겠죠? Queue에 적재 됩니다.
1 2 3 4 5 6 | # 테스트 함수 def test_fib(n): if n <= 1: return 1 else: return test_fib(n-1) + test_fib(n-2) | cs |
- 2번 ~ 6번 라인 : 흔한 팩토리얼 공식 예제입니다.
N이 커질 수록 연산량 및 시간이 늘어나겠죠? Queue에 적재 됩니다.
Redis추가적인 환경 설정 및 다중 서버 구성 방법은 이 곳을 참고해 주세요.
Redis추가적인 환경 설정 및 다중 서버 구성 방법은 이 곳을 참고해 주세요.
rq_worker.py
rq_worker.py
123456 from rq import Connection, Queue, Worker if __name__ == '__main__': with Connection(): q = Queue() Worker(q).work() cs
- 1번 라인 : Worker가 임포트 된 것을 확인합니다.
- 6번 라인 : Queue에서 넘어온 Task를 Worker가 처리하는 구문
1 2 3 4 5 6 | from rq import Connection, Queue, Worker if __name__ == '__main__': with Connection(): q = Queue() Worker(q).work() | cs |
- 1번 라인 : Worker가 임포트 된 것을 확인합니다.
Python RQ(Redis Queue)의 추가적인 상세 설명은 이 곳을 참고해 주세요.
Python RQ(Redis Queue)의 추가적인 상세 설명은 이 곳을 참고해 주세요.
파이썬(Python) - ML &DL + RQ 서비스 구동 예시
아래 이미지로 Redis Server 실행 부터 총 4단계의 과정으로 RQ 실행 과정을 GIF 이미지로 확인하실 수 있습니다.
실행 화면 - Redis Queue Worker 기동
실행 화면 - RQ Dashboard 실행 후 브라우저에서 확인
실행 화면 - 태스크(Task) 실행 후 Queue 동작 확인(Dashboard)

실행 화면 - 태스크 실행 후 Queue 정상 동작 확인(콘솔)
Flask 공식 Reference : Flask 관련
Flask 공식 Reference : Flask 관련
마무리
이번 예제에서는 머신러닝, 딥러닝 서비스 및 다중 요청 서비스를 위한 Redis-Queue 서비스를
간단하게 구현해 보았습니다.
마이크로 서비스 아키텍쳐(MSA)에서 Backing service 파트에서 보면 보편적으로 Message-Queue
를 활용한 비동기 요청-응답 패턴을 적극적으로 활용하고 있습니다.
단일 서비스에서 한 개의 트랜잭션에 다양한 서비스들이 강한 결합으로 묶여 있을 경우에는 서비스
요청 및 데이터의 손실이라는 이슈 가능성이 높아지고 유연성이 떨어지는 것이 사실입니다.
이 때 메시지 큐를 활용한 트랜잭션 분리 전략은 장애 및 기타 이슈시에 서비스의 보존(영속성)이라는
장점을 얻을 수가 있어 안정적이고 대량 요청-응답 서비스에 적합하다고 볼 수 있습니다.
이번 예제를 통해서 다양한 큐 패키지(RebbitMQ, Kafka 등)장단점을 분석해서 적용하고자 하는
적합한 서비스 아키텍쳐에 대해서 생각해보는 시간이 되었으면 합니다.
다음 포스팅에서도 좋은 예제로 찾아뵙겠습니다.
감사합니다.
소스코드 다운로드 :
flask-with-rq-example.zip
'언어 > Python' 카테고리의 다른 글
파이썬(Python) - 네임드튜플(namedtuple) 사용 예제 및 소스코드 (3) | 2019.08.18 |
---|---|
파이썬(Python) - 매직(Special) 메소드 및 연산자 오버라이딩 설명 및 실습 (2) | 2019.07.23 |
파이썬(Python) - 템플릿 메소드(Template Method) 패턴(디자인패턴) (0) | 2019.03.06 |
파이썬(Python) - 팩토리 메소드(Factory Method) 패턴(디자인패턴) (0) | 2019.02.25 |
파이썬 아나콘다(Anaconda) - 심화 사용법 정리(2) - env 명령어 (2) | 2019.01.26 |