2021년 12월 31일 금요일

2021년 회고

2021년은 이직을 하면서 조금 바쁜 해를 살았다. 새 회사와 도메인에 적응하기 위해 나름 바쁘게 살았고 기존에 알던 지식을 어떻게 사용할 수 있을지 많이 고민을 했었다. 결과적으로는 기술부채를 많이 쌓았지만 경험적인 측면에서는 많이 도움이 되었던 것 같다.


올해 포스팅은 대단한 것을 쓸 기회가 없었을 것 같았기 때문에 개인적으로 그 동안 알고 있던 내용을 정리하는 시간을 갖고자 했다. 데이터 엔지니어가 알아야할 기본적인 내용을 시리즈별로 작성했다.


1) 빅데이터 플랫폼 아키텍처에 대하여.. 데이터 스토리지 관점에서의 흐름
https://parksuseong.blogspot.com/2021/06/1.html
2) 빅데이터 플랫폼 아키텍처에 대하여.. 하둡을 알아보자
https://parksuseong.blogspot.com/2021/08/2.html
3) 빅데이터 플랫폼 아키텍처에 대하여.. 데이터를 처리를 위한 Spark
https://parksuseong.blogspot.com/2021/08/3-spark.html
4) 빅데이터 플랫폼 아키텍처에 대하여.. 배치 스케쥴러(airflow, azkaban, oozie)
https://parksuseong.blogspot.com/2021/08/4-airflow-azkaban-oozie.html
5) 빅데이터 플랫폼 아키텍처에 대하여.. 데이터 시각화를 위한 프레임워크(DashBoard 구현에 필요한 Grafana, Prometheus, influxDB 등)
https://parksuseong.blogspot.com/2021/08/5-dashboard-grafana-prometheus-influxdb.html
6) 빅데이터 플랫폼 아키텍처에 대하여.. 다른 팀과 협업 시 구성하면 좋은 프레임워크(hive, hue)
https://parksuseong.blogspot.com/2021/10/6-hive-hue.html


원래 기술블로그를 보면서 의견이 있어도 댓글을 안달고 그냥 머리속으로 끝냈는데 포스팅을 하다보니 댓글 하나하나가 큰 힘이되고 다시 생각을 정리하는 시간도 갖게되어서 댓글을 달아주신 분들이 너무 감사하다는 생각이 들었다.

특히 전 직장에서 기술적으로 많이 도움을 주셨던 장원주 담당님이 의견을 남겨주셔서 큰 힘이 되었고 댓글이 큰 힘이 되는 것을 알았기 때문에 나도 다른 블로그글들을 보면 댓글을 남겨야겠다고 생각을 했다.(SSG.COM 장원주 담당님 감사합니다.)


2021년은 코로나때문에 스스로도 많이 나태해졌고 무언가 열심히 찾아보고 새로운 방법을 찾아보려고 한 것이 없다는 것이 너무 아쉬운 한해가 되었다. 그나마 가장 열심히(?) 방법을 찾아보려고 했던 것이 있다면 airflow dynamic task at runtime인데 이 부분도 포스팅을 했는데 같은 어려움을 겪으면서 고민을 해 주신분이 계셨고 댓글을 남겨주셔서 감사했다.

해당 포스팅은 airflow dynamic task at runtime에 대한 고찰이며 같은 결론에 도달한 것이 다행이기도 하지만 아쉽기도 하였다.

