2020년 5월 26일 화요일

Apache Kafka Cluster 설치 및 테스트

블로그 포스팅하려고 집 컴퓨터에 카프카를 세팅하려고했는데 확실히 집에서 하려고 하니 제약사항이 많았다. 그래도 이왕 해보기로 한거 끝까지 해본다.

일단 생각하는 것은 kafka broker는 3대를 클러스터링하고 컨슈머,프로듀서를 각각 세팅해놓으려고 한다.

먼저 centos7 가상환경을 3대를 준비했다. (나중에 프로듀서, 컨슈머로 사용하려고 2대 더 준비했다. 총5대 )

/etc/hosts에 kafka 클러스터로 쓸 host 3대를 추가한다.

192.168.20.130 kafka-srv01
192.168.20.131 kafka-srv02
192.168.20.128 kafka-srv03

먼저 java를 설치한다. (zulu 1.8)



kafka cluster는 zookeeper가 관리하기 때문에 zookeeper와 함께 설치되어야한다.
현재 나온 카프카 최신버전은 2.5.0 버전이고 카프카에 포함되어있는 주키퍼가 있지만 주키퍼를 별도로 설치했다. (주키퍼 3.5.8)

디렉토리 구조는 다음처럼 가져갔다.



먼저 주키퍼 부터 세팅하자.

먼저 디렉토리를 만들었다.
mkdir -p /data01/tmp/zookeeper

각 노드에서 id를 세팅해주었다.

echo 1 > /data01/tmp/zookeeper/myid
echo 2 > /data01/tmp/zookeeper/myid
echo 3 > /data01/tmp/zookeeper/myid

다음으로 /data01/sw/zookeeper/conf에서 zoo_sample.cfg를 복사해서 zoo.cfg를 만들었다.

아래처럼 수정한다.

dataDir=/data01/tmp/zookeeper
(생략)
initLimit=5
syncLimit=2

server.1=kafka-srv01:2888:3888
server.2=kafka-srv02:2888:3888
server.3=kafka-srv03:2888:3888


server.myid=호스트:통신용포트1:통신용포트2만 작성해주면 되서 쉽다.
myid는 주키퍼 클러스터에서 각 서버에 부여하는 고유한 서버 번호이다. 따라서 포스팅 내용처럼 tmp 디렉토리가 아닌 다른 디렉토리를 만들어서 보관하자.


다음은 카프카 설정파일을 수정한다. (vi server.properties)

mkdir -p /data01/tmp/kafka-logs

broker.id=1
log.dirs=/data01/tmp/kafka-logs
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://kafka-srv01:9092
zookeeper.connect=kafka-srv01:2181,kafka-srv02:2181,kafka-srv03:2181

broker.id=2
log.dirs=/data01/tmp/kafka-logs
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://kafka-srv02:9092
zookeeper.connect=kafka-srv01:2181,kafka-srv02:2181,kafka-srv03:2181

broker.id=3
log.dirs=/data01/tmp/kafka-logs
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://kafka-srv03:9092
zookeeper.connect=kafka-srv01:2181,kafka-srv02:2181,kafka-srv03:2181


브로커id는 안주면 카프카가 자동으로 세팅해주고 굳이 주키퍼랑 맞출 필요도 없긴 하지만 이쁘게 통일하기 위해 주키퍼-카프카 id를 맞췄다.
그리고 zookeeper.connect는 브로커가 주키퍼에 접속할 때의 접속정보다.


설정은 끝났다.

이제 구동해보자. 주키퍼를 먼저 띄우고 그다음 카프카(브로커)를 띄워야한다. 각 노드의 순서는 상관 없다. 반대로 서비스를 내릴 때에는 카프카(브로커)를 내린 후에 주키퍼를 내린다.

주키퍼 실행
zkServer.sh start
카프카 실행
bin/kafka-server-start.sh -daemon config/server.properties

카프카 서버 로그를 보니 잘 올라온 것을 확인할 수 있었다.



이제 토픽을 만들고 테스트를 해보자.

test-topic을 만들었다. 파티션은 3을 주고 레플리카도 3을 줬다.
./kafka-topics.sh --zookeeper kafka-srv01:2181,kafka-srv02:2181,kafka-srv03:2181 --create --topic test-topic --partitions 3 --replication-factor 3

잘 만들어졌는지 확인해본다.
./kafka-topics.sh --zookeeper kafka-srv01:2181,kafka-srv02:2181,kafka-srv03:2181 --describe --topic test-topic



describe만 살펴보자.


Leader는 각 파티션의 현재 Leader 복제본이 어떤 브로커에 있는지 알려준다. Replicas는 각 파티션의 복제본을 보유하고 있는 브로커의 리스트이다.

Isr은 In-Sync Replicas의 약자로 복제본 중에서 Leader Replica랑 동기화가 되고 있는 복제본을 소유하고 있는 브로커의 리스트이다. 브로커가 장애가 나거나 동기화가 안됐을 때 Isr에 포함되지 않는다.

테스트 토픽에 잘 던지고 받는지 console로 확인해보자.
나중에 이것저것 더 테스트해보려고 별도로 두대의 서버를 만들고 카프카를 설치했다.

각각 서버에서 프로듀서와 컨슈머를 띄웠다.
./kafka-console-producer.sh --broker-list kafka-srv01:9092,kafka-srv02:9092,kafka-srv03:9092 --topic test-topic
./kafka-console-consumer.sh --bootstrap-server kafka-srv01:9092,kafka-srv02:9092,kafka-srv03:9092 --topic test-topic


프로듀서에서 hello world, park, su, seong, parksuseong을 던지면 컨슈머에서 잘 받는 것을 확인할 수 있다.

