2020년 8월 31일 월요일

MAB(Multi Armed Bandit) 대안으로서의 Dynamic Ensemble

MAB(Multi Armed Bandit)

A/B test를 보다 더 유연하게 하면서 최적의 return을 찾아가는 방법이다.

A/B test의 경우 A/B 셋을 준비하고 타겟에 따라 A 혹은 B를 리턴을 주면 되기 때문에 구현하기 쉽기는 하지만 분명히 only A,B라는 한계점이 존재한다.

MAB는 이를 조금 더 확장한 개념으로 n개의 셋을 준비해서 보상에 따라 적절한 셋을 return 하는 것이 concept이다.


가령 상품추천을 할 때에 A모델, B모델, C모델, D모델, 혹은 앙상블된 모델을 미리 준비하고 CTR 등의 보상에 따라 적절한 모델을 return 하도록 구현할 수 있다.

모델의 갯수가 충분하고, 모델을 집계하는데 걸리는 시간이 충분하다면 MAB는 좋은 선택지가 될 것이다.

하지만 불가능한 상황이 분명히 존재한다!!


DE(Dynamic Ensemble)

단순히 A,B 두개의 상품추천 모델만이 존재할 수 있다. 이 때는 A와 B를 적절한 비율로 섞어서 사용해야 하는 경우를 가정해볼 수 있다.

과연 A+B 앙상블된 모델 1개만이 존재할 때 MAB를 어떻게 할 수 있을까?


분명히 MAB는 아니다. 하지만 A와 B를 섞는 비율 n:m을 보상을 통해 동적으로 조절한다면 충분히 비슷한 효과를 낼 수 있을 것이다.

물론 말단에서 어떤 모델을 사용할 것인지 고르는 것이 MAB라면 이는 아예 모델을 만들 때 보상을 사용하고 1개의 모델만 생성되기 때문에 MAB라고 부르기엔 무리가 있다.

하지만 적절한 결과를 return 해야하고, 보상에 사용할 수 있는 방법을 똑같이 사용할 수 있기에 비슷하기도 하다!

그래도 용어를 다르게 한다면 아마 적절한 용어로는 Dynamic Ensemble이 될 것이다.


MAB를 하려다가 상황이 안맞을 것 같아서 DE방식으로 방향을 틀어야할 수도 있는데 개인적인 생각이지만 적절한 대안같다.

2020년 8월 18일 화요일

kafka + logstash saving to hdfs (webhdfs)

상품추천 분석을 위한 로그들을 kafka broker에 꾸준히 쏴주고 있었다. 이제 이를 사용할 때가 되었다.

이 로그들을 하둡에 쌓을 필요가 생겼고 컨슈머를 구현할지, logstash의 webhdfs를 사용할지 고민하다가 logstash의 webhdfs output이 간단해서 이걸 사용하기로 하였다.


필요한 플러그인들이 설치되어있다고 가정하고 conf파일을 작성한다.

크게 input, filter, output 부분을 작성하면 된다. 필요한 부분을 검색해서 쓰긴했는데 나중에 시간날 때 제대로 사용법을 익혀두는게 좋을 것 같다. (꽤 편리한 것 같다.)


input {
     kafka {
            bootstrap_servers => "a:9092,b:9092,c:9092"
            topics => ["topic1","topic2"]
            decorate_events => true
            consumer_threads => 3
            group_id => "dev_recommend"
            }
}


모든 토픽의 파티션은 3개로 되어있기때문에 consumer_threads를 3으로 했다.

데이터를 가공하는 filter부분에서 토픽접근은 [@metadata][kafka][topic]로 하면되고 or조건은 or로 쓰면 되었다. c1은 unix time으로 된 컬럼이고 컬럼끼리의 구분자는 \t으로 되어있다면 아래처럼 작성해주면 되었다.


filter {
    if [@metadata][kafka][topic] == "topic1" or [@metadata][kafka][topic] == "topic2" {                                                                            
        dissect {
            mapping => {"message" => "%{col1} %{col2}   %{col3}   %{col4}   %{col5}   %{col6}   %{col7}   %{col8}   %{col9}   %{col10}  %{col11}"}
        }
        ruby {
                code => "
                      require 'time'
                      require 'date'

                      ts_new = event.get('[ts]').to_i/1000

                      event.set('day_ymd', Time.at(ts_new).localtime.strftime('%y-%m-%d'))
                      event.set('day_h', Time.at(ts_new).localtime.strftime('%H'))
                      event.set('day_ymdh', Time.at(ts_new).localtime.strftime('%y-%m-%d-%H'))
                  "
            }
    }

}

마지막으로 ouput 부분은 다음처럼 작성하면 된다.

output {

    if [@metadata][kafka][topic] == "topic1" {
        webhdfs {
            host => "hadoop url"
            port => 50070
            path => "/data/tsv/recommend-logs/%{day_ymd}/%{day_h}/topic1-log-%{day_ymdh}.log"
            user => "usr"
            codec => line { format => "%{message}"}
            }
        #stdout { codec => rubydebug }
    }
    else if [@metadata][kafka][topic] == "topic2" {
        webhdfs {
            host => "hadoop url"
            port => 50070
            path => "/data/tsv/recommend-logs/%{day_ymd}/%{day_h}/topic2-log-%{day_ymdh}.log"
            user => "usr"
            codec => line { format => "%{message}"}

            }
        #stdout { codec => rubydebug }
    }
}

이제 실행해서 화면에 찍어보면 다음과 같은 형식으로 로그가 찍힐것이다.


