2020년 8월 7일 금요일

Kafka + Spark Streaming saving to Cassandra

추천 시스템 서빙에서 메인 DB는 Cassandra를 사용하고 있는데 대부분이 data를 bulk insert 후 select만 해서 return 하는 구조이다. 하지만 최근 실시간 (방문)로그를 활용해야할 필요가 생겼고 이를 Cassandra에 적재하기로 하였다. 

만약 cassandra에 쌓는다면 real-time 환경에서 transaction에 문제가 없는지, 하루 쌓이는 양은 얼마 정도인지, compaction 전략은 어떤걸 써야할지, 피크 상황에서 서버가 버틸지 등의 고민을 하게 되었다.

이러한 시도가 성공할지, 실패할지는 먼저 구현을 해봐야 알 수 있기 때문에 일단 먼저 구현부터 하기로 했다. 만약 데이터양이 많은 것이 문제가 된다면 사이즈를 줄이는 방향으로 타협을 보는 것으로 문제를 해결해도 되는 상황이다.


구현 방법은 두 가지로 생각했다.
1. kafka -> logstash -> cassandra
2. kafka -> spark streaming -> cassandra

일단 1번은 logstash plugin이 존재하기는 하는데 4~5년전 자료라서 pass.
2번의 경우 어느 정도의 성능이 나올지 몰라서 궁금하기도 했고 2번으로 결정했다.


정리하면 real time으로 Kafka broker -> spark streaming(consum) -> cassandra 로 데이터를 흘러가는 것이 목표이다.

기존에 운영중인 kafka 데이터를 사용하기로 했다. kafka의 데이터는 map형식으로 들어오고 있다. 

{"a_key":"a_value","b_key":"b_value","c_key":"c_value","d_key":"d_value"}를 담을 class를 하나 만들었다.
case class TEST ( COL1:String, COL2:String, COL3:String, COL4:String )

이후 카프카와 연동을 해줬다.

val sparkConf = new SparkConf()
.setAppName("kapark2cassandra")
.set("spark.cassandra.auth.username", "cassandra")
.set("spark.cassandra.auth.password", "cassandra")
.set("spark.cassandra.connection.host", "cassandra cluster")

val ssc = new StreamingContext(sparkConf, Seconds(3))

val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "zk cluster",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "kapark2cassandra",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("kapark2cassandra")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

다음은 데이터 가공 및 카산드라에 전송이다.
ttl은 1주일(604800)으로 셋팅했다. 

stream.foreachRDD { rdd =>
val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
import spark.implicits._
val ods = rdd
.map(record => record.value.toString.drop(1).dropRight(1).split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)", -1)
.map(_.split(":(?=([^\\\"]*\\\"[^\\\"]*\\\")*[^\\\"]*$)")).map(arr => (arr(0) -> arr(1))).toMap)
.map(rec =>
TEST(
rec.get("\"COL1\"").mkString.replace("\"", "").trim
, rec.get("\"COL2\"").mkString.replace("\"", "").trim
, rec.get("\"COL3\"").mkString.replace("\"", "").trim
, rec.get("\"COL4\"").mkString.replace("\"", "").trim
)
).toDF()

ods.createOrReplaceTempView("kapark2cassandra")

var result = spark.sql(
"""
select from_unixtime(col1/1000,"yyyy-MM-dd HH:mm:ss") as col1
, col2
, col3
, col4
from kapark2cassandra
where col2 = some
""")
result.write.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "test" , "keyspace" -> "real_time", "ttl" -> "604800"))
.mode(org.apache.spark.sql.SaveMode.Append).save()
}

ssc.start()
ssc.awaitTermination()
저기서 다른 데이터 소스와 join, group by해서 사용할 수 있으면 그렇게 하려고 하는데 스트리밍이 밀리지 않는 선에서 처리가 가능한 양이어야 할 것이다.
object SparkSessionSingleton {
@transient private var instance: SparkSession = _
def getInstance(sparkConf: SparkConf): SparkSession = {
if (instance == null) {
instance = SparkSession
.builder
.config(sparkConf)
.getOrCreate()
}
instance
}
}
마지막으로 val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf) 부분을 만들기 위한 ojbect 생성해주고 
def main(args: Array[String]) {
new kapark2cassandra().run(args)
}
수행했더니 데이터가 카산드라에 잘 들어간다.

통계를 보도록 하자.
대충 1큐에 5천건~8천건 정도이다.
3초로 했을 때 조금씩 튀는 부분이 있는데 아무래도 3초는 무리인것같다.

cassandra 지표를 봐도 딱히 튀는 부분도 없고 결국 final result는 약 100 rows씩 들어가기 때문에 부하 테스트를 하기에는 작은 양이었다.

결론
real time data를 cassandra에 넣으려면 kafka + spark streaming으로 처리하는 것도 하나의 옵션이다.





댓글 1개:

  1. 그룹 오프셋 초기화
    ./kafka-consumer-groups.sh --bootstrap-server zk cluster --group kapark2cassandra --describe
    ./kafka-consumer-groups.sh --bootstrap-server zk cluster --group=kapark2cassandra --topic kapark2cassandra --reset-offsets --to-earliest --execute
    ./kafka-consumer-groups.sh --bootstrap-server zk cluster --group kapark2cassandra --describe

    답글삭제