( https://www.blogger.com/blog/post/edit/7933278769949015036/5051160839575531588 )


생각해보면 100% dynamic batch flow를 들어본 적도 본 적도 없기는 하다.


블로그 조회수



내년에는 쓸 것을 찾아서 더 열심히 써야겠다. 개인적으로 데이터 쪽이나 사이드 프로젝트를 진행하며 꾸준히 포스팅할 소스를 찾아볼까 한다.

아무튼 2022년에는 더 재밌는 일이 찾아오기를 바란다!

2021년 11월 12일 금요일

airflow dynamic task at runtime에 대한 고찰

airflow에서 runtime에 동적으로 태스크를 생성하는 것이 옳은 일인가? (task들의 정보는 외부에 존재한다고 가정한다. 그 정보를 가져오려면 수 분이 소요될 수 있다.)

지금까지 스케쥴러에서 dag(flow)를 고정시켜놓고만 사용했었다.
일반적으로 스케쥴러에서 가장 가까운 곳에서 meta 정보를 불러와서 그 정보로 dag를 그린다. 그리고 그 meta 정보는 아마 대부분 스케줄러 DB일 것이다.

airflow를 사용하다가 dag안에 task를 런타임에 동적으로 생성해야하는 경우가 있다면 어떻게 해야할까 고민을 했다. 동적으로 그릴 정보는 airflow DB가 아닌 멀리 떨어져 있는 곳에 존재한다고 가정하자. 참고로 airflow variable은 airflow meta DB에 저장된다. 따라서 dag 정보를 variable에 저장한다면 그 것은 동적이 아닌 정적이라고 봐야한다. 어쨌든 task가 수정되기 위해서는 variable도 수동으로 수정되어야하니까..

조금 더 상세하게 시나리오를 짠다면 어떤 DB에 쿼리를 수행하여 해당 결과 행 만큼 task1, task2, task3.. taskN 을 생성하고 이 task1~taskN을 병렬수행할 수 있겠다.

즉 task_list.append(task1~taskN)로 task_list를 만들고 task_start > task_list > task_end 형태로 그려지는 형태일 것이다.

결론먼저 이야기한다면 개인적으로는 권장하지 않는다. task는 정적으로 생성되어야 한다고 생각한다. (아닐 수도 있지만..)

예를 들어 task들의 정보가 hive에서 select로 가져올 경우 일단 느리다. 1분정도밖에 안걸린다고 하더라도 서비스에서 1분이면 못 기다릴 시간이다. 어떻게든 결과를 가져와서 그린다고 할 지라도 dag가 엄청 느리게 그려지겠지.. 혹시 몰라서 subdag에 넣고 하면 그나마 괜찮을까 했었는데 여러 비슷한 레퍼런스에서도 그런식으로 작성을 했었고.. 시도해봤지만 실패다.

게다가 airflow process 중에 dag manager가 주기적으로 수행하면서 dag를 최신화(refresh) 작업을 하는데 (마치 heartbeat처럼) 그럼 그 때마다 쿼리 결과를 날려야한다면? 그리고 그 쿼리가 3~5분이 걸린다면 재앙이다.

runtime에 수행되도록 이리 저리 머리를 굴려봐도 결과적으로는 realtime으로 task 형상이 보여야하기 때문에 안된다고 생각을 하고 정적으로 flow를 구성하는 것을 권장한다.

하지만 우리에겐 언제나 해결책이 있기 때문에 혹시 비슷한 고민을 했던 분들 중 airflow dynamic task(dag, subdag) at runtime best practice가 존재한다면 댓글 부탁드립니다.

2021년 10월 27일 수요일

Hive partition table로 DW를 구축할 때 고려할 점(upsert)

과거에 팀장님께서도 한번 주문했던 내용인데 하둡에 저장된 과거 데이터의 update 시나리오를 고민했던 적이 있다. 당시 결국 만족할만한 방법이 없어서 drop했던 내용인데 그 기억을 살려 hive를 기준으로 다시 포스팅을 해본다.


대부분 HDFS에 적재된 데이터는 Hive로 접근한다. HDFS 데이터를 가지고 DW를 구축할 때 Hive가 가장 편하긴 하지만 HDFS 특징으로 인해 Hive에서의 약간의 제약이 있다. 파일시스템이기 때문에 일반적인 DB처럼 row 1개를 delete, update를 하기 어렵기 때문이다.


대부분의 경우 하이브 테이블은 파티셔닝을 한다. 데이터의 양이 많고 매일 적재되기 때문에 파티셔닝은 필수다. 하지만 이런 경우 소급이 문제가 된다.

가령 과거 데이터를 새로 만들어서 insert overwrite table target_tbl partition(col='yyyymmdd') select * from source_tbl 형태로 데이터를 적재해보자. 이런 경우 upsert로 동작하지 않는다. 해당 파티션의 데이터를 delete 후 새로 엎어치는 작업을 한다. 즉 delete&copy 형태로 동작한다. 

이런 경우 그럼 지워지면 안되는 데이터도 날아가기 때문에 결과적으로는 통으로 데이터를 만들어줘야한다. 이런 경우 old 데이터와 new 데이터를 full outer join을 해서 통으로 새로 다시 만들거나 new_tbl left join old_tbl+ union all old_tbl not exists new_tbl 형태로 통으로 다시 만들어줘야한다.


insert는 해당 파티션에 단순 append이기 때문에 단순 데이터를 적재하는데 쓸수는 있지만 insert overwrite로 밀어넣는 형태라면 해당 파티션이 새로 적재된다는 사실이다.

파일시스템이라서 당연한 결과이기는 하지만 hive를 어떠한 형태로 사용하더라도 hdfs를 사용하기 때문에 방법이 없다. 또한 인덱스의 효과도 일반 DB와 비교해서 크지 않다.


그럼 결과적으로 통으로 새로 만들어야하는가? "그렇다."

그럼 어느 부분에서 효율을 낼 수 있나? 현재로서는 spark를 사용해서 연산 속도를 높이는 것 외에는 없어보인다.


다른 방법이기는 하지만 파티션을 일자 외에도 다른 컬럼을 추가해서 아주 잘(?) 사용한다면 방법이 있을 수도 있다. 하지만 권장하지는 않는다!

2021년 10월 2일 토요일

6) 빅데이터 플랫폼 아키텍처에 대하여.. 다른 팀과 협업 시 구성하면 좋은 프레임워크(hive, hue)

무슨 내용을 쓸까하다가 보안적인 부분에 대해서 포스팅을 안했기 때문에 이번에는 이 부분에 대해서 다뤄보려고 한다. 최근 깃랩에 사이드프로젝트를 만들어보니 토큰 발급이 필수로 바뀌어서 문득 다음 포스팅 주제도 보안적인 부분을 다루면 좋겠다고 생각했다.


선택사항이고 개인적으로 보안적인 부분은 최전방에서 최대한 막고 내부 동료들끼리는 개발 효율을 위해 최대한 풀어줘야한다는 생각이지만 이는 업에 따라 법적인 문제(생각보다 고려할 부분이 많다)도 있기 때문에 잘 고려해서 해야한다. 단지 이 포스팅은 한가지 시나리오일 뿐이다.

상황은 하둡 데이터를 접근해야한다. 당연히 관리자는 모두 접근이 가능하겠지만 만약 다른 부서 사람들이 접근을 해야하는 경우는 어떻게 해야할까? 데이터를 직접 뽑아줄 수도 있지만 분석가는 데이터를 불러와서 모델링을 해야하기 때문에 결국 open을 해야하는 이슈가 있다.

하둡 자체에 kerberos 인증이 있기는 하지만 우리가 로그인할 때의 수준이지 데이터 접근에 대한 개념이 아니다.


그럼 어떻게 할까.
1차적으로 하둡 페이지들에 대한 개인의 접근은 모두 막는다. 즉 모든 방화벽을 열어주지 않는다. 굳이 열어준다면 리소스 매니저만 열어준다.

그럼 사람들이 데이터는 어떻게 접근하는가?
hue만 열어준다. (hue는 절대 스케쥴러 용도가 아닌 단순 파티션이나 데이터 확인용!!) 
그럼 단지 쿼리만 날릴 수 있겠지만 불필요한 접근이 없게 될 것이다.

그리고 개발이나 배치는 hive를 통해서만 접근하도록 한다. 즉 개발서버에서 hive만 접근가능하게 열어주고 hive 쿼리만 날릴 수 있게 한다. 어차피 데이터는 쿼리로 접근할테니.. 중요한 것은 hdfs에 direct로 접근하지 못하게 하는 것이다.

체감되는 트랜드 순서가 MR -> Tez -> Spark인 것 같은데 hive on spark를 구성하면 좋겠지만 MR이든 Tez든 Spark든 모두 Spark로 처리할 필요는 없고 데이터의 사이즈에 따라서 MR이나 Tez를 써야하는 경우도 있다.

갑자기 다른데로 샌것같은데 아무튼 이번 포스팅은 보안(?)이라기보다는 어쨌든 아무나 접근 못하게 하는 방법(?)을 제안해보았다.

결론은 개인 컴퓨터에선 hue만 열어줘도 된다.

