2020년 12월 31일 목요일

2020년 회고

2020년 올해는 기술적으로 이것 저것 많이 시도해봤다. 

데이터엔지니어로서 데이터를 다루는 것도 중요하지만 데이터를 수집하고 저장하고 처리하는 구조를 만드는 일이 더 중요하고 어려운 일이라고 생각하기 때문에 이 부분을 공략하는데 주력했던 것 같다. 또한 한번 만들어놓은 구조는 쉽게 바꾸지않기 때문에 다뤄볼 기회가 적은 것도 사실이다.

1분기 쯤에는 카산드라 클러스터에 ZING GC를 도입하기 위해 여러 테스트를 해봤다. G1GC, CMS GC 등 여러 GC를 비교해보고 결국 ZING을 선택했고 국내에서는 최초로 카산드라+ZING 조합의 레퍼런스가 되었다.

2분기 쯤에는 Spark+Cassandra조합으로 memtable을 헤비하게 사용하다가 결국 장애를 발생했고 sstable로 전환하는 작업을 진행했다. 한번에 성공한 것은 아니고 하나를 고칠 때 마다 네트워크장애, 디스크 장애가 순차적으로 발생하여 하나하나 들여다보는 계기가 되었다. 결국 수 많은 시행착오 끝에 꽤 괜찮게 최적화를 했고 분산환경에서 배치시간이나 성능이슈를 해결하는 노하우가 생겼다.

3분기 쯤에는 추천시스템의 A/B Test를 자동화할 수 있는 궁극기(?)인 MAB(Multi Armed Bandit)을 적용하고 싶었다. 여러 자료를 찾아보고 결국에는 도메인으로 풀어야한다는 생각을 개인적으로 했다. 먼저 데이터 수집이 안되고 있었기 때문에 kafka, logstash로 먼저 데이터를 수집했고 이후 Spark Batch로 필요한 데이터를 연산까지는 했지만 결국은 적용하지는 못했다.

4분기 쯤에 가장 기억나는 일은 2020 Data Conference에서 발표를 했던 일이다. 많은 분들이 도움을 주셨고 팀원분들과 Azul Systems 대표님께 감사하게 생각하고 있다.

그리고 아직 진행 중이기는 하지만 거의 마무리가 된 일 중에 하둡2에서 하둡3으로 업그레이드하면서 하둡, 메소스, 스파크 클러스터를 새로 구축하고 아즈카반을 활용한 배치를 모두 옮기면서 전체적인 구조를 다시 한번 생각해보게 되었다. 시스템 분들의 도움을 많이 받아서 중구난방인 디스크 배치도 다시 잡고 Raid 구성도 서버의 역할마다 다르게 세팅하고 재밌었다.

2020년은 빅데이터 플랫폼을 구성하는 각각의 프레임워크를 조금 더 깊게 다뤄보고 안쓰는 기능도 도입하기 위한 테스트 해보고, 설계도 해보면서 꽤 재밌었다.

블로그도 2019년에 비하면 꽤 올랐다.


하둡 3.1.2는 그냥 집에서 개인적으로 설치한 건데 생각보다 조회수가 많다. 내년에는 프로덕션 환경에서 적용할 수 있는 HA 구성까지 한 3.1.4 버전으로 많이 유입이 되었으면 좋겠다.



아무튼 2021년에는 더 재밌는 일이 찾아오기를 바란다.

2020년 12월 30일 수요일

hadoop 3.x ec policy(erasure coding) vs replication 3

하둡3에서 ec policy라는 기능이 생겼다. 말로만 들었던 기능인데 이참에 테스트할 겸 적용해보기로 한다.

하둡2에서 하둡3으로 데이터 마이그레이션을 할 때 공간이슈 때문에 3복제 대신에 ec policy 적용을 고려해보았다.


적용 자체는 path별로 구분해서 적용할 수 있게 되어있었고 적용하면 replication이 아닌 ec policy에 의해 1 replication + @로 저장&관리되는 형태이다.


각종 정보(ex 패리티비트)로 인해 블록수를 더 많이 자치할 수 있어 namenode 입장에서 약간의 overhead가 있다고는 하지만 case by case인것 같고 내 기준에서는 크게 무리될 정도는 아니었기에 바로 적용을 해보았다. 

3 replica가 1 replica로 즉시 바뀌지는 않고 신규로 write되는 것부터 바뀌는 것 같다. 

즉 해당 옵션은 hdfs path별로 on/off 시켜서 동작할 수 있다.


기존 3 replica의 경우 3배의 용량을 차지하는데 ec policy를 사용하면 1.3배 정도로도 데이터를 저장할 수 있다고 한다. 


아무튼 적용하고 장애 테스트를 해보기 위해 해당 block을 저장하고 있는 datanode들을 down시켰더니 그 즉시 해당 data를 읽을 수 없는 문제가 존재했다.

3 replica의 경우 데이터노드가 몇 대 내려가더라도 1대만 살아있으면 즉시 제대로 동작하는데 ec policy는 그러지 못하는 것 같다. 

(또한 -replication 옵션과 -policy옵션을 동시에 사용이 불가능하다.)


현재까지 테스트한 결론으로는 일회성으로 read하는 단순 저장용이나 또는 복구가능한 집계성 데이터는 ec policy를 적용하고 원천 데이터처럼 자주 read하는 데이터들은 그냥 3 replication이 좋을 것 같다.

어쨌든 배치를 돌려야하는데 즉시 read하지 못하기 때문에 적용이 불가능해보인다.


시간관계상 제대로 알아보지 못했지만 분명한 것은 일괄적용하기에는 무리가 있다.

참조 : https://hadoop.apache.org/docs/r3.1.1/hadoop-project-dist/hadoop-hdfs/HDFSErasureCoding.html

2020년 12월 15일 화요일

hadoop name node heap size

