2021년 11월 12일 금요일

airflow dynamic task at runtime에 대한 고찰

airflow에서 runtime에 동적으로 태스크를 생성하는 것이 옳은 일인가? (task들의 정보는 외부에 존재한다고 가정한다. 그 정보를 가져오려면 수 분이 소요될 수 있다.)

지금까지 스케쥴러에서 dag(flow)를 고정시켜놓고만 사용했었다.
일반적으로 스케쥴러에서 가장 가까운 곳에서 meta 정보를 불러와서 그 정보로 dag를 그린다. 그리고 그 meta 정보는 아마 대부분 스케줄러 DB일 것이다.

airflow를 사용하다가 dag안에 task를 런타임에 동적으로 생성해야하는 경우가 있다면 어떻게 해야할까 고민을 했다. 동적으로 그릴 정보는 airflow DB가 아닌 멀리 떨어져 있는 곳에 존재한다고 가정하자. 참고로 airflow variable은 airflow meta DB에 저장된다. 따라서 dag 정보를 variable에 저장한다면 그 것은 동적이 아닌 정적이라고 봐야한다. 어쨌든 task가 수정되기 위해서는 variable도 수동으로 수정되어야하니까..

조금 더 상세하게 시나리오를 짠다면 어떤 DB에 쿼리를 수행하여 해당 결과 행 만큼 task1, task2, task3.. taskN 을 생성하고 이 task1~taskN을 병렬수행할 수 있겠다.

즉 task_list.append(task1~taskN)로 task_list를 만들고 task_start > task_list > task_end 형태로 그려지는 형태일 것이다.

결론먼저 이야기한다면 개인적으로는 권장하지 않는다. task는 정적으로 생성되어야 한다고 생각한다. (아닐 수도 있지만..)

예를 들어 task들의 정보가 hive에서 select로 가져올 경우 일단 느리다. 1분정도밖에 안걸린다고 하더라도 서비스에서 1분이면 못 기다릴 시간이다. 어떻게든 결과를 가져와서 그린다고 할 지라도 dag가 엄청 느리게 그려지겠지.. 혹시 몰라서 subdag에 넣고 하면 그나마 괜찮을까 했었는데 여러 비슷한 레퍼런스에서도 그런식으로 작성을 했었고.. 시도해봤지만 실패다.

게다가 airflow process 중에 dag manager가 주기적으로 수행하면서 dag를 최신화(refresh) 작업을 하는데 (마치 heartbeat처럼) 그럼 그 때마다 쿼리 결과를 날려야한다면? 그리고 그 쿼리가 3~5분이 걸린다면 재앙이다.

runtime에 수행되도록 이리 저리 머리를 굴려봐도 결과적으로는 realtime으로 task 형상이 보여야하기 때문에 안된다고 생각을 하고 정적으로 flow를 구성하는 것을 권장한다.

하지만 우리에겐 언제나 해결책이 있기 때문에 혹시 비슷한 고민을 했던 분들 중 airflow dynamic task(dag, subdag) at runtime best practice가 존재한다면 댓글 부탁드립니다.

2022년 회고

 올해는 블로그 포스팅을 열심히 못했다. 개인적으로 지금까지 경험했던 내용들을 리마인드하자는 마인드로 한해를 보낸 것 같다.  대부분의 시간을 MLOps pipeline 구축하고 대부분을 최적화 하는데 시간을 많이 할애했다. 결국에는 MLops도 데이...