2021년 8월 28일 토요일

5) 빅데이터 플랫폼 아키텍처에 대하여.. 데이터 시각화를 위한 프레임워크(DashBoard 구현에 필요한 Grafana, Prometheus, influxDB 등)

1편 Bigdata Architecture, 2편 Hadoop, 3편 Spark, 4편 Scheduler에 이어서 5편은 무슨 주제로 포스팅을 할까 하다가.. Hive나 Hue같은 부가적인 프레임워크보다는 시각화를 먼저 쓰는게 좋다고 문득 생각이 들었다.

데이터를 저장(Hadoop)하고 Spark로 처리하고 Scheduler로 정기적인 작업을 할 수 있다면 꾸준히 output data가 어딘가에 쌓일 것이다. 그 것을 HDFS로 저장하거나 또는 다른 DB에 저장해서 필요할 때 가져다 쓰면 된다.


대시보드를 만들 때 Tableau, Power BI 등 유료 대시보드를 쓰면 여러 기능들을 사용할 수 있고 잘만들면 drill down을 하면서 Olap처럼 사용할 수도 있다. 하지만 서버 상태 모니터링이나 실적 등 처럼 정형화된 그래프만 봐도 충분한 경우가 있다. 이럴 땐 굳이 유료 대시보드를 구매할 필요 없이 범용적으로 사용하는 Grafana를 권장한다.


Grafana는 Graph뿐만 아니라 alert 등을 걸고 이메일 알림을 받을 수 있다. 또한 공통적으로 사용하는 화면들은 다른 사람들이 잘 만들어놓고 grafana 공식 홈페이지에 dash board를 검색하여 json을 가져다가 쓰기만 하면 된다.


Grafana labs


Grafana는 대시보드다. 그럼 데이터는 어디에 있는 것을 사용하면 될까? mysql, postgresql, influxDB 등 다양한 connector(plugins)가 존재하며 데이터가 있는 DB와 연결하면 일정 주기마다 데이터가 불러와지면서 화면에 그려준다. 보통은 쿼리를 작성하고 그 쿼리가 일정 주기마다 DB에 날린다.


불러온 데이터는 각종 Graph로 표현할 수 있다. 클릭 몇번으로도 구현이 될 정도로 아주 간단하다. 버전마다 사용가능한 차트가 다르고 다른 사람들이 작성해놓은 dash board를 다운받아서 사용할 경우 데이터 형식만 맞춰주면 그대로 가져다 쓸 수 있다.


보통 시계열 데이터는 InfluxDB에 넣고 사용을 한다. time series DB이기 때문에 Timestamp로 구분을 하고 PK를 사용하지 않기 때문에 메트릭정보, 실시간 로그 분석 등에 사용하기 알맞다. Opensource olap 중에 하나인 druid와 철학을 같이하는 것처럼 보인다.


이렇게 grafana에 direct로 DB를 연결해서 보여주는 방식도 있지만 별도 수집기를 둘 수도 있다. 이는 Prometheus라는 프레임워크가 해줄 수 있다. Prometheus 역시 time series로 데이터를 수집하며 각종 metric을 수집할 수 있다.

중요한 것은 데이터를 필요한 곳에 넣어주느냐, 데이터가 필요한 곳에서 가져가느냐의 차이이다. 둘다 가능하지만 후자가 조금 더 안전하다. Destination의 문제로 인해 데이터를 더 이상 받으면 안될 때에도 데이터를 억지로 밀어넣고 있는다면 당연히 장애가 심각해질 것이다. 따라서 push 방식보다는 pull 방식을 더 선호하고 이 외에도 데이터를 받는 곳에서 장애가 나게된다면 pull 방식은 단지 데이터를 가져가지 않을 뿐이니 문제가 없다. 단지 수집기로 쓰기에 알맞다.


과거에 포스팅을 했지만 프로메테우스는 확장하기가 어렵다. 확장을 하려면 프로메테우스들끼리 묶고 다시 상단에서 그 데이터를 pull해오는 방식으로 구현을 해야한다. 즉 프로메테우스를 층층이 쌓아서 피라미드(?)처럼 설계를 해야한다. 다른 방식이 있는지는 모르겠지만 그 정도로 프로메테우스를 heavy하게 사용할 일이 있을까?



프로메테우스 아키텍처이다. 위에서 언급했던 데로 Grafana에서 direct로 DB를 붙게된다면 15초, 30초 마다 한번씩 쿼리를 수행하기 때문에 DB입장에서는 부담이 될 수 있다. 하지만 프로메테우스를 사용한다면 프로메테우스가 그 DB역할을 대신해주고, 프로메테우스는 pull 방식으로 데이터를 수집하므로 조금 더 안전한 구조가 될 수 있다.

아키텍처처럼 Pushgateway를 두고 Prometheus server가 데이터를 수집할 수도 있고 수집대상에 exporter를 띄워서 Prometheus server가 수집할 수도 있다. 즉 Prometheus server가 몸통이다. 또한 여기에서도 Alertmanager를 별도로 두어 email 등의 알람을 줄 수 있다. 또한 Prometheus 자체가 데이터를 들고있으니 그래프를 그리는 것 또한 가능하다. 하지만 이쁘지 않기 때문에 그라파나를 통해서 그리는 것이 훨씬 더 이쁘게 그릴 수 있다.


Prometheus는 필수는 아니고 Dashboard를 위해서는 Grafana만 있으면 된다. 
Grafana를 잘 활용하자.


Hive Error) Cannot insert into target table because column number/types are different

Hive에 Insert를 할 때 이런 에러를 볼 수 있다. 
target table은 partition table이고 source 테이블보다 컬럼이 한개(div_col)가 더 많다. 
대충 Insert 구문은 이런식으로 작성을 했다. 
INSERT OVERWRITE TABLE TARGET_TBL_NMPARTITION(partition_col='yyyymm') SELECT '0' as div_col, * FROM SOURCE_TBL_NM

"Cannot insert into target table because column number/types are different : Table insclause-0 has N columns, and the N+1 columns are partitioned and we not required any filters we have to dump/store from non partitioned table to partitioned table."

