데이터엔지니어로서 데이터를 다루는 것도 중요하지만 데이터를 수집하고 저장하고 처리하는 구조를 만드는 일이 더 중요하고 어려운 일이라고 생각하기 때문에 이 부분을 공략하는데 주력했던 것 같다. 또한 한번 만들어놓은 구조는 쉽게 바꾸지않기 때문에 다뤄볼 기회가 적은 것도 사실이다.
2020년 12월 31일 목요일
2020년 회고
데이터엔지니어로서 데이터를 다루는 것도 중요하지만 데이터를 수집하고 저장하고 처리하는 구조를 만드는 일이 더 중요하고 어려운 일이라고 생각하기 때문에 이 부분을 공략하는데 주력했던 것 같다. 또한 한번 만들어놓은 구조는 쉽게 바꾸지않기 때문에 다뤄볼 기회가 적은 것도 사실이다.
2020년 12월 30일 수요일
hadoop 3.x ec policy(erasure coding) vs replication 3
하둡3에서 ec policy라는 기능이 생겼다. 말로만 들었던 기능인데 이참에 테스트할 겸 적용해보기로 한다.
하둡2에서 하둡3으로 데이터 마이그레이션을 할 때 공간이슈 때문에 3복제 대신에 ec policy 적용을 고려해보았다.
적용 자체는 path별로 구분해서 적용할 수 있게 되어있었고 적용하면 replication이 아닌 ec policy에 의해 1 replication + @로 저장&관리되는 형태이다.
각종 정보(ex 패리티비트)로 인해 블록수를 더 많이 자치할 수 있어 namenode 입장에서 약간의 overhead가 있다고는 하지만 case by case인것 같고 내 기준에서는 크게 무리될 정도는 아니었기에 바로 적용을 해보았다.
3 replica가 1 replica로 즉시 바뀌지는 않고 신규로 write되는 것부터 바뀌는 것 같다.
즉 해당 옵션은 hdfs path별로 on/off 시켜서 동작할 수 있다.
기존 3 replica의 경우 3배의 용량을 차지하는데 ec policy를 사용하면 1.3배 정도로도 데이터를 저장할 수 있다고 한다.
아무튼 적용하고 장애 테스트를 해보기 위해 해당 block을 저장하고 있는 datanode들을 down시켰더니 그 즉시 해당 data를 읽을 수 없는 문제가 존재했다.
3 replica의 경우 데이터노드가 몇 대 내려가더라도 1대만 살아있으면 즉시 제대로 동작하는데 ec policy는 그러지 못하는 것 같다.
(또한 -replication 옵션과 -policy옵션을 동시에 사용이 불가능하다.)
현재까지 테스트한 결론으로는 일회성으로 read하는 단순 저장용이나 또는 복구가능한 집계성 데이터는 ec policy를 적용하고 원천 데이터처럼 자주 read하는 데이터들은 그냥 3 replication이 좋을 것 같다.
어쨌든 배치를 돌려야하는데 즉시 read하지 못하기 때문에 적용이 불가능해보인다.
시간관계상 제대로 알아보지 못했지만 분명한 것은 일괄적용하기에는 무리가 있다.
참조 : https://hadoop.apache.org/docs/r3.1.1/hadoop-project-dist/hadoop-hdfs/HDFSErasureCoding.html
2020년 12월 15일 화요일
hadoop name node heap size
hadoop cluster에서 특히 spark cluster를 hadoop data node에 구성을 하기 때문에 name node heap size는 신경 안써도 된다고 생각할 수도 있는데 hdfs가 많아질 수록 신경을 써야한다.
하둡 네임노드의 경우 memory는 data node의 file system에 영향을 받는다. 따라서 하둡 과거버전처럼 block size를 64MB로 할 때보다 128MB로 하면 네임노드의 heap이 확 줄어드는 것을 관찰 할 수 있을 것이다.
아무래도 데이터를 큰 바구니에 담으면 기억해야하는 바구니 갯수가 줄어들테니 당연한 일이다.
아래는 기록용으로 남기려고 한다.
hadoop v2.7.5
하둡 2.7.5버전에서의 block size 256MB이고 모든 데이터가 3 replica를 갖는다.
file과 directories 갯수에 따른 메모리 사용량이다.
27270751 files and directories, 22352862 blocks = 49623613 total filesystem object(s).
Heap Memory used 27.18 GB of 39.51 GB Heap Memory. Max Heap Memory is 56.89 GB.
Non Heap Memory used 104.05 MB of 106.5 MB Commited Non Heap Memory. Max Non Heap Memory is -1 B.
hadoop v3.1.4
현재 하둡 3.1.4에서 block size도 동일하게 256MB를 잡았고 동일하게 3 replica이다. 100% 마이그레이션을 하지는 않았는데 replica 쪽에 개선이 있어서 메모리를 덜 사용할 것이다.
7,743 files and directories, 11,767 blocks (11,767 replicated blocks, 0 erasure coded block groups) = 19,510 total filesystem object(s).
Heap Memory used 1.99 GB of 5.08 GB Heap Memory. Max Heap Memory is 56.89 GB.
Non Heap Memory used 80.31 MB of 83.31 MB Commited Non Heap Memory. Max Non Heap Memory is <unbounded>.
데이터, 배치 마이그레이션 끝내고 꾸준히 모니터링을 하면서 얼마나 사용하는지 봐야겠다.
2020년 12월 4일 금요일
apache mesos를 세팅하면서
2020년 11월 30일 월요일
Apache Hadoop 3.1.4 HA(High Availability) Install Guide with Cloudera guide
하지만 프로덕션 환경에서 설치하기위해서 최적의 구성을 생각했어야했는데 개인적으로 생각한 내용과 클라우데라에서 제안한 아키텍처를 보더라도 크게 차이는 없었다.
출처 : 클라우데라 제안 구성 |
여기서 상황에 맞게 필요한 것만 구성을 하고 결론은 마스터 3대, 배치서버1대, 데이터노드 N대로 구성하고 다음과 같은 룰을 따르고자 한다.
리소스 매니저는 Yarn을 한번 올려보고.. 실제로 사용은 Mesos로 컨트롤 할 것이다. Mesos의 장점이 하둡 클러스터의 구간을 나눠놓고 1번 메소스, 2번 메소스, 3번 메소스 클러스터로 분리운영이 가능한데 이는 서버가 장애가 날때, 혹은 배치들을 분리했을 때 운영상 장점이 있기 때문이다.
대략적인 구조는 이런식으로 가져간다.
마스터1 : 주키퍼 저널노드 네임노드(HA) ZKFC 리소스매니저(HA) 메소스마스터
마스터2 : 주키퍼 저널노드 네임노드(HA) ZKFC 리소스매니저(HA) 메소스마스터
마스터3 : 주키퍼 저널노드 잡히스토리서버 스파크히스토리서버 메소스마스터
슬레이브서버들 : 데이터노드 메소스슬레이브
사실 yarn을 쓸 일은 거의 없긴한데 일단 sqoop이나 다른 프레임워크들과 etl을 하면서 필요할 일이 있으니 넣어두고 ha를 굳이 안해도되는데 간단하니까 해보자. 왠만하면 최소화로 가져가야 서버를 리부팅할때 올려야할 서비스를 빼먹지 않기 때문에 간소화하는 것을 좋아한다.
설치는 자바설치, /etc/hosts, 인증키 배포 등 기본적으로 세팅해놓아야 할 설정들은 모두 되었다고 가정을 한다.
zoo_sample.cfg를 복사해서 zoo.cfg를 만들고
<configuration><!-- for NameNode --><property><name>dfs.namenode.name.dir</name><value>/appData01/hadoop/hdfs/name,/appData02/hadoop/hdfs/name,/appData03/hadoop/hdfs/name,/appData04/hadoop/hdfs/name</value></property><!-- for DataNode --><property><name>dfs.datanode.data.dir</name><value>/appData01/hadoop/hdfs/data,/appData02/hadoop/hdfs/data,/appData03/hadoop/hdfs/data,/appData04/hadoop/hdfs/data,/appData05/hadoop/hdfs/data</value></property><!-- HA --><property><name>dfs.nameservices</name><value>hadoop-cluster-bdnode</value></property><property><name>dfs.ha.namenodes.hadoop-cluster-bdnode</name><value>namenode1,namenode2</value></property><property><name>dfs.namenode.rpc-address.hadoop-cluster-bdnode.namenode1</name><value>master1:8020</value></property><property><name>dfs.namenode.rpc-address.hadoop-cluster-bdnode.namenode2</name><value>master2:8020</value></property><property><name>dfs.namenode.http-address.hadoop-cluster-bdnode.namenode1</name><value>master1:50070</value></property><property><name>dfs.namenode.http-address.hadoop-cluster-bdnode.namenode2</name><value>master2:50070</value></property><property><name>dfs.namenode.shared.edits.dir</name><value>qjournal://master1:8485;master2:8485;master3:8485/hadoop-cluster-bdnode</value></property><property><name>dfs.client.failover.proxy.provider.hadoop-cluster-bdnode</name><value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value></property><property><name>dfs.ha.fencing.methods</name><value>sshfence</value></property><property><name>dfs.ha.fencing.ssh.private-key-files</name><value>/home/scom/.ssh/id_rsa</value></property><property><name>dfs.journalnode.edits.dir</name><value>/appData01/hadoop/hdfs/journalnode</value></property><property><name>dfs.ha.automatic-failover.enabled</name><value>true</value></property><property><name>dfs.blocksize</name><value>268435456</value></property><property><name>dfs.namenode.handler.count</name><value>100</value></property><property><name>dfs.hosts</name><value>/data01/sw/hadoop/etc/hadoop/workers</value></property><property><name>dfs.webhdfs.enabled</name><value>true</value></property><property><name>dfs.namenode.datanode.registration.ip-hostname-check</name><value>false</value></property><property><name>dfs.datanode.du.reserved</name><!-- cluseter variant 90G --><value>96636764160</value><description>Reserved space in bytes per volume. Always leave this much space free for non dfs use.</description></property></configuration>
<configuration><property><name>fs.defaultFS</name><value>hdfs://hadoop-cluster-bdnode</value></property><property><name>ha.zookeeper.quorum</name><value>master1:2181,master2:2181,master3:2181</value></property><!-- trash --><property><name>fs.trash.interval</name><value>14400</value><description>Number of minutes after which the checkpoint gets deleted. If zero, the trash feature is disabled. </description></property><!-- for HUE--><property><name>hadoop.proxyuser.hue.hosts</name><value>*</value></property><property><name>hadoop.proxyuser.hue.groups</name><value>*</value></property></configuration>
<configuration><!-- Site specific YARN configuration properties --><property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property><property><name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name><value>org.apache.hadoop.mapred.ShuffleHandler</value></property><property><name>yarn.nodemanager.local-dirs</name><value>/appData01/hadoop/yarn/nm-local-dir,/appData02/hadoop/yarn/nm-local-dir,/appData03/hadoop/yarn/nm-local-dir,/appData04/hadoop/yarn/nm-local-dir</value></property><property><name>yarn.resourcemanager.address.rm1</name><value>master1:8032</value></property><property><name>yarn.resourcemanager.webapp.address.rm1</name><value>master1:8088</value></property><property><name>yarn.resourcemanager.scheduler.address.rm1</name><value>master1:8030</value></property><property><name>yarn.resourcemanager.resource-tracker.address.rm1</name><value>master1:8031</value></property><property><name>yarn.resourcemanager.admin.address.rm1</name><value>master1:8041</value></property><property><name>yarn.resourcemanager.address.rm2</name><value>master2:8032</value></property><property><name>yarn.resourcemanager.webapp.address.rm2</name><value>master2:8088</value></property><property><name>yarn.resourcemanager.scheduler.address.rm2</name><value>master2:8030</value></property><property><name>yarn.resourcemanager.resource-tracker.address.rm2</name><value>master2:8031</value></property><property><name>yarn.resourcemanager.admin.address.rm2</name><value>master2:8041</value></property><property><name>yarn.resourcemanager.fs.state-store.uri</name><value>/appData01/hadoop/yarn/system/rmstore,/appData02/hadoop/yarn/system/rmstore,/appData03/hadoop/yarn/system/rmstore,/appData04/hadoop/yarn/system/rmstore</value></property><property><name>yarn.resourcemanager.hostname.rm1</name><value>master1</value></property><property><name>yarn.resourcemanager.hostname.rm2</name><value>master2</value></property><!-- configure yarn --><property><name>yarn.nodemanager.resource.cpu-vcores</name><value>10</value></property><property><name>yarn.nodemanager.resource.memory-mb</name><value>40960</value></property><property><name>yarn.scheduler.minimum-allocation-mb</name><value>1024</value></property><property><name>yarn.scheduler.maximum-allocation-mb</name><value>24576</value></property><!-- ResourceManager 시작시 state 복구여부 --><property><name>yarn.resourcemanager.recovery.enabled</name><value>true</value></property><!-- persistent store로 사용할 class --><property><name>yarn.resourcemanager.store.class</name><value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value></property> <!-- Zookeeper 서버 리스트 --><property><name>yarn.resourcemanager.zk-address</name><value>master1:2181,master2:2181,master3:2181</value></property><property><name>yarn.resourcemanager.ha.enabled</name><value>true</value></property><!-- ResourceManager가 leader election에 참가할 cluster 이름 지정 --><property><name>yarn.resourcemanager.cluster-id</name><value>rm-cluster</value></property><!-- cluster에서 HA를 구성할 ResourceManager id 지정 --><property><name>yarn.resourcemanager.ha.rm-ids</name><value>rm1,rm2</value></property></configuration>
<configuration><property><name>mapreduce.framework.name</name><value>yarn</value></property><!-- mapreduce2 memory setting --><property><name>yarn.app.mapreduce.am.resource.mb</name><value>1536</value></property><property><name>mapreduce.map.memory.mb</name><value>4096</value></property><property><name>mapreduce.reduce.memory.mb</name><value>8192</value></property><!-- mapreduce java memory setting --><property><name>mapreduce.map.java.opts.max.heap</name><value>3276</value></property><property><name>mapreduce.reduce.java.opts.max.heap</name><value>6553</value></property><property><name>yarn.app.mapreduce.am.resource.mb</name><value>2048</value></property><property><name>yarn.app.mapreduce.am.env</name><value>HADOOP_MAPRED_HOME=/data01/sw/hadoop</value></property><property><name>mapreduce.map.env</name><value>HADOOP_MAPRED_HOME=/data01/sw/hadoop</value></property><property><name>mapreduce.reduce.env</name><value>HADOOP_MAPRED_HOME=/data01/sw/hadoop</value></property></configuration>
2020년 11월 28일 토요일
2020 Data Conference Speaker로 참여한 후기 (주제 : Spark+Cassandra 기반 Big Data를 활용한 추천 시스템 서빙 파이프라인 최적화)
2020년 11월 22일 일요일
Artificial Intelligence - Bayesian Networks
2020년 10월 26일 월요일
Source and target differ in block-size. Use -pb to preserve block-sizes during copy
이때 distcp 명령어를 사용하고 이는 mapreduce로 수행이 되기 때문에 성능이 좋다.
(distcp는 하둡 클러스터간 파일을 복사할 때 유용하다.)
2020년 8월 31일 월요일
MAB(Multi Armed Bandit) 대안으로서의 Dynamic Ensemble
MAB(Multi Armed Bandit)
A/B test를 보다 더 유연하게 하면서 최적의 return을 찾아가는 방법이다.
A/B test의 경우 A/B 셋을 준비하고 타겟에 따라 A 혹은 B를 리턴을 주면 되기 때문에 구현하기 쉽기는 하지만 분명히 only A,B라는 한계점이 존재한다.
MAB는 이를 조금 더 확장한 개념으로 n개의 셋을 준비해서 보상에 따라 적절한 셋을 return 하는 것이 concept이다.
가령 상품추천을 할 때에 A모델, B모델, C모델, D모델, 혹은 앙상블된 모델을 미리 준비하고 CTR 등의 보상에 따라 적절한 모델을 return 하도록 구현할 수 있다.
모델의 갯수가 충분하고, 모델을 집계하는데 걸리는 시간이 충분하다면 MAB는 좋은 선택지가 될 것이다.
하지만 불가능한 상황이 분명히 존재한다!!
DE(Dynamic Ensemble)
단순히 A,B 두개의 상품추천 모델만이 존재할 수 있다. 이 때는 A와 B를 적절한 비율로 섞어서 사용해야 하는 경우를 가정해볼 수 있다.
과연 A+B 앙상블된 모델 1개만이 존재할 때 MAB를 어떻게 할 수 있을까?
분명히 MAB는 아니다. 하지만 A와 B를 섞는 비율 n:m을 보상을 통해 동적으로 조절한다면 충분히 비슷한 효과를 낼 수 있을 것이다.
물론 말단에서 어떤 모델을 사용할 것인지 고르는 것이 MAB라면 이는 아예 모델을 만들 때 보상을 사용하고 1개의 모델만 생성되기 때문에 MAB라고 부르기엔 무리가 있다.
하지만 적절한 결과를 return 해야하고, 보상에 사용할 수 있는 방법을 똑같이 사용할 수 있기에 비슷하기도 하다!
그래도 용어를 다르게 한다면 아마 적절한 용어로는 Dynamic Ensemble이 될 것이다.
MAB를 하려다가 상황이 안맞을 것 같아서 DE방식으로 방향을 틀어야할 수도 있는데 개인적인 생각이지만 적절한 대안같다.
2020년 8월 18일 화요일
kafka + logstash saving to hdfs (webhdfs)
2020년 8월 7일 금요일
Kafka + Spark Streaming saving to Cassandra
case class TEST ( COL1:String, COL2:String, COL3:String, COL4:String )
val sparkConf = new SparkConf()
.setAppName("kapark2cassandra")
.set("spark.cassandra.auth.username", "cassandra")
.set("spark.cassandra.auth.password", "cassandra")
.set("spark.cassandra.connection.host", "cassandra cluster")
val ssc = new StreamingContext(sparkConf, Seconds(3))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "zk cluster",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "kapark2cassandra",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("kapark2cassandra")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
stream.foreachRDD { rdd =>
val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
import spark.implicits._
val ods = rdd
.map(record => record.value.toString.drop(1).dropRight(1).split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)", -1)
.map(_.split(":(?=([^\\\"]*\\\"[^\\\"]*\\\")*[^\\\"]*$)")).map(arr => (arr(0) -> arr(1))).toMap)
.map(rec =>
TEST(
rec.get("\"COL1\"").mkString.replace("\"", "").trim
, rec.get("\"COL2\"").mkString.replace("\"", "").trim
, rec.get("\"COL3\"").mkString.replace("\"", "").trim
, rec.get("\"COL4\"").mkString.replace("\"", "").trim
)
).toDF()
ods.createOrReplaceTempView("kapark2cassandra")
var result = spark.sql(
"""
select from_unixtime(col1/1000,"yyyy-MM-dd HH:mm:ss") as col1
, col2
, col3
, col4
from kapark2cassandra
where col2 = some
""")
result.write.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "test" , "keyspace" -> "real_time", "ttl" -> "604800"))
.mode(org.apache.spark.sql.SaveMode.Append).save()
}
ssc.start()
ssc.awaitTermination()
object SparkSessionSingleton {
@transient private var instance: SparkSession = _
def getInstance(sparkConf: SparkConf): SparkSession = {
if (instance == null) {
instance = SparkSession
.builder
.config(sparkConf)
.getOrCreate()
}
instance
}
}
def main(args: Array[String]) {
new kapark2cassandra().run(args)
}