hadoop cluster에서 특히 spark cluster를 hadoop data node에 구성을 하기 때문에 name node heap size는 신경 안써도 된다고 생각할 수도 있는데 hdfs가 많아질 수록 신경을 써야한다.

하둡 네임노드의 경우 memory는 data node의 file system에 영향을 받는다. 따라서 하둡 과거버전처럼 block size를 64MB로 할 때보다 128MB로 하면 네임노드의 heap이 확 줄어드는 것을 관찰 할 수 있을 것이다.

아무래도 데이터를 큰 바구니에 담으면 기억해야하는 바구니 갯수가 줄어들테니 당연한 일이다.


아래는 기록용으로 남기려고 한다.

hadoop v2.7.5

하둡 2.7.5버전에서의 block size 256MB이고 모든 데이터가 3 replica를 갖는다. 

file과 directories 갯수에 따른 메모리 사용량이다.


27270751 files and directories, 22352862 blocks = 49623613 total filesystem object(s).

Heap Memory used 27.18 GB of 39.51 GB Heap Memory. Max Heap Memory is 56.89 GB.

Non Heap Memory used 104.05 MB of 106.5 MB Commited Non Heap Memory. Max Non Heap Memory is -1 B.



hadoop v3.1.4

현재 하둡 3.1.4에서 block size도 동일하게 256MB를 잡았고 동일하게 3 replica이다. 100% 마이그레이션을 하지는 않았는데 replica 쪽에 개선이 있어서 메모리를 덜 사용할 것이다.


7,743 files and directories, 11,767 blocks (11,767 replicated blocks, 0 erasure coded block groups) = 19,510 total filesystem object(s).

Heap Memory used 1.99 GB of 5.08 GB Heap Memory. Max Heap Memory is 56.89 GB.

Non Heap Memory used 80.31 MB of 83.31 MB Commited Non Heap Memory. Max Non Heap Memory is <unbounded>.


데이터, 배치 마이그레이션 끝내고 꾸준히 모니터링을 하면서 얼마나 사용하는지 봐야겠다.



2020년 12월 4일 금요일

apache mesos를 세팅하면서

resource manager 중에 yarn vs mesos 뭐가 더 좋은지는 모르겠지만 mesos가 ui 부분에서는 확실히 이쁜 것 같다. 자원도 이쁘게 할당하고 현재 connect된 agent(slave)와 task 각각의 상태를 보는 것이 편리하다. 


물론 사용할 기능적인 측면에서 yarn과 mesos는 100% 같다고 볼 수 있기는 하지만 가장 큰 차이점은 ui의 깔끔함 정도라고 생각을 한다. 찾아봐도 딱히 어떤 것이 좋다 할 수는 없고 대부분의 배치를 mesos로 돌리기 때문에 그런 것 일테지만 어쨌든 개인적으로는 mesos가 더 마음에 든다.

포스팅 시점으로 1.11.0 버전이 최신이다. 설치는 가이드 문서를 참고다. 



완전히 새로 세팅된 OS에서 깔끔하게 offline으로 설치를 하려고 빌드를 하고 실행을 해보면 결국 libaprutil-1.so.0 등의 수 많은 dependency가 없다는 에러때문에 실행이 되지않는다. 따라서 결국에는 가이드 문서에서 요구하는 dependency를 만족시키기 위해서는 초반에 OS를 설치하고 /usr/lib64에 설치될 dependency를 마구마구 설치해도록하자. 


mesos의 경우 zookeeper를 활용하여 HA를 구성하고 마스터는 2대 이상으로 구성을 한다.
master 3대, 나머지는 agent로 생각을하고 그냥 올리면 된다.

빌드를 한 곳에 실행 스크립트가 생기고 각 마스터 3대에서 아래처럼 마스터를 실행한다.
./mesos-master.sh --work_dir=/data01/sw/mesos --logging_level=ERROR --log_dir=/appData01/mesos/log --zk=zk://master1:2181,master2:2181,master3:2181/mesos --quorum=2 --cluster=test-mesos-cluster &
마스터가 3대이기 때문에 quorum은 2로 한다. 만약 2대일 경우 1로 주면 된다.

나머지 서버에서는 agent를 실행한다.
./mesos-slave.sh --master=zk://master1:2181,master2:2181,master3:2181/mesos --work_dir=/data01/mesos --resources='cpus:40;mem:112640' --logging_level=ERROR --no-systemd_enable_support &

resources 옵션을 통해서 해당 서버에서 cpu, gpu, memory, disk 등을 얼마나 포함할건지를 줄 수 있다. 위처럼 하면 --resources='cpus:40;mem:112640' 옵션을 주면 각 agent에서 cpu가 40개, memory가 110G씩 올라온다.
위 링크 문서에서 보면 --resources='cpus:24;gpus:2;mem:24576;disk:409600;ports:[21000-24000,30000-34000];bugs(debug_role):{a,b,c}' 옵션을 줄 수 있는데 port range와 role을 줄 수 있는데 이를 잘 사용한다면 설계할 때 최적으로 설계를 할 수 있을 것이다. 하지만 mesos cluster를 분리해서 운영하는게 더 관리하기는 편해보인다.

마지막으로 spark job을 수행할 때 --master 옵션을 다음처럼 주면 된다. 
--master mesos://zk://master1:2181,master2:2181,master3:2181/mesos

즉 전체를 예로 들면 다음과 같다.

bin/spark-submit \
--master mesos://zk://master1:2181,master2:2181,master3:2181/mesos \
--executor-memory 32G \
--driver-memory 32G \
--class "aaa" \
--total-executor-cores 300 \
/jar_path/test-assembly-1.0.jar

그리고 코드 상에서는 예를 들어 spark.executor.cores 같은 옵션을 통해 각 익스큐터당 자원을 잘 할당할 수 있고 이는 task가 한쪽 서버에 안몰리고 이쁘게 잘 펼쳐져서 배치가 돌 것 이다. 이 부분은 추후에 포스팅을 이어서 해보려고 한다.