넣고자 하는 데이터에 SELECT * FROM 구문이 문제가 되는 것으로 partition 컬럼이 return 되면서 컬럼이 한개가 더 +1 되는 상황이다.
그래서 이런 경우에는 타겟 테이블 컬럼 이름을 Select 절에 다 써줘서 갯수를 맞춰줘야한다.

즉 INSERT OVERWRITE TABLE TARGET_TBL_NMPARTITION(partition_col='yyyymm') SELECT '0' as div_col, Col1, Col2, Col3... FROM SOURCE_TBL_NM

컬럼명이 천개가 넘어가면 설계를 바꾸는것을 권장하고싶다. 굳이 통으로 한개를 들고다닐 필요가 없다면 말이다. 

교훈 : 테이블의 컬럼 갯수는 적당히 쓰자.. 테이블당 컬럼이 1000개가 넘어가면 골때린다.. hdfs를 사용하는 Hive가 Column store가 된다는 일은 앞으로도 없을테니까...



2021년 8월 22일 일요일

4) 빅데이터 플랫폼 아키텍처에 대하여.. 배치 스케쥴러(airflow, azkaban, oozie)

2, 3포스팅을 통해 데이터를 관리하는 하둡과 처리하는 스파크가 세팅되있다면 이제 정기적으로 작업를 수행할 수 있는 배치 스케쥴러가 필요하다.

스케쥴러란 정기적으로 원하는 시간에 특정 작업(스크립트 등)을 수행하기 위해 필요한 시스템이다. 자동화를 잘만 해놓는다면 상당히 많은 리소스를 절약할 수 있다.

빅데이터 진영에는 다양한 스케쥴러가 존재하는데 airflow, azkaban, oozie 등 많은데 아마 요즘은 airflow가 대세가 된 것 같다. 사실 스케쥴러는 단순히 스케쥴러 역할(마치 crontab처럼)이 있다면 배치를 수행하는데 큰 문제가 없기 때문에 어떤 것이든 상관이 없다고 생각한다.

먼저 oozie의 경우 xml로 작성해야하기 때문에 흐름과 맞지 않고 매끄러운 flow를 그리기 어렵다.

사실 분석 클러스터(spark cluster)로 연산을 하게 될 경우 코드(스크립트)를 짜서 분석 서버에 작업을 던지기만 하면되서 단순히 실행 shell script(bash operator)를 원하는 시간대에 설정만 할 수 있으면 된다. 만약 각 스크립트가 수행되는 worker에서 data를 share하고자 할 때에는 조금 불편할 수도 있는데 이 때에는 xcom같은 기능을 활용해서 local로 data를 떨구고 다른 worker에서 읽을 수 있도록 해야하며 이는 작업 후에 data를 지워주거나 잘 못 되었을 경우 다시 받아야할 때 생각보다 비효율적이고 껄끄러운 작업이 된다. 그래서 개인적으로는 share가 필요한 data는 hadoop에 쓰는 식으로해야 추후 스케쥴러 서버의 disk full 등의 장애를 피할 수 있다.(개인적인 의견)

물론 분석 클러스터에서 작업을 하지 않고 로컬 머신에서 돌아가는 경우도 있다. ML/DL job들은 대부분이 단일머신에서 돌아가고 있을테니까.. 


airflow의 경우 파이썬으로 작성하기 때문에 자유로운 커스터마이징이 가능하고 flow를 그리는 것이 쉽다. dag안에 subdag 등을 생성하는 것도 쉽다. 파이썬 기반이다보니 누구나 접근하기 쉽다. 장점은 파이썬 기반이기 때문에 쉽고 자유도가 높다는 점이다. 그렇다면 단점은 무엇일까? 단순히 스케쥴러 역할로만 사용한다면 큰 불편함은 못 느낄 것이다. 단지 DAG의 CICD를 신경쓴다면 크게 어려움은 없을 것이다.


그렇다면 azkaban은 어떨까? azkaban 역시 단순 스케쥴러 역할만 하게 되면 airflow와 똑같다. 오히려 airflow보다 ui에서는 더 간편해보인다. airflow와 마찬가지로 Scheduler 자체의 variable를 가질 수 있으며 최소화로 사용하도록 한다. 소급에 필요하거나 adhoc하게 돌릴용으로만 variable을 사용해야하는데 남발하고 핵심 재료(?)로 사용하게 될 경우 dependency가 많이 걸리게 된다. 극단적으로 아예 안써도 된다고 충분히 다른 곳을 이용해서 해결하는 것을 권장하고싶다.


아무튼 결론은 어떤 스케쥴러를 사용해도 비슷하고 스케쥴러 선정에 고려해야할 부분은 여러가지가 있겠지만 나열해본다면 다음과 같다.

1. UI가 사용하기 편하고 깔끔하다.
2. 재수행이 편하다.
3. flow(dag)를 그리는 것이 간편하다.
4. log보는 것이 편하다.
5. 다른 시스템과 호환이 잘 된다. (굳이 bash로 짤거면 필요는 없긴하다.)

이 정도만 고려하면 충분하지 않을까..

2021년 8월 21일 토요일

3) 빅데이터 플랫폼 아키텍처에 대하여.. 데이터를 처리를 위한 Spark

지난 포스팅에서 하둡에 대해서 알아보았다. 최초 하둡을 세팅하고부터는 사실상 전체 리부팅을 할 일이 거의 없고 데이터를 열심히 사용하고 관리를 하게 된다. 이렇게 열심히 모은 데이터를 이제 처리를 해야한다.



빅데이터가 주목받기 시작한 것은 여러가지 이유가 있겠지만 In-memory 방식이 가능하게 되면서 성능이 비약적으로 발전하고 더 많은 아웃풋을 낼 수 있었기 때문이라고 생각한다. 


지금은 Spark가 보편화되고 확고하게 자리 잡았지만 그 전에는 MapReduce 방식으로 데이터를 처리했다. Hive등 다른 프레임워크를 이용하지 않고 자바로 순수 맵리듀스 방식으로 배치를 짠다고 생각해보자. Mapper와 Reducer를 구현해야 한다는 것 자체가 배치의 규모에 따라 상당히 복잡해질 수 있다. 물론 disk I/O기반이라서 아무리 데이터가 커도 터질 염려를 거의 하지 않고 꾸역꾸역 돌아가는 배치를 만들 수는 있는데 너무 비효율적이다.


