2020년 8월 18일 화요일

kafka + logstash saving to hdfs (webhdfs)

상품추천 분석을 위한 로그들을 kafka broker에 꾸준히 쏴주고 있었다. 이제 이를 사용할 때가 되었다.

이 로그들을 하둡에 쌓을 필요가 생겼고 컨슈머를 구현할지, logstash의 webhdfs를 사용할지 고민하다가 logstash의 webhdfs output이 간단해서 이걸 사용하기로 하였다.


필요한 플러그인들이 설치되어있다고 가정하고 conf파일을 작성한다.

크게 input, filter, output 부분을 작성하면 된다. 필요한 부분을 검색해서 쓰긴했는데 나중에 시간날 때 제대로 사용법을 익혀두는게 좋을 것 같다. (꽤 편리한 것 같다.)


input {
     kafka {
            bootstrap_servers => "a:9092,b:9092,c:9092"
            topics => ["topic1","topic2"]
            decorate_events => true
            consumer_threads => 3
            group_id => "dev_recommend"
            }
}


모든 토픽의 파티션은 3개로 되어있기때문에 consumer_threads를 3으로 했다.

데이터를 가공하는 filter부분에서 토픽접근은 [@metadata][kafka][topic]로 하면되고 or조건은 or로 쓰면 되었다. c1은 unix time으로 된 컬럼이고 컬럼끼리의 구분자는 \t으로 되어있다면 아래처럼 작성해주면 되었다.


filter {
    if [@metadata][kafka][topic] == "topic1" or [@metadata][kafka][topic] == "topic2" {                                                                            
        dissect {
            mapping => {"message" => "%{col1} %{col2}   %{col3}   %{col4}   %{col5}   %{col6}   %{col7}   %{col8}   %{col9}   %{col10}  %{col11}"}
        }
        ruby {
                code => "
                      require 'time'
                      require 'date'

                      ts_new = event.get('[ts]').to_i/1000

                      event.set('day_ymd', Time.at(ts_new).localtime.strftime('%y-%m-%d'))
                      event.set('day_h', Time.at(ts_new).localtime.strftime('%H'))
                      event.set('day_ymdh', Time.at(ts_new).localtime.strftime('%y-%m-%d-%H'))
                  "
            }
    }

}

마지막으로 ouput 부분은 다음처럼 작성하면 된다.

output {

    if [@metadata][kafka][topic] == "topic1" {
        webhdfs {
            host => "hadoop url"
            port => 50070
            path => "/data/tsv/recommend-logs/%{day_ymd}/%{day_h}/topic1-log-%{day_ymdh}.log"
            user => "usr"
            codec => line { format => "%{message}"}
            }
        #stdout { codec => rubydebug }
    }
    else if [@metadata][kafka][topic] == "topic2" {
        webhdfs {
            host => "hadoop url"
            port => 50070
            path => "/data/tsv/recommend-logs/%{day_ymd}/%{day_h}/topic2-log-%{day_ymdh}.log"
            user => "usr"
            codec => line { format => "%{message}"}

            }
        #stdout { codec => rubydebug }
    }
}

이제 실행해서 화면에 찍어보면 다음과 같은 형식으로 로그가 찍힐것이다.


output file은 계속해서 append를 해야하기때문에 text 포맷이다. 이후 실행하면 yy-mm-dd/시간/파일 경로에 데이터가 잘 쌓이는 것을 확인할 수 있다. 
나중에 제대로 사용법(문법)을 익혀둬야겠다.

일단 작업을 돌려놓고 며칠간 사이즈 모니터링을 해봐야겠다.
nohup bin/logstash --path.data recom_data -f recom.conf > /dev/null 2>&1 &


댓글 없음:

댓글 쓰기

2022년 회고

 올해는 블로그 포스팅을 열심히 못했다. 개인적으로 지금까지 경험했던 내용들을 리마인드하자는 마인드로 한해를 보낸 것 같다.  대부분의 시간을 MLOps pipeline 구축하고 대부분을 최적화 하는데 시간을 많이 할애했다. 결국에는 MLops도 데이...