2020년 11월 30일 월요일

Apache Hadoop 3.1.4 HA(High Availability) Install Guide with Cloudera guide

과거에 개인적으로 하둡 2~3 버전을 완전분산 모드로 설치를 해보면서 확실히 크게 달라진 점은 없었다. 당시에는 HA구성을 할 필요가 없었는데 이번에 HA 구성을 해보려고 한다.
하지만 프로덕션 환경에서 설치하기위해서 최적의 구성을 생각했어야했는데 개인적으로 생각한 내용과 클라우데라에서 제안한 아키텍처를 보더라도 크게 차이는 없었다.


출처 : 클라우데라 제안 구성

여기서 상황에 맞게 필요한 것만 구성을 하고 결론은 마스터 3대, 배치서버1대, 데이터노드 N대로 구성하고 다음과 같은 룰을 따르고자 한다.

리소스 매니저는 Yarn을 한번 올려보고.. 실제로 사용은 Mesos로 컨트롤 할 것이다. Mesos의 장점이 하둡 클러스터의 구간을 나눠놓고 1번 메소스, 2번 메소스, 3번 메소스 클러스터로 분리운영이 가능한데 이는 서버가 장애가 날때, 혹은 배치들을 분리했을 때 운영상 장점이 있기 때문이다.

대략적인 구조는 이런식으로 가져간다.


마스터1 : 주키퍼 저널노드 네임노드(HA) ZKFC 리소스매니저(HA) 메소스마스터

마스터2 : 주키퍼 저널노드 네임노드(HA) ZKFC 리소스매니저(HA) 메소스마스터

마스터3 : 주키퍼 저널노드 잡히스토리서버 스파크히스토리서버 메소스마스터

슬레이브서버들 : 데이터노드 메소스슬레이브


사실 yarn을 쓸 일은 거의 없긴한데 일단 sqoop이나 다른 프레임워크들과 etl을 하면서 필요할 일이 있으니 넣어두고 ha를 굳이 안해도되는데 간단하니까 해보자. 왠만하면 최소화로 가져가야 서버를 리부팅할때 올려야할 서비스를 빼먹지 않기 때문에 간소화하는 것을 좋아한다.

설치는 자바설치, /etc/hosts, 인증키 배포 등 기본적으로 세팅해놓아야 할 설정들은 모두 되었다고 가정을 한다.


먼저 마스터 3대에 주키퍼를 설치한다. (3.6.2 설치함.)
zoo_sample.cfg를 복사해서 zoo.cfg를 만들고
dataDir=path
server.1=master1:2888:3888
server.2=master2:2888:3888
server.3=master3:2888:3888
적당한 곳으로 세팅한다.
다 되었으면 주키퍼를 띄운다. (./zkServer.sh start)

다음은 바로 하둡이다.
압축을 풀고 
tar -zxvf hadoop-3.1.4.tar.gz
링크를 걸고
ln -s hadoop-3.1.4 hadoop
필요한 환경세팅을 세팅한다.



기존에는 slaves, masters로 나뉘어져있던 파일이 /etc/workers로 통합이 되었고 여기에 datanode가 올라갈 서버들을 적는다.

각각의 서버는 data01, appData01~04로 구성이 되어있고 하둡은 data01에 두고 log나 data를 쓰는 부분은 appData01로 구성한다. 하둡 같은 경우는 os부분만 레이드1(미러링) 하고 나머지는 솔직히 안해도 된다고 본다. 어차피 3복제이고 하둡을 다시 올리는데 큰 리소스가 안들기 때문이다.

그럼 설정파일을 작성하자.
하둡 설정파일들이 default로라도 되어있으면 좋은데 그냥 깡통이라서 사실 하나씩 쓰기가 너무 괴롭다..

hadoop-env.sh에 몇몇 필요한 경로 세팅을 하고 저장한다.
주석을 풀고 커스터마이징해서 작성하면 된다.
HADOOP_CONF_DIR
HADOOP_HEAPSIZE_MAX
HADOOP_WORKERS
HADOOP_LOG_DIR
HADOOP_PID_DIR
HADOOP_SECURE_PID_DIR


다음은 hdfs-site.xml이다.
<configuration>
        <!-- for NameNode -->
        <property>
                <name>dfs.namenode.name.dir</name>
                <value>/appData01/hadoop/hdfs/name,/appData02/hadoop/hdfs/name,/appData03/hadoop/hdfs/name,/appData04/hadoop/hdfs/name</value>
        </property>

        <!-- for DataNode -->
        <property>
                <name>dfs.datanode.data.dir</name>
                <value>/appData01/hadoop/hdfs/data,/appData02/hadoop/hdfs/data,/appData03/hadoop/hdfs/data,/appData04/hadoop/hdfs/data,/appData05/hadoop/hdfs/data</value>
        </property>

        <!-- HA -->
        <property>
                <name>dfs.nameservices</name>
                <value>hadoop-cluster-bdnode</value>
        </property>
        <property>
                <name>dfs.ha.namenodes.hadoop-cluster-bdnode</name>
                <value>namenode1,namenode2</value>
        </property>
        <property>
                <name>dfs.namenode.rpc-address.hadoop-cluster-bdnode.namenode1</name>
        <value>master1:8020</value>
        </property>
            <property>
                    <name>dfs.namenode.rpc-address.hadoop-cluster-bdnode.namenode2</name>
                        <value>master2:8020</value>
            </property>
        <property>
                <name>dfs.namenode.http-address.hadoop-cluster-bdnode.namenode1</name>
                <value>master1:50070</value>
        </property>
        <property>
                <name>dfs.namenode.http-address.hadoop-cluster-bdnode.namenode2</name>
                <value>master2:50070</value>
        </property>
        <property>
                <name>dfs.namenode.shared.edits.dir</name>
                <value>qjournal://master1:8485;master2:8485;master3:8485/hadoop-cluster-bdnode</value>
        </property>
        <property>
                <name>dfs.client.failover.proxy.provider.hadoop-cluster-bdnode</name>
                <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
        </property>
        <property>
                <name>dfs.ha.fencing.methods</name>
                <value>sshfence</value>
        </property>
        <property>
                <name>dfs.ha.fencing.ssh.private-key-files</name>
                <value>/home/scom/.ssh/id_rsa</value>
        </property>
        <property>
                <name>dfs.journalnode.edits.dir</name>
                <value>/appData01/hadoop/hdfs/journalnode</value>
        </property>
        <property>
                <name>dfs.ha.automatic-failover.enabled</name>
                <value>true</value>
        </property>
        <property>
                <name>dfs.blocksize</name>
                <value>268435456</value>
        </property>
        <property>
                <name>dfs.namenode.handler.count</name>
                <value>100</value>
        </property>
        <property>
                <name>dfs.hosts</name>
                <value>/data01/sw/hadoop/etc/hadoop/workers</value>
        </property>
        <property>
                <name>dfs.webhdfs.enabled</name>
                <value>true</value>
        </property>
        <property>
                <name>dfs.namenode.datanode.registration.ip-hostname-check</name>
                <value>false</value>
        </property>

        <property>
                <name>dfs.datanode.du.reserved</name>
                <!-- cluseter variant 90G -->
                <value>96636764160</value>
                <description>Reserved space in bytes per volume. Always leave this much space free for non dfs use.</description>
        </property>