그리고 혜성처럼 Spark가 등장했고 분산처리를 인메모리 방식으로 처리할 수 있었다. 현재는 3.1.2 버전까지 릴리즈 되었다.


먼저 스파크는 단일머신에서 Standalone mode로 띄워서 사용하거나, 여러 머신을 묶어서 Cluster Mode로 사용가능하다. 분석 클러스터용 서버를 여러대를 구비해놓고 사용하게 된다. 그리고 당연히 데이터와 가까이 있는 것이 좋을 것이다. 


Python이나 Scala로 배치를 구현하게 되는데 하둡에서 데이터를 읽고, 쿼리를 사용하여 Group by 등의 연산을 하고 어딘가에 하둡에 데이터를 쓰는 방식이 주로 이용된다. 이러한 작업은 대부분 비슷하며 ./bin/spark-submit으로 작업을 넘기는데 이 때 어느정도 리소스를 사용할건지, driver와 excuter의 갯수, memory, core 등을 지정할 수 있다. 


초창기 버전1까지는 데이터를 다룰 때 RDD를 사용했고 2버전부터 Dataframe, Dataset을 지원하게 되는데 이는 분산 데이터 구조이며 보통 dataframe이나 dataset을 사용하게 되는데 rdd로 사용해야할 경우에도 dataframe이나 dataset으로 변환시켜서 사용하도록 하자. RDD는 쭉 나열된 데이터라면 dataframe부터는 구조화된 테이블형식이라서 SQL문으로도 처리가 가능하다는 것이 장점이니까..!! 그리고 Spark SQL의 성능이 비약적으로 발전했기 때문에 당연히 사용해야한다.


분산 데이터 구조이다보니 데이터가 흩어져있다. foreach문을 돌릴 때 쪼개져있는 데이터들이 excuter에서 병렬로 돈다는 것이다. 정말 편리한 기능이고 쓰면서 고려해야할 부분은 다른 당연히 데이터가 적절히 분배를 시켜야 좋은 효율을 낼 수 있다. 


스파크로 이런 배치성 작업말고도 streaming 처리도 가능하다.(어쨌든 mini batch이긴 하지만..) 실시간으로 들어오는 곳(kafka 등)에 빨때를 꽂고 주기를 설정하여 배치 작업을 진행하게 되는데 아무래도 이런 부분은 kafka에서 장애가 났을 때 데이터 중복처리 등 offset 관리를 어떻게 할지를 더 고려하게 될 것이다.


그리고 Spark에는 ML 라이브러리도 존재한다. 일반적으로 로직이 단일 머신에서 통으로 돌아가기 때문에 분산 처리를 위한 ML lib에는 많은 것을 지원하고 있지는 않다. 하지만 사용하기 편하고 쉽게 사용가능하도록 가이드도 제공되고 있다.


그리고 GraphX도 존재하는데 사용안해봤다.


Spark의 특성중 고려할 부분은 lazy evaluation이다. 배치를 쭉 짜더라도 순서대로 즉시 도는 것이 아니다. 상태를 계속해서 저장하고 transformation과 action으로 작업이 이루어진다. action이 이루어질 때 비로소 작업이 수행되며 그 전까지는 어떻게 수행되어야 최적으로 수행될지 옵티마이저가 계산을 하면서 최적의 루트를 찾는다. 이는 큰 장점이다.


하지만 transformation이 너무 길어지고 복잡해질 경우 스파크가 이해를 못하고 에러를 뱉는 경우도 존재하는데 이 때 의도적으로 action을 한번씩 취해주면 해결이 된다.(중간 결과를 하둡에 쓰고 다시 읽어서 시작한다던가 하는 방식으로...)


사실 스파크 자체는 사용하는 것은 어렵지 않다. 아파치 재단에서 나온 수 많은 프레임워크들과 호환도 잘 되고 잘만 조합해서 사용하면 좋은 생태계를 구성할 수 있다. 데이터를 쓰거나, 읽을 때 병렬로 수행된다는 것은 성능을 높일 수도 있지만 그 만큼 순간적인 부하가 일어나는 부분을 고려해야하기도 하지만 스루풋을 조절하는 방법도 제공한다. Free하면서 고성능을 낼 수 있기 때문에 하둡+스파크 조합이 확고하게 자리를 잡을 수 있었을 것이다.

2021년 8월 1일 일요일

2) 빅데이터 플랫폼 아키텍처에 대하여.. 하둡을 알아보자

전 포스팅(1편)에서 BDP를 큰 관점에서 훑어보았는데 개인적인 사정으로 2편이 조금 늦어졌다. 그래도 시작한 김에 꾸준히 연재해보고자 한다.


데이터가 부각되면서 저장소의 개념과 종류도 많아지고 여러가지를 적재적소에 조합하여 사용하는 시대가 왔다. 기존의 서비스 트랜잭션들은 RDB로 했어야했지만 이는 성능보다는 안정성에 우선이 된다. 방대한 양의 데이터를 pk가 존재하는 RDB에 운영이 되고 있는 데이터에 넣자니 CPU나 memory가 감당할 수 없고 결국에는 HDFS라는 파일시스템으로 저장하는 하둡이 확고히 정착되었다.


하둡은 HDFS로 저장하며 말 그대로 파일시스템으로 저장한다. 단일 머신이 아닌 n개의 머신을 묶어서 분산하여 데이터가 저장된다.

먼저 위키독스에 올라온 하둡 아키텍처를 보자.


분산 저장소라는 것은 서버(머신)이 n개가 묶여서 동작한다는 것이다.

이 하둡에 여러가지 에드온들이 달리긴 하지만 가장 필수적인 것부터 하나씩 알아보자.


하둡은 네임노드와 데이터노드로 구성되어있다.

먼저 데이터 노드는 실제로 데이터가 저장되는 서버다. 데이터 노드가 10대가 실행중이다는 말은 10개의 서버에서 각각 10개의 데이터 노드가 올라와있다는 것을 의미한다. "A라는 파일을 하둡에 저장해라!"라는 명령이 떨어지면 block size 설정에 의해 n개의 조각으로 쪼개지고 이 n개의 파일은 각각 replication 설정(만약 replica가 3이라면)에 의해 3개의 복제본(원본포함 3개)이 random으로 각 데이터 노드에 저장이 된다.