output file은 계속해서 append를 해야하기때문에 text 포맷이다. 이후 실행하면 yy-mm-dd/시간/파일 경로에 데이터가 잘 쌓이는 것을 확인할 수 있다. 
나중에 제대로 사용법(문법)을 익혀둬야겠다.

일단 작업을 돌려놓고 며칠간 사이즈 모니터링을 해봐야겠다.
nohup bin/logstash --path.data recom_data -f recom.conf > /dev/null 2>&1 &


2020년 8월 7일 금요일

Kafka + Spark Streaming saving to Cassandra

추천 시스템 서빙에서 메인 DB는 Cassandra를 사용하고 있는데 대부분이 data를 bulk insert 후 select만 해서 return 하는 구조이다. 하지만 최근 실시간 (방문)로그를 활용해야할 필요가 생겼고 이를 Cassandra에 적재하기로 하였다. 

만약 cassandra에 쌓는다면 real-time 환경에서 transaction에 문제가 없는지, 하루 쌓이는 양은 얼마 정도인지, compaction 전략은 어떤걸 써야할지, 피크 상황에서 서버가 버틸지 등의 고민을 하게 되었다.

이러한 시도가 성공할지, 실패할지는 먼저 구현을 해봐야 알 수 있기 때문에 일단 먼저 구현부터 하기로 했다. 만약 데이터양이 많은 것이 문제가 된다면 사이즈를 줄이는 방향으로 타협을 보는 것으로 문제를 해결해도 되는 상황이다.


구현 방법은 두 가지로 생각했다.
1. kafka -> logstash -> cassandra
2. kafka -> spark streaming -> cassandra

일단 1번은 logstash plugin이 존재하기는 하는데 4~5년전 자료라서 pass.
2번의 경우 어느 정도의 성능이 나올지 몰라서 궁금하기도 했고 2번으로 결정했다.


정리하면 real time으로 Kafka broker -> spark streaming(consum) -> cassandra 로 데이터를 흘러가는 것이 목표이다.

기존에 운영중인 kafka 데이터를 사용하기로 했다. kafka의 데이터는 map형식으로 들어오고 있다. 

{"a_key":"a_value","b_key":"b_value","c_key":"c_value","d_key":"d_value"}를 담을 class를 하나 만들었다.
case class TEST ( COL1:String, COL2:String, COL3:String, COL4:String )

이후 카프카와 연동을 해줬다.

val sparkConf = new SparkConf()
.setAppName("kapark2cassandra")
.set("spark.cassandra.auth.username", "cassandra")
.set("spark.cassandra.auth.password", "cassandra")
.set("spark.cassandra.connection.host", "cassandra cluster")

val ssc = new StreamingContext(sparkConf, Seconds(3))

val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "zk cluster",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "kapark2cassandra",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("kapark2cassandra")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

다음은 데이터 가공 및 카산드라에 전송이다.
ttl은 1주일(604800)으로 셋팅했다. 

stream.foreachRDD { rdd =>
val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
import spark.implicits._
val ods = rdd
.map(record => record.value.toString.drop(1).dropRight(1).split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)", -1)
.map(_.split(":(?=([^\\\"]*\\\"[^\\\"]*\\\")*[^\\\"]*$)")).map(arr => (arr(0) -> arr(1))).toMap)
.map(rec =>
TEST(
rec.get("\"COL1\"").mkString.replace("\"", "").trim
, rec.get("\"COL2\"").mkString.replace("\"", "").trim
, rec.get("\"COL3\"").mkString.replace("\"", "").trim
, rec.get("\"COL4\"").mkString.replace("\"", "").trim
)
).toDF()

ods.createOrReplaceTempView("kapark2cassandra")

var result = spark.sql(
"""
select from_unixtime(col1/1000,"yyyy-MM-dd HH:mm:ss") as col1
, col2
, col3
, col4
from kapark2cassandra
where col2 = some
""")
result.write.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "test" , "keyspace" -> "real_time", "ttl" -> "604800"))
.mode(org.apache.spark.sql.SaveMode.Append).save()
}

ssc.start()
ssc.awaitTermination()
저기서 다른 데이터 소스와 join, group by해서 사용할 수 있으면 그렇게 하려고 하는데 스트리밍이 밀리지 않는 선에서 처리가 가능한 양이어야 할 것이다.
object SparkSessionSingleton {
@transient private var instance: SparkSession = _
def getInstance(sparkConf: SparkConf): SparkSession = {
if (instance == null) {
instance = SparkSession
.builder
.config(sparkConf)
.getOrCreate()
}
instance
}
}
마지막으로 val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf) 부분을 만들기 위한 ojbect 생성해주고 
def main(args: Array[String]) {
new kapark2cassandra().run(args)
}
수행했더니 데이터가 카산드라에 잘 들어간다.

통계를 보도록 하자.
대충 1큐에 5천건~8천건 정도이다.
3초로 했을 때 조금씩 튀는 부분이 있는데 아무래도 3초는 무리인것같다.

cassandra 지표를 봐도 딱히 튀는 부분도 없고 결국 final result는 약 100 rows씩 들어가기 때문에 부하 테스트를 하기에는 작은 양이었다.

결론
real time data를 cassandra에 넣으려면 kafka + spark streaming으로 처리하는 것도 하나의 옵션이다.





2022년 회고

 올해는 블로그 포스팅을 열심히 못했다. 개인적으로 지금까지 경험했던 내용들을 리마인드하자는 마인드로 한해를 보낸 것 같다.  대부분의 시간을 MLOps pipeline 구축하고 대부분을 최적화 하는데 시간을 많이 할애했다. 결국에는 MLops도 데이...