</configuration>


다음은 core-site.xml이다.
<configuration>
        <property>
                <name>fs.defaultFS</name>
                <value>hdfs://hadoop-cluster-bdnode</value>
        </property>
        <property>
                <name>ha.zookeeper.quorum</name>
                <value>master1:2181,master2:2181,master3:2181</value>
        </property>
        <!-- trash -->
        <property>
                <name>fs.trash.interval</name>
                <value>14400</value>
                <description>Number of minutes after which the checkpoint gets deleted. If zero, the trash feature is disabled. </description>
        </property>
        <!-- for HUE-->
        <property>
                <name>hadoop.proxyuser.hue.hosts</name>
                <value>*</value>
        </property>
        <property>
                <name>hadoop.proxyuser.hue.groups</name>
                <value>*</value>
        </property>
</configuration>


다음은 yarn-site.xml 이다.
<configuration>

<!-- Site specific YARN configuration properties -->
        <property>
                <name>yarn.nodemanager.aux-services</name>
                <value>mapreduce_shuffle</value>
        </property>
        <property>
                <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
                <value>org.apache.hadoop.mapred.ShuffleHandler</value>
        </property>
        <property>
                <name>yarn.nodemanager.local-dirs</name>
                <value>/appData01/hadoop/yarn/nm-local-dir,/appData02/hadoop/yarn/nm-local-dir,/appData03/hadoop/yarn/nm-local-dir,/appData04/hadoop/yarn/nm-local-dir</value>
        </property>
        <property>
                <name>yarn.resourcemanager.address.rm1</name>
                <value>master1:8032</value>
        </property>
        <property>
                <name>yarn.resourcemanager.webapp.address.rm1</name>
                <value>master1:8088</value>
        </property>
        <property>
                <name>yarn.resourcemanager.scheduler.address.rm1</name>
                <value>master1:8030</value>
        </property>
        <property>
                <name>yarn.resourcemanager.resource-tracker.address.rm1</name>
                <value>master1:8031</value>
        </property>
        <property>
                <name>yarn.resourcemanager.admin.address.rm1</name>
                <value>master1:8041</value>
        </property>

        <property>
                <name>yarn.resourcemanager.address.rm2</name>
                <value>master2:8032</value>
        </property>
        <property>
                <name>yarn.resourcemanager.webapp.address.rm2</name>
                <value>master2:8088</value>
        </property>
        <property>
                <name>yarn.resourcemanager.scheduler.address.rm2</name>
                <value>master2:8030</value>
        </property>
        <property>
                <name>yarn.resourcemanager.resource-tracker.address.rm2</name>
                <value>master2:8031</value>
        </property>
        <property>
                <name>yarn.resourcemanager.admin.address.rm2</name>
                <value>master2:8041</value>
        </property>

        <property>
                <name>yarn.resourcemanager.fs.state-store.uri</name>
                <value>/appData01/hadoop/yarn/system/rmstore,/appData02/hadoop/yarn/system/rmstore,/appData03/hadoop/yarn/system/rmstore,/appData04/hadoop/yarn/system/rmstore</value>
        </property>
        <property>
                <name>yarn.resourcemanager.hostname.rm1</name>
                <value>master1</value>
        </property>
        <property>
                <name>yarn.resourcemanager.hostname.rm2</name>
                <value>master2</value>
        </property>

        <!-- configure yarn -->
        <property>
                <name>yarn.nodemanager.resource.cpu-vcores</name>
                <value>10</value>
        </property>
        <property>
                <name>yarn.nodemanager.resource.memory-mb</name>
                <value>40960</value>
        </property>
        <property>
                <name>yarn.scheduler.minimum-allocation-mb</name>
                <value>1024</value>
        </property>
        <property>
                <name>yarn.scheduler.maximum-allocation-mb</name>
                <value>24576</value>
        </property>

        <!-- ResourceManager 시작시 state 복구여부 --> 
        <property> 
                 <name>yarn.resourcemanager.recovery.enabled</name> 
                 <value>true</value> 
        </property> 
        <!-- persistent store로 사용할 class --> 
        <property> 
                 <name>yarn.resourcemanager.store.class</name> 
                 <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value> 
        </property> <!-- Zookeeper 서버 리스트 --> 
        <property> 
                 <name>yarn.resourcemanager.zk-address</name> 
                 <value>master1:2181,master2:2181,master3:2181</value> 
        </property>
        <property>
                <name>yarn.resourcemanager.ha.enabled</name>
                <value>true</value>
        </property>

        <!-- ResourceManager가 leader election에 참가할 cluster 이름 지정 -->
        <property>
                <name>yarn.resourcemanager.cluster-id</name>
                <value>rm-cluster</value>
        </property>
        <!-- cluster에서 HA를 구성할 ResourceManager id 지정 -->
        <property>
                <name>yarn.resourcemanager.ha.rm-ids</name>
                <value>rm1,rm2</value>
        </property>