과거에 데이터 사이즈(파일 사이즈)가 작았을 때는 block size가 32~64MB 정도 수준이었으나 지금은 128~256MB가 default로 많이 사용된다. 더 잘게 쪼개는 것이 좋을지, 큰 덩어리로 쪼개는 것이 좋을지는 서비스의 상황에 따라 다르겠지만 아무래도 큰 덩어리로 쪼개도 충분하다면 네트워크 IO라던지, 어느 노드에 데이터가 저장되어있는지(meta information)의 정보가 가벼워질 수 있다.


결론은 데이터노드는 데이터가 저장되는 곳이다. 그럼 네임노드는 무얼 하는 친구들일까? 네임노드는 실제로 데이터가 저장되는 서버가 아니다. 어느 데이터노드에 어떤 데이터가 저장되어있는지, 데이터 노드의 health check라던지 이런 meta성 정보를 스스로 저장하고 관리한다. 


그럼 데이터노드가 죽을 경우는 어떻게 될까? 

만약 1개의 서버가 down되었을 경우 다른 서버에 해당 데이터의 복제본이 존재하기 때문에 문제가 되지 않는다. 물론 재수없게 해당 복제본이 한 노드에 몰빵이 되어있을 경우는 문제가 될 수 있지만 보통은 최소 수십대 이상을 묶어놓기 때문에 동일한 복제본이 한 서버에 들어갈 확률은 그리 높지않다.


만약 네임노드가 죽는다면 어떻게 될까?

사람으로 치면 뇌(메타정보를 들고있는)가 꺼지는 것이다. 당연히 위험하다.

하둡을 설치할 때 네임노드를 어떻게 할 것인지를 선택해야한다. 

1. NameNode+Secondary NameNode 형태로 설치

이 경우는 네임노드가 2대라고해서 한대가 꺼졌을 때 복구해주는 구조가 아니다. Secondary NameNode는 check point 역할을 할 뿐 Name node를 대체하지 못한다. DB에서 처럼 단지 edit log 등을 들고있을 뿐이다. 즉 장애상황에서는 위험하기 때문에 별도로 이러한 정보를 들고 있는 네임스페이스 이미지를 백업을 해둔다면 어떻게든 복구가 가능은 하겠지만 운영상황에서 크리티컬 장애를 만날 경우 복구를 못할 가능성을 항상 염두해야한다.


2. NameNode HA(High Availability) 구성

이 경우에는 세컨더리 네임노드 없이 NameNode를 HA구성하는 것이다. 당연히 한대는 Active, 한대는 StandBy 형태로 되어있다. 액티브가 죽으면 스탠바이가 살아날 것이고 (물론 그 시간 동안은 장애다. 어쩔 수 없는...) StandBy가 Active가 되면서 고장난 서버를 고쳐서 다시 띄우면 된다. 이를 위해 ZKFC(ZKFailoverController)라는 것도 각 네임노드에 띄워주긴 해야하지만 실행만 하면되는 간단한거라서 pass.

HA 구성 방법은 링크를 참고하자.(링크)


주키퍼(Zookeeper)

네임노드가 죽었으니 빨리 대체 네임노드를 띄워야 하거나, 혹은 왜 복제본은 3개인지(홀수) 이런 것은 주키퍼(zookeeper)라는 분산 코디네이터(프레임워크)가 도와준다. 보통 분산 환경에서는 주키퍼는 필수이다. 예를 들어서 1개의 파일을 3개로 쪼개서 저장했는데 내부적인 문제로 1개의 사본이 잘못된 정보를 들고 있다면 다수결을 통해서 "1개가 잘못되었으니 2개짜리가 진짜 정보다" 라는 것을 판단할 수 있고 주키퍼와 각 서버가 통신하며 "네가 보내주는 health 정보가 정상이 아니구나.. 잠깐 내려가서 쉬고 있으렴" 이런 역할을 수행할 수 있다.


주키퍼의 경우 크게 만질 것도 없고 보통 3대 서버에 주키퍼를 설치해놓는다면 주키퍼를 사용하는 프레임워크들에서 이 주키퍼 클러스터를 함께 써도 안전하게 사용이 가능해보인다. 가령 하둡과 카프카, 스파크 스트리밍 등을 한대의 주키퍼 클러스터를 써도 대용량의 처리를 하지 않는다면 괜찮아보인다.)



지금까지 네임노드, 데이터노드, 주키퍼를 언급했는데 여기까지가 끝이 아니다. 단순히 데이터를 저장하는 것이 끝이 아니기 때문이다. 컴퓨터(서버)는 각각이 cpu 코어와 memory, disk를 갖고있기 때문에 이 자원들을 사용해야한다. 이를 위해 알아야할 것이 노드매니저, 리소스매니저다.


네임노드는 그래도 뇌역할을 하니까 연산작업을 안시키더라도 데이터노드들의 리소스들은 가만히 두면 너무 아깝다. 심지어 데이터를 들고 있으니 연산을 하기 최적의 서버들이다. 이 리소스들을 사용하기 위해서 각 데이터노드들에서 노드매니저에서 설정하고 띄워줘야한다.

정리하면 데이터노드들에서는 데이터노드와 노드매니저가 반드시 떠있어야한다고 보면되고 노드매니저는 향후 리소스매니저(yarn, mesos 등)에서 리소스를 사용할 수 있도록 관리자 역할을 한다.


여기서 멈출까 하다가.. 더 추가해서 과거에는 하드웨어 성능이 좋지 않았고 초창기에는 disk 연산방식을 많이 썼다. 각 데이터노드의 disk에서 데이터를 불러와서 연산하고 다시 disk에 쓰고.. 이러한 MR(mapreduce) 방식으로 처리했지만 요즘은 RAM이 너무 좋아져서 Spark(In-memory 방식)가 확고하게 자리를 잡았다. 아무리 ssd라 할지라도 memory보다 빠를 순 없고  Network I/O를 줄이는 것은 한계가 있기 때문에 Disk I/O 시간을 줄이는 것은 확실하게 성능 보장을 할 수 있으니까..


또한 데이터를 하둡에 저장할 때 text로 저장할건지, parquet, orc 등으로 저장가능하다. 이러한 부분이 참 편리하다.


예를 딱 한가지만 더 들고 이번 포스팅은 마무리..

