2021년 11월 12일 금요일

airflow dynamic task at runtime에 대한 고찰

airflow에서 runtime에 동적으로 태스크를 생성하는 것이 옳은 일인가? (task들의 정보는 외부에 존재한다고 가정한다. 그 정보를 가져오려면 수 분이 소요될 수 있다.)

지금까지 스케쥴러에서 dag(flow)를 고정시켜놓고만 사용했었다.
일반적으로 스케쥴러에서 가장 가까운 곳에서 meta 정보를 불러와서 그 정보로 dag를 그린다. 그리고 그 meta 정보는 아마 대부분 스케줄러 DB일 것이다.

airflow를 사용하다가 dag안에 task를 런타임에 동적으로 생성해야하는 경우가 있다면 어떻게 해야할까 고민을 했다. 동적으로 그릴 정보는 airflow DB가 아닌 멀리 떨어져 있는 곳에 존재한다고 가정하자. 참고로 airflow variable은 airflow meta DB에 저장된다. 따라서 dag 정보를 variable에 저장한다면 그 것은 동적이 아닌 정적이라고 봐야한다. 어쨌든 task가 수정되기 위해서는 variable도 수동으로 수정되어야하니까..

조금 더 상세하게 시나리오를 짠다면 어떤 DB에 쿼리를 수행하여 해당 결과 행 만큼 task1, task2, task3.. taskN 을 생성하고 이 task1~taskN을 병렬수행할 수 있겠다.

즉 task_list.append(task1~taskN)로 task_list를 만들고 task_start > task_list > task_end 형태로 그려지는 형태일 것이다.

결론먼저 이야기한다면 개인적으로는 권장하지 않는다. task는 정적으로 생성되어야 한다고 생각한다. (아닐 수도 있지만..)

예를 들어 task들의 정보가 hive에서 select로 가져올 경우 일단 느리다. 1분정도밖에 안걸린다고 하더라도 서비스에서 1분이면 못 기다릴 시간이다. 어떻게든 결과를 가져와서 그린다고 할 지라도 dag가 엄청 느리게 그려지겠지.. 혹시 몰라서 subdag에 넣고 하면 그나마 괜찮을까 했었는데 여러 비슷한 레퍼런스에서도 그런식으로 작성을 했었고.. 시도해봤지만 실패다.

게다가 airflow process 중에 dag manager가 주기적으로 수행하면서 dag를 최신화(refresh) 작업을 하는데 (마치 heartbeat처럼) 그럼 그 때마다 쿼리 결과를 날려야한다면? 그리고 그 쿼리가 3~5분이 걸린다면 재앙이다.

runtime에 수행되도록 이리 저리 머리를 굴려봐도 결과적으로는 realtime으로 task 형상이 보여야하기 때문에 안된다고 생각을 하고 정적으로 flow를 구성하는 것을 권장한다.

하지만 우리에겐 언제나 해결책이 있기 때문에 혹시 비슷한 고민을 했던 분들 중 airflow dynamic task(dag, subdag) at runtime best practice가 존재한다면 댓글 부탁드립니다.

댓글 2개:

  1. 비슷한 고민을 한 글을 보니 반갑네요. Task 정적 구성이 권장되어야 한다는 의견에 동의합니다.

    Airflow 1.x에서 Dag/Task를 수천 개 규모로 런타임 생성하면서 여러 이슈를 겪었습니다.
    외부 저장소에서 스케줄 정의를 가져올 때는, 스케줄 정의에 변화가 생겼을 때만 가져오고, 스케줄 정의를 가져와서 Airflow 스케줄러 노드의 인메모리 캐시에 넣어 두는 방법을 생각할 수 있습니다.
    하지만 그래도 문제는 존재하는데, Dag과 Operator 생성자들을 처리하는 것은 여전히 로드가 크기 때문입니다. 그래서 이 객체들을 업데이트시에만 생성해서 Pickle로 바이너리화 시켜두고 빠르게 불러오는 구조도 고려가 가능하겠습니다.
    그러나 여전히 문제가 존재하는데, 말씀하신 하트비트 뿐만 이니라 각각의 Task를 실행하기 전에도 DagBag을 갱신하는 동작이 있기 때문에.. 하나의 task를 실행할 때 전체 Dag/Operator가 업데이트되는 문제도 있더군요.. 하지만 DAG과 SubDAG을 포함하여 수천 개 정도의 스케줄은 유지 가능했습니다.


    Airflow 2.x에서는, 경험이 없어서 잘 모르지만, 매번 DAG 업데이트를 하지 않고, 변경이 되었을 때만 업데이트를 해 주는 방식으로 개선이 된 것으로 알고 있습니다. (역시 여기서도 DAG 정의파일이, 동적 로드를 위한 파일 하나만 존재할 때는 전체 DAG을 갱신하게 되는 문제가 있겠죠.. 기회가 되면 2.x에서도 테스트해 보고 싶네요)


    이야기를 하다 보니 Task의 동적 구성이 아닌 DAG 동적 구성으로 쓰게 되었는데 , 성능적인 관점 외에 기능적인 관점 에서도 Airflow의 Task를 동적으로유연하게 구성하는 것은 상당히 불편했습니다. UI적인 측면에서도 컨트롤을 하기가 힘들었구요 . 기회가 된다면 스케줄을 좀더 플렉서블하게 동적으로 구성할 수 있는 스케줄러를 새로 만들어 보고 싶네요..

    답글삭제
    답글
    1. 댓글감사합니다.. 같은 고민하는 분이 계셔서 반갑네요 말씀하신 부분에 모두 동감합니다. airflow가 기능 추가해서 넣는건 편해도 로드하것 때문에 그 만큼 무거워지는 것도 있고 확실히 ui측면에서 컨트롤하기가 불편하긴 합니다.(log 찾아보는거나 backfill 부분) 커스터마이징이 쉽다는게 에어플로우의 장점 중 하나일텐데 동적구성이 어렵다는게 조금 아쉽네요. 스케쥴러 야무지게 만들어주시면 야무지게 사용해보도록 하겠습니다 ㅎㅎ

      삭제

2022년 회고

 올해는 블로그 포스팅을 열심히 못했다. 개인적으로 지금까지 경험했던 내용들을 리마인드하자는 마인드로 한해를 보낸 것 같다.  대부분의 시간을 MLOps pipeline 구축하고 대부분을 최적화 하는데 시간을 많이 할애했다. 결국에는 MLops도 데이...