</configuration>


다음은 mapred-site.xml이다
<configuration>
        <property>
                <name>mapreduce.framework.name</name>
                <value>yarn</value>
        </property>

        <!-- mapreduce2 memory setting -->
        <property>
                <name>yarn.app.mapreduce.am.resource.mb</name>
                <value>1536</value>
        </property>
        <property>
                <name>mapreduce.map.memory.mb</name>
                <value>4096</value>
        </property>
        <property>
                <name>mapreduce.reduce.memory.mb</name>
                <value>8192</value>
        </property>

        <!-- mapreduce java memory setting -->
        <property>
                <name>mapreduce.map.java.opts.max.heap</name>
                <value>3276</value>
        </property>
        <property>
                <name>mapreduce.reduce.java.opts.max.heap</name>
                <value>6553</value>
        </property>

                <property>
                <name>yarn.app.mapreduce.am.resource.mb</name>
                <value>2048</value>
        </property>

                <property>
                                 <name>yarn.app.mapreduce.am.env</name>
                                 <value>HADOOP_MAPRED_HOME=/data01/sw/hadoop</value>
                </property>
                <property>
                                <name>mapreduce.map.env</name>
                            <value>HADOOP_MAPRED_HOME=/data01/sw/hadoop</value>
                </property>
                <property>
                                <name>mapreduce.reduce.env</name>
                                <value>HADOOP_MAPRED_HOME=/data01/sw/hadoop</value>
                </property>

</configuration>


여기까지 하면 하둡 파일은 다 작성이 되었고 이 디렉토리를 마스터2~3, 데이터노드들에 복사를 하자.

scp -r hadoop-3.1.4 ip:/data01/sw/ 


네임노드 HA는 주키퍼로 한 것인데 주키퍼는 가벼우면서도 참으로 편리하다.
zkfc 실행 전 포맷 한번 해주고
hdfs zkfc -formatZK

마스터 1~3번기에서 저널노드 실행
현재 : hdfs --daemon start journalnode 
과거 : hadoop-daemon.sh start journalnode

마스터 1번기에서 네임노드 초기화
hdfs namenode -format

마스터 1번기에서 네임노드 실행
현재 : hdfs --daemon start namenode 
과거 : hadoop-daemon.sh start namenode


마스터 1번기에서 zkfc 실행
현재 : hdfs --daemon start zkfc 
과거 : hadoop-daemon.sh start zkfc

이제 마스터 1번기가 active 상태가 되었다.
이제 마스터 2번기를 standby로 올리자.

마스터 2번기에서 스탠바이상태로 세팅
hdfs namenode -bootstrapStanby

마스터 2번기에서 스탠바이 네임노드 실행
hdfs --daemon start namenode 

마스터 2번기에서 zkfc 실행
hdfs --daemon start zkfc

이제 리소스매니저(yarn)을 올린다.
마스터 1번기에서 yarn --daemon start resourcemanager 실행.

마스터 3번기에서 jobhistory server 실행
현재 : mapred --daemon start historyserver
과거 : sbin/mr-jobhistory-daemon.sh start historyserver

마스터 2번기에서도 yarn을 올린다.
yarn --daemon start resourcemanager

마지막으로 데이터노드들에서 노드매니저를 올린다.
yarn --daemon start nodemanager

그림 작업은 끝이났다.
마스터1:50070 혹은 마스터2:50070에 접근을 해보면 active/standby를 확인할 수 있다.
테스트를 해보기 위해서 active namenode를 kill해보면 아직 사용을 안해서 거의 즉시 failover가 되기는 한다. 이제 배치에서도 네임스페이스를 활용하도록 하자!




네임노드 ha와 리소스매니저 ha가 끝이났다.
하둡이 버전이 올라가면서 가장 크게 바뀐 것은 아무래도 명령어가 바뀌고 workers 파일로 바뀌었다는것, 포트가 바뀌었다는 것, 내부적으로는 압축 효율이 바뀌었다는 점 정도인 것 같다. 그리고 ui가 많이 바뀌었다. 적응이 안된다.

만약에 설치를 하다가 제대로 안되면(특히 포맷) 해당 개인이 사용하려고 생성했던 디렉토리를 삭제하고 다시 해보도록 하자. 그 안에 파일들이 이미 만들어져서 사용되고 있기 때문에 안될 가능성이 높다.

아래 명령어로 네임노드 Active/Standby를 확인 가능하다. 한번만 쳐도 알아낼 수 있는 명령어가 있었으면 좋겠다.
hdfs haadmin -getServiceState namenode1
hdfs haadmin -getServiceState namenode2

리소스매니저 Active/Standby 상태 확인은 다음과 같다.
yarn rmadmin -getServiceState rm1
yarn rmadmin -getServiceState rm2

개인적으로 네임노드2대를 VIP로 사용하여 인터페이스를 해야하는 경우가 아니라면 딱히 VIP는 필요없어보이고 어차피 네임스페이스로 접근하여 해결하도록 하자.

2020년 11월 28일 토요일

2020 Data Conference Speaker로 참여한 후기 (주제 : Spark+Cassandra 기반 Big Data를 활용한 추천 시스템 서빙 파이프라인 최적화)