kafka-producer


kafka-consumer



사실 카프카 혼자서는 서비스를 하기엔 무리이고 카프카를 이용한 생태계를 구축해야하는데 시간이 날지 모르겠다. 그래도 추후 fluentd나 logstash, elk, spark streaming 정도까지는 포스팅해보고싶은데 일단 노력해봐야지.. 집 컴퓨터라서 혼자서 세팅하는데 엄청난 인내심이 필요하기 때문이다.

2020년 5월 20일 수요일

IR - Inverted Index(역색인) 컨셉

정보검색(IR)에서 다루는 내용중에 하나인 역색인은 검색엔진이나 혹은 데이터베이스에서 많이 언급된다.

단어(term)가 등장하는 문서(Document)를 찾기 위해 Inverted Index가 고안되었다.

예를 들어 SQL Server에서는 다른 DB와 마찬가지로 컬럼에 적절한 인덱스를 생성하는 경우는 많이 봤을 것이다. 하지만 컬럼이 BLOB Type으로 정의되고 여기에는 책의 내용, 신문 기사 내용, 노래 가사같이 Document가 담겨 있을 수 있다. 그럼 이 컬럼에서는 어떤식으로 내용을 빠르게 찾을 수 있을까?

전에 포스팅한 내용 참고하면 좋을 것 같다. (SQL Server Full-Text Search)
https://parksuseong.blogspot.com/search?q=Full+Text+Search

그럼 색인(Index)와 역색인(Inverted Index)의 차이는 무엇인가?

Inverted Index란 Term의 집합을 dictionary, Term이 등장하는 문서들의 집합을 posting이라고 하면 Dictionary와 posting을 Vector화 하면 찾고자 하는 단어가 어떤 문서에서 등장하는지 쉽게 찾고자 고안되었다. 쉽게 생각하면 목차(색인)와 비슷하지만 동작하는 방식이 조금 다를 뿐이다. 책의 내용이 끝나고 맨 뒤에 단어들이 나와있고 이 단어가 몇 페이지에 나오는지를 알려주는 것을 볼 수 있는데 이것이 Inverted Index(역색인)이다. 즉 Term을 가지고 Document를 찾는 것이다.

Inverted Index를 만드는 방법은 다음과 같다.

1. 문서에서 등장하는 단어(Term, 혹은 token)를 구분해서 리스트를 만든다. (tokenization)
어떠한 Term이 어떤 Doc에 나오는지 찾는 과정이다.

2. 검색 시 무의미한 단어들은 제거하고(조사, 관계사 등) 대소문자, 과거형, 복수형 등을 일반화(Nomalization)시켜주는 과정을 거친다. Stop words, Equivalence class, Lemmatization, Stemming 등을 사용할 수 있다. 언어라는게 근원적으로 룰이니까 뭔가 룰베이스적인 처리가 필요하겠다.

3. Posting을 만든다.(Term, DocID)


4. posting을 sorting한다.

sorting 결과 중복되기도 하고, docID만 다른 것이 존재하는 Term들이 보인다.

5. term의 frequency를 계산하고 Posting을 list로 만든다.
(BRUTUS -> [1,2,4,11] 이런식으로 BRUTUS는 doc1, doc2 doc4 doc11에 등장)


6. 5번까지 작업하면 term List가 dictionary가 되고(dictionary 파일에 저장), document list가 posting list가 된다(이것도 posting 파일에 저장). 그럼 dictionary와 posting이 벡터라고 치면 dictionary를 통해 document를 쉽게 검색할 수 있게 된다.





기본적인 컨셉(?)만 간단하게 정리했다.

2020년 5월 8일 금요일

Scala parallel collection foreach return different results

스칼라에서 par.foreach를 썼더니 매번 같은 결과가 나오지 않는다는 것을 알게 되었다.

상황
스칼라를 사용해서 api호출 결과를 통으로 받고 그 결과값들을 하나씩 loop 돌면서 where절 조건에 넣고 group by 집계를 해야하는 이슈가 있었다. 
api 호출 결과가 갈수록 많아지고 집계 데이터의 양이 커지면서 foreach 반복문의 성능이 기하급수적으로 떨어졌다. (대략 2시간가까이 걸렸다.)

조치
처음 조치한 방법은 반복문 조건에 들어갈 내용을 map으로 펼친 후 par.foreach를 돌렸더니 약 30-40분으로 확실히 줄어들었다. 데이터 샘플 몇개만 확인했을 때 이상없어보였다.

문제
하지만 이후 다시 확인해보니 결과가 계속해서 예상치 못한 내용이었고 결론적으로는 이상했다.

원인
찾아보니 병렬 콜렉션을 사용할 경우 스레드들이 atomic하게 처리하지 않는다. 결과적으로 원복했고 이런 실수를 하면 안된다.

예를 들어보자.
var sum =0
(1 to 1000).par.foreach(row => sum+=row)
print(sum)

실행할 때마다 sum의 값이 계속해서 바뀐다.


다음 그림 출처는 Scala for Data Science by Pascal Bugnion 이다.



개인적으로 병렬 콜렉션이 조금은 실망스러운데 저렇게 결과를 내면 어디에 적용이 가능한지 당장 생각나지 않는다. 꼼수를 쓰면 해결가능할텐데 그것도 아닌 것 같다.




2022년 회고

 올해는 블로그 포스팅을 열심히 못했다. 개인적으로 지금까지 경험했던 내용들을 리마인드하자는 마인드로 한해를 보낸 것 같다.  대부분의 시간을 MLOps pipeline 구축하고 대부분을 최적화 하는데 시간을 많이 할애했다. 결국에는 MLops도 데이...