예를 들어서 어떤 데이터에 대해 Group by 명령을 수행하면 대충 이러한 일련의 과정을 거친다. 각 데이터 노드에 존재하는 데이터들을 Disk에서 Memory로 올리고 Key 연산을 위해 Network I/O도 일어나고 그 결과를 다시 HDFS로 저장한다. 이러한 과정이 매우 크고 길 경우 Disk I/O는 꽤나 큰 비효율을 발생시키기 때문에 In-Memory 방식으로 처리하면 좋을 것이다. 또한 A노드에서 B노드에 존재하는 데이터가 필요할 경우 Network I/O가 발생할 것인데 만약 모든 노드에 특정 데이터가 필요할 경우도 있을 것이고 반복해서 이러한 suffle이 발생하는 것도 큰 비효율성을 낳는다. 따라서 최적화가 필요하다. 이러한 작업을 MR로 할건지, Spark로 처리할 건지는 다음 포스팅에서 설명해보도록 하자. Spark 위주가 될 것이며 쉽게 인메모리 연산엔진이라고 보면 된다. (요즘 MR은 거의 안쓴다. 자바로 짜는 것도 복잡하고.. 길어진다.) 


이번 시간에는 하둡에 대해서 간단하게 알아봤는데 권한이나 복제방식 등을 다 서술하기에는 너무 많아진다. 단지 리눅스 경로처럼 파일에 접근이 가능하고 사용할 수 있기 때문에 사용법은 아주 쉽고 권한의 경우는 Hue, Hive에서 막는 것이 가장 간단하고 쉬워보인다. 내가 설계한다면 하둡을 리소스 매니저 UI page말고는 열어주지 않는 것이 향후 관리에 편해지는 것 같다. (다른이에게 데이터를 오픈할 경우에는 Hue와 Hive를 통해서만 접근이 가능하도록...)


쓰다보니 너무 의식의 흐름대로 쓰고 말았다.. 다음에는 더 집중하고 정리해서 쓰는걸로..


2021년 6월 27일 일요일

1) 빅데이터 플랫폼 아키텍처에 대하여.. 데이터 스토리지 관점에서의 흐름

최근 몇 년동안 AI, ML, DL이 뜨면서 BDP라는 용어의 사용이 뜸해졌다. 그렇다고하여 빅데이터라는 영역이 많이 시들해졌다고 생각하는 것은 큰 오판이다. 오히려 더 발전하고 견고해지면서 당연 시 여기는 현재라고 보는게 맞다. 당연히 데이터 작업을 하기 위해서는 그 것들을 커버할 시스템이 필요하기 때문이다. 

너무나도 당연시 여겨지지만 일부 혹은 단순히 운영해본 사람은 많겠지만 설계부터 설치, 운영까지 모두 해보기는 쉽지 않다. 많지는 않지만 스터디나 세미나를 하면서 BDP나 DE에 대해 설명하고 인터뷰를 하면서 깊게 설명하긴 어려웠으나 적어도 DE를 처음 접하고 이 쪽으로 커리어를 쌓으려는 사람들에게 최소한의 것들은 공유하며 도움이 되기를 바라는 마음에 포스팅을 시작한다.

이론적인 부분이 깊게 필요할 경우 시간과 의지가 된다면 문서화하여 Slideshare에 공유할 생각이며 100% 개인적인 의견으로 작성하는 내용이니 최근 동향보다는 이미 견고하게 자리잡은 내용을 중심으로 서술하고자 한다.


- 데이터 스토리지 관점에서의 흐름

데이터가 부각되기 전 시대의 가장 간단한 서비스의 구조를 생각해보자.
클라이언트 - 서버 - DB 구조이다. 여기서 데이터는 모두 DB에 저장되어있고 DB의 경우 물리적 한계가 명확하기 때문에 서비스용 데이터와 일부 로그성 데이터만이 저장할 수 있을 것이다. 서비스용 DB는 안정성이 증명되고 운영 노하우가 많이 축적된 SMP 구조 RDB를 사용하기 때문이다.

그럼 SMP말고 다른 아키텍처인 MPP는 무엇일까?

SMP, MPP 구조. 출처 - softline

만약 운영 DB를 가지고 Group by 등의 분석을 한다고 생각해보자. 이러한 환경을 OLTP라고 하며(실시간으로 트랜잭션이 발생) 이는 분명히 서비스에 지장을 줄 것이고 결국에는 OLAP(배치성 트랜잭션 발생)를 위한 별도 분석용 DB를 한벌 더 마련하고 그 곳에 데이터를 부어 분석용 쿼리를 수행할 수 있을 것이다. 하지만 SMP 구조는 구조상 scale up을 해야하고 이는 한계점에 다다를 수록 비싸진다.


이에 따라 분석용 DB로 사용할 수 있는 MPP구조의 DB가 수면위로 올라오게 되었다. MPP 구조의 DB 특성상 리소스가 부족할 경우 scale out을 할 수 있기 때문에 node를 추가하는 방식으로 확장하기 편하다. 
가령 1억건의 데이터를 SMP DB를 사용하여 count 연산한다면 아무리 슈퍼맨(노드)이라 할지라도 1억번을 세는데 시간이 오래걸린다. 하지만 100명의 운동선수(노드)가 각자 100만건씩 각자 세서 마지막에 갯수를 합산한다면? -> 당연히 훨씬 빠를 뿐만아니라 운동선수 1명 섭외비가 훨씬 쌀 것이다. 또한 슈퍼맨의 경우 더 센 슈퍼맨을 구하려면 섭외하기 힘들지만 운동선수의 경우 여러명을 더 섭외해서 하던 작업을 똑같이 하면 된다는 장점이 있다.

이런 시나리오가 가능하게 하기 위해서는 데이터가 각각의 운동선수(노드)에 똑같이 분산되야하고 연산에 따라 다르지만 pk가 없다면 효율을 극대화시킬 수 있다. 하지만 데이터를 유입시킬 때 앞 단에서 잘 쌓아주기만 하면 pk가 없다는 것은 위기보다는 기회일 수 있다.

데이터를 잘 분산시킨다는 것은 일부 노드에 데이터가 몰빵되어 일하지 않는 노드가 발생하면 안된다는 것과 같다. 따라서 적절한 분산키를 활용하여 데이터가 잘 분산되게 해야한다.


그렇다면 이런 질문이 존재할 수도 있다. 만약 작업중에 A운동선수에게 있는 자료가 B운동선수에게도 필요하면 전달하는데 더 큰 리소스가 발생할 수 있지 않을까? 슈퍼맨의 경우 모든 자료를 혼자 다 들고 있기 때문에 전달하는 리소스가 필요 없을텐데... -> 맞는 말이다. 이러한 현상을 shuffle이라고 하며 분산 프레임워크를 사용할 경우 shuffle을 최소화하는 것도 중요한 개념이다. 애초에 필요한 데이터는 작업 지시자가 broadcast를 해주던지 혹은 모든 운동선수에게 복제본을 전달해주던지 하는 최적화가 필요하다.


여기까지 SMP와 MPP에 대해 큰 그림에서 설명한 것이고 데이터의 저장을 어떻게 해야 읽을 때 잘 읽을 수 있을지.. 샤딩 혹은 RDB와는 조금 다른 DB인 nosql이라는 개념도 있다. 이 부분은 추후에 넘어가면서 기술하도록 하자. 이후 각광받은 것이 하둡이다. HDFS로 저장하며 말 그대로 파일시스템으로 저장하는 방식이다. 하둡은 결과적으로 mpp구조와 비슷하기는 하지만 저장소 자체를 의미하며 데이터를 처리하기 위한 엔진은 별도 선택을 해야한다. 


데이터를 다룬다는 것은 결과적으로 저장 및 처리 자체가 핵심이며 당연히 DB를 잘 이해하면 좋다. 본론에 들어가기 전에 빅데이터 아키텍처를 먼저 훑고 하둡에 대해서 설명하는 것이 좋겠다. 이번 포스팅은 여기서 마친다!


2021년 1월 25일 월요일

Spark Read Parquet - Timestamp and Timezone Confusion

Spark 배치에서 Parquet File을 데이터를 읽을 때 timestamp가 +0900시간이 되는 문제가 발생했다. timezone 문제인데 처음에는 win server to linux 마이그레이션 문제인 줄 알았는데 그게 아니었다. (txt로 저장하면 이런 문제가 없다.)

Spark 버전은 2.4.7 이다.

서버에서 timedatectl을 쳐보면 다음과 같다.


Time Zone이 Asia/Seoul (KST, +0900)으로 되어있다.
따라서 Spark에서 Parquet을 읽을 때 KST로 변환해서 읽는 것 같다. 

그럼 간단하게 생각하면.. UTC로 읽으면 해결될 문제이고 분산환경이기 driver, excuter 모두 해결되어야 할 문제 같은데 이 부분이 조금 찝찝하다.


어쨌든 방법 중에 한가지는 Spark Submit을 할 때 아래 옵션을 준다.

--conf spark.sql.session.timeZone='UTC' \
--conf spark.driver.extraJavaOptions='-Duser.timezone=UTC' \
--conf spark.executor.extraJavaOptions='-Duser.timezone=UTC' \

그럼 해결된다.

또 다른 방법은 코드 상에서 spark session을 생성할 때 옵션을 주고 해결이 가능하다.
.config("spark.sql.session.timeZone", "UTC") 
.config("spark.driver.extraJavaOptions", "-Duser.timezone=UTC") 
.config("spark.executor.extraJavaOptions", "-Duser.timezone=UTC")

아래는 공식문서에서 발췌한 내용이다.
Spark internally stores timestamps as UTC values, and timestamp data that is brought in without a specified time zone is converted as local time to UTC with microsecond resolution. When timestamp data is exported or displayed in Spark, the session time zone is used to localize the timestamp values. The session time zone is set with the configuration 'spark.sql.session.timeZone' and will default to the JVM system local time zone if not set.


분산환경이기도 하고 deploy mode에 따라 옵션들이 어떻게 동작하는지는 테스트해볼 필요가 있겠다.

--20210321 추가
송희진님께서 모든 스파크 배치에 일괄적용을 위해 spark_submit에 아래처럼 추가
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit 
--conf spark.sql.session.timeZone='UTC' \
--conf spark.driver.extraJavaOptions='-Duser.timezone=UTC' \
--conf spark.executor.extraJavaOptions='-Duser.timezone=UTC' \
"$@"

2021년 1월 14일 목요일

HA hadoop+Spark 조합으로 namespace 사용할 때 발생할 수 있는 에러(UnknownHostException)

현재 HA hadoop + Spark + mesos + azkaban 조합으로 배치를 구성하고 있는데 서버에 직접 들어가서 shell script를 수행하면 정상적으로 동작을 한다.

scala source에서는 hadoop url을 namespace로 접근한다.

그러나 아즈카반에서 수행을 하면 에러를 뱉는다.

Exception in thread "main" java.lang.IllegalArgumentException: java.net.UnknownHostException: hadoop-cluster-bdnode
....
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.net.UnknownHostException: hadoop-cluster-bdnode
... 40 more
....
java.lang.RuntimeException: azkaban.jobExecutor.utils.process.ProcessFailureException: Process exited with code 1
...
Caused by: azkaban.jobExecutor.utils.process.ProcessFailureException: Process exited with code 1
at azkaban.jobExecutor.utils.process.AzkabanProcess.run(AzkabanProcess.java:125)
at azkaban.jobExecutor.ProcessJob.run(ProcessJob.java:304)


shell script에서는 동작할 때에는 hadoop conf(hdfs-site.xml, core-site.xml) 정보를 알고 있다. bashrc에 hadoop_conf_dir을 해놨기 때문이다. 하지만 스케쥴러 azkaban에서 수행하려고 하면 해당 구성을 다시 shell script에 export 해주거나 spark-submit file안에서 알려줘야 한다. 배치를 수행하는 command shell을 모두 바꿀 수가 없어서 그냥 후자로 적용했다.
export HADOOP_HOME=/data01/sw/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop


이제 잘 동작한다.




2022년 회고

 올해는 블로그 포스팅을 열심히 못했다. 개인적으로 지금까지 경험했던 내용들을 리마인드하자는 마인드로 한해를 보낸 것 같다.  대부분의 시간을 MLOps pipeline 구축하고 대부분을 최적화 하는데 시간을 많이 할애했다. 결국에는 MLops도 데이...