연초에 항상 1~3년간의 장기 계획, 1달씩 단기 계획을 세우면서 어떤 부분을 새로 혹은 보강하기 위해 레벨업 할지 목표를 정하는데 계획에 없던 컨퍼런스에 연사로 참여하게 되었습니다. 컴퓨터월드/IT Daily에서 주최하고 양재 엘타워에서 진행하기 때문에 큰 컨퍼런스라서 부담이 되었는데 함께 일하는 빅데이터 파트분들과 Azul Systems에서 많이 도와주셨습니다.

데이터 활용 AI&빅데이터, 보안 트랙 부문에서 발표를 진행했고 아무래도 실무를 하는 입장에서 개발적인 부분을 많이 넣고 싶었고, 카산드라를 많이 사용하기를 바라면서 홍보하고 싶고, 그리고 Zing JVM을 적용하면서 성능상 이점을 본 것들을 소개하고 싶었습니다.

주제는 Spark+Cassandra 기반 Big Data를 활용한 추천 시스템 서빙 파이프라인 최적화로 정했고 발표 내용은 어느 정도 카산드라를 사용하는 입장에서는 꼭 고려해야할 부분이라서 어렵지 않은 부분이지만 분산 환경에서 프레임워크들을 사용하다 보면 누구나 겪을 수 있는 상황이라는 측면에서 제가 경험했던 상황으로 풀어보려고 했습니다. 어떻게 트러블 슈팅을 하고 최적화를 했는지, 코드를 짜더라도 Network&Disk I/O 같은 인프라 환경을 고려해야한다는 것을 주로 설명하고자 했습니다.

그리고 국내에선 Cassandra+Zing 조합으로 레퍼런스가 없어서 이 부분이 가지는 이점을 최대한 설명하고자 했는데 국내에서는 일단 Cassandra를 사용하는 분들이 늘어나기를 기대합니다.
(카산드라 한국 사용자모임 그룹 홍보 : https://www.facebook.com/groups/cassandra.kr)







2020년 11월 22일 일요일

Artificial Intelligence - Bayesian Networks

베이지안 네트워크는 원인과 결과가 있는 지식을 그래프 형태로 표현한 것으로 그래프의 노드(사건)와 노드를 화살표로 연결하여 원인과 결과를 표현하고 그런 사건이 일어날 조건부 확률이 주어진다면 이와 관련된 조건 확률을 비교적 효율적으로 계산할 수 있는 모델이다. 즉 원인과 결과에 따른 지식을 포함하는 문제를 해결할 수 있는 모델 중의 하나이다.

이를 활용하면 스팸 필터, 사고 예방 등에 활용할 수 있다.


J. Pearl UCLA 교수의 유명한 예를 살펴보자.

- bill이 밖에 나가서 일을 할 동안 도둑이 들어올 것을 염려하여 Alarm이 울리는 방범 시스템을 설치했다. 그런데 Bill은 방범 시스템이 작동해도 알람 소리를 듣지 못하므로 이웃에 있는 John과 Mary에게 각각 방범 시스템이 작동하면 자기에게 전화를 부탁하였다.
- 방범 시스템은 Burglary와 Earthquake에 Alarm이 울린다.


총 5개의 확률 변수는 위와 같이 그래프로 나타낼 수 있다.

대충 살펴보면 Burglary가 들었을 확률은 0.001이고, Alarm이 울렸을 때 John이 전화했을 확률은 0.9, 알람이 안울렸는데 John이 전화할 경우는 0.05라고 주어져있다.

고로 다음과 같다.


또한 이를 활용한다면 P(Bur|J∨M) 등의 확률을 구할 수 있다.


2020년 10월 26일 월요일

Source and target differ in block-size. Use -pb to preserve block-sizes during copy

hadoop distcp -update $source $target

하둡 클러스터 2대를 운영하고 있는 상황에서 매일 A클러스터에서 B클러스터로 데이터를 백업을 한다. 현재 하둡은 source target 모두 2.7.1 버전이다.


이때 distcp 명령어를 사용하고 이는 mapreduce로 수행이 되기 때문에 성능이 좋다.
(distcp는 하둡 클러스터간 파일을 복사할 때 유용하다.)


정상적으로 수행되면 아래와 같은 로그를 볼 수 있는데..

INFO tools.DistCp: DistCp job-id: job_1587237267517_10878
INFO mapreduce.Job: Running job: job_1587237267517_10878
INFO mapreduce.Job: Job job_1587237267517_10878 running in uber mode : false
INFO mapreduce.Job:  map 0% reduce 0%
INFO mapreduce.Job:  map 55% reduce 0%
INFO mapreduce.Job:  map 82% reduce 0%
INFO mapreduce.Job:  map 91% reduce 0%
INFO mapreduce.Job:  map 95% reduce 0%
INFO mapreduce.Job:  map 100% reduce 0%
INFO mapreduce.Job: Job job_1587237267517_10878 completed successfully
INFO mapreduce.Job: Counters: 33

갑자기 실패가 났길래 타겟쪽에 용량이 부족한가? 라는 의심이 들었고 로그를 봤다.

HDFS-DISTCP ERROR - Caused by: java.io.IOException: Couldn't run retriable-command: Copying $source to $target
HDFS-DISTCP ERROR - at org.apache.hadoop.tools.util.RetriableCommand.execute(RetriableCommand.java:101)
HDFS-DISTCP ERROR - at org.apache.hadoop.tools.mapred.CopyMapper.copyFileWithRetry(CopyMapper.java:281)

Caused by: java.io.IOException: Check-sum mismatch between $source and $target. Source and target differ in block-size. Use -pb to preserve block-sizes during copy. Alternatively, skip checksum-checks altogether, using -skipCrc. (NOTE: By skipping checksums, one runs the risk of masking data-corruption during file-transfer.)


Check-sum이 mismatch했고 소스와 타겟간에 block-size가 다르다고 한다.
해결책은 그냥 무시하는 옵션을 주면 된다. -skipcrccheck

default block size가 256MB인데 보니까 source쪽이 128MB로 생성되었더라.
그런데 file size는 128이 넘는다..


무슨현상인지 이해는 잘 안가지만 어쨌든 단순히 백업용도이고 파일 복사가 되면 타겟쪽에서는 다시 256MB로 blocksize가 맞춰지는 것으로 확인했고 이대로 넘어가려고 한다.

정리하면 큰 문제가 없으면
hadoop distcp -update -skipcrccheck $source $target 으로 하자!









2020년 8월 31일 월요일

MAB(Multi Armed Bandit) 대안으로서의 Dynamic Ensemble

MAB(Multi Armed Bandit)

A/B test를 보다 더 유연하게 하면서 최적의 return을 찾아가는 방법이다.

A/B test의 경우 A/B 셋을 준비하고 타겟에 따라 A 혹은 B를 리턴을 주면 되기 때문에 구현하기 쉽기는 하지만 분명히 only A,B라는 한계점이 존재한다.

MAB는 이를 조금 더 확장한 개념으로 n개의 셋을 준비해서 보상에 따라 적절한 셋을 return 하는 것이 concept이다.


가령 상품추천을 할 때에 A모델, B모델, C모델, D모델, 혹은 앙상블된 모델을 미리 준비하고 CTR 등의 보상에 따라 적절한 모델을 return 하도록 구현할 수 있다.

모델의 갯수가 충분하고, 모델을 집계하는데 걸리는 시간이 충분하다면 MAB는 좋은 선택지가 될 것이다.

하지만 불가능한 상황이 분명히 존재한다!!


DE(Dynamic Ensemble)

단순히 A,B 두개의 상품추천 모델만이 존재할 수 있다. 이 때는 A와 B를 적절한 비율로 섞어서 사용해야 하는 경우를 가정해볼 수 있다.

과연 A+B 앙상블된 모델 1개만이 존재할 때 MAB를 어떻게 할 수 있을까?


분명히 MAB는 아니다. 하지만 A와 B를 섞는 비율 n:m을 보상을 통해 동적으로 조절한다면 충분히 비슷한 효과를 낼 수 있을 것이다.

물론 말단에서 어떤 모델을 사용할 것인지 고르는 것이 MAB라면 이는 아예 모델을 만들 때 보상을 사용하고 1개의 모델만 생성되기 때문에 MAB라고 부르기엔 무리가 있다.

하지만 적절한 결과를 return 해야하고, 보상에 사용할 수 있는 방법을 똑같이 사용할 수 있기에 비슷하기도 하다!

그래도 용어를 다르게 한다면 아마 적절한 용어로는 Dynamic Ensemble이 될 것이다.


MAB를 하려다가 상황이 안맞을 것 같아서 DE방식으로 방향을 틀어야할 수도 있는데 개인적인 생각이지만 적절한 대안같다.

2020년 8월 18일 화요일

kafka + logstash saving to hdfs (webhdfs)

상품추천 분석을 위한 로그들을 kafka broker에 꾸준히 쏴주고 있었다. 이제 이를 사용할 때가 되었다.

이 로그들을 하둡에 쌓을 필요가 생겼고 컨슈머를 구현할지, logstash의 webhdfs를 사용할지 고민하다가 logstash의 webhdfs output이 간단해서 이걸 사용하기로 하였다.


필요한 플러그인들이 설치되어있다고 가정하고 conf파일을 작성한다.

크게 input, filter, output 부분을 작성하면 된다. 필요한 부분을 검색해서 쓰긴했는데 나중에 시간날 때 제대로 사용법을 익혀두는게 좋을 것 같다. (꽤 편리한 것 같다.)


input {
     kafka {
            bootstrap_servers => "a:9092,b:9092,c:9092"
            topics => ["topic1","topic2"]
            decorate_events => true
            consumer_threads => 3
            group_id => "dev_recommend"
            }
}


모든 토픽의 파티션은 3개로 되어있기때문에 consumer_threads를 3으로 했다.

데이터를 가공하는 filter부분에서 토픽접근은 [@metadata][kafka][topic]로 하면되고 or조건은 or로 쓰면 되었다. c1은 unix time으로 된 컬럼이고 컬럼끼리의 구분자는 \t으로 되어있다면 아래처럼 작성해주면 되었다.


filter {
    if [@metadata][kafka][topic] == "topic1" or [@metadata][kafka][topic] == "topic2" {                                                                            
        dissect {
            mapping => {"message" => "%{col1} %{col2}   %{col3}   %{col4}   %{col5}   %{col6}   %{col7}   %{col8}   %{col9}   %{col10}  %{col11}"}
        }
        ruby {
                code => "
                      require 'time'
                      require 'date'

                      ts_new = event.get('[ts]').to_i/1000

                      event.set('day_ymd', Time.at(ts_new).localtime.strftime('%y-%m-%d'))
                      event.set('day_h', Time.at(ts_new).localtime.strftime('%H'))
                      event.set('day_ymdh', Time.at(ts_new).localtime.strftime('%y-%m-%d-%H'))
                  "
            }
    }

}

마지막으로 ouput 부분은 다음처럼 작성하면 된다.

output {

    if [@metadata][kafka][topic] == "topic1" {
        webhdfs {
            host => "hadoop url"
            port => 50070
            path => "/data/tsv/recommend-logs/%{day_ymd}/%{day_h}/topic1-log-%{day_ymdh}.log"
            user => "usr"
            codec => line { format => "%{message}"}
            }
        #stdout { codec => rubydebug }
    }
    else if [@metadata][kafka][topic] == "topic2" {
        webhdfs {
            host => "hadoop url"
            port => 50070
            path => "/data/tsv/recommend-logs/%{day_ymd}/%{day_h}/topic2-log-%{day_ymdh}.log"
            user => "usr"
            codec => line { format => "%{message}"}

            }
        #stdout { codec => rubydebug }
    }
}

이제 실행해서 화면에 찍어보면 다음과 같은 형식으로 로그가 찍힐것이다.


output file은 계속해서 append를 해야하기때문에 text 포맷이다. 이후 실행하면 yy-mm-dd/시간/파일 경로에 데이터가 잘 쌓이는 것을 확인할 수 있다. 
나중에 제대로 사용법(문법)을 익혀둬야겠다.

일단 작업을 돌려놓고 며칠간 사이즈 모니터링을 해봐야겠다.
nohup bin/logstash --path.data recom_data -f recom.conf > /dev/null 2>&1 &


2020년 8월 7일 금요일

Kafka + Spark Streaming saving to Cassandra

추천 시스템 서빙에서 메인 DB는 Cassandra를 사용하고 있는데 대부분이 data를 bulk insert 후 select만 해서 return 하는 구조이다. 하지만 최근 실시간 (방문)로그를 활용해야할 필요가 생겼고 이를 Cassandra에 적재하기로 하였다. 

만약 cassandra에 쌓는다면 real-time 환경에서 transaction에 문제가 없는지, 하루 쌓이는 양은 얼마 정도인지, compaction 전략은 어떤걸 써야할지, 피크 상황에서 서버가 버틸지 등의 고민을 하게 되었다.

이러한 시도가 성공할지, 실패할지는 먼저 구현을 해봐야 알 수 있기 때문에 일단 먼저 구현부터 하기로 했다. 만약 데이터양이 많은 것이 문제가 된다면 사이즈를 줄이는 방향으로 타협을 보는 것으로 문제를 해결해도 되는 상황이다.


구현 방법은 두 가지로 생각했다.
1. kafka -> logstash -> cassandra
2. kafka -> spark streaming -> cassandra

일단 1번은 logstash plugin이 존재하기는 하는데 4~5년전 자료라서 pass.
2번의 경우 어느 정도의 성능이 나올지 몰라서 궁금하기도 했고 2번으로 결정했다.


정리하면 real time으로 Kafka broker -> spark streaming(consum) -> cassandra 로 데이터를 흘러가는 것이 목표이다.

기존에 운영중인 kafka 데이터를 사용하기로 했다. kafka의 데이터는 map형식으로 들어오고 있다. 

{"a_key":"a_value","b_key":"b_value","c_key":"c_value","d_key":"d_value"}를 담을 class를 하나 만들었다.
case class TEST ( COL1:String, COL2:String, COL3:String, COL4:String )

이후 카프카와 연동을 해줬다.

val sparkConf = new SparkConf()
.setAppName("kapark2cassandra")
.set("spark.cassandra.auth.username", "cassandra")
.set("spark.cassandra.auth.password", "cassandra")
.set("spark.cassandra.connection.host", "cassandra cluster")

val ssc = new StreamingContext(sparkConf, Seconds(3))

val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "zk cluster",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "kapark2cassandra",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("kapark2cassandra")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

다음은 데이터 가공 및 카산드라에 전송이다.
ttl은 1주일(604800)으로 셋팅했다. 

stream.foreachRDD { rdd =>
val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
import spark.implicits._
val ods = rdd
.map(record => record.value.toString.drop(1).dropRight(1).split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)", -1)
.map(_.split(":(?=([^\\\"]*\\\"[^\\\"]*\\\")*[^\\\"]*$)")).map(arr => (arr(0) -> arr(1))).toMap)
.map(rec =>
TEST(
rec.get("\"COL1\"").mkString.replace("\"", "").trim
, rec.get("\"COL2\"").mkString.replace("\"", "").trim
, rec.get("\"COL3\"").mkString.replace("\"", "").trim
, rec.get("\"COL4\"").mkString.replace("\"", "").trim
)
).toDF()

ods.createOrReplaceTempView("kapark2cassandra")

var result = spark.sql(
"""
select from_unixtime(col1/1000,"yyyy-MM-dd HH:mm:ss") as col1
, col2
, col3
, col4
from kapark2cassandra
where col2 = some
""")
result.write.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "test" , "keyspace" -> "real_time", "ttl" -> "604800"))
.mode(org.apache.spark.sql.SaveMode.Append).save()
}

ssc.start()
ssc.awaitTermination()
저기서 다른 데이터 소스와 join, group by해서 사용할 수 있으면 그렇게 하려고 하는데 스트리밍이 밀리지 않는 선에서 처리가 가능한 양이어야 할 것이다.
object SparkSessionSingleton {
@transient private var instance: SparkSession = _
def getInstance(sparkConf: SparkConf): SparkSession = {
if (instance == null) {
instance = SparkSession
.builder
.config(sparkConf)
.getOrCreate()
}
instance
}
}
마지막으로 val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf) 부분을 만들기 위한 ojbect 생성해주고 
def main(args: Array[String]) {
new kapark2cassandra().run(args)
}
수행했더니 데이터가 카산드라에 잘 들어간다.

통계를 보도록 하자.
대충 1큐에 5천건~8천건 정도이다.
3초로 했을 때 조금씩 튀는 부분이 있는데 아무래도 3초는 무리인것같다.

cassandra 지표를 봐도 딱히 튀는 부분도 없고 결국 final result는 약 100 rows씩 들어가기 때문에 부하 테스트를 하기에는 작은 양이었다.

결론
real time data를 cassandra에 넣으려면 kafka + spark streaming으로 처리하는 것도 하나의 옵션이다.





2022년 회고

 올해는 블로그 포스팅을 열심히 못했다. 개인적으로 지금까지 경험했던 내용들을 리마인드하자는 마인드로 한해를 보낸 것 같다.  대부분의 시간을 MLOps pipeline 구축하고 대부분을 최적화 하는데 시간을 많이 할애했다. 결국에는 MLops도 데이...