일단 생각하는 것은 kafka broker는 3대를 클러스터링하고 컨슈머,프로듀서를 각각 세팅해놓으려고 한다.
먼저 centos7 가상환경을 3대를 준비했다. (나중에 프로듀서, 컨슈머로 사용하려고 2대 더 준비했다. 총5대 )
/etc/hosts에 kafka 클러스터로 쓸 host 3대를 추가한다.
192.168.20.130 kafka-srv01
192.168.20.131 kafka-srv02
192.168.20.128 kafka-srv03
먼저 java를 설치한다. (zulu 1.8)
kafka cluster는 zookeeper가 관리하기 때문에 zookeeper와 함께 설치되어야한다.
현재 나온 카프카 최신버전은 2.5.0 버전이고 카프카에 포함되어있는 주키퍼가 있지만 주키퍼를 별도로 설치했다. (주키퍼 3.5.8)
디렉토리 구조는 다음처럼 가져갔다.
먼저 주키퍼 부터 세팅하자.
먼저 디렉토리를 만들었다.
mkdir -p /data01/tmp/zookeeper
각 노드에서 id를 세팅해주었다.
echo 1 > /data01/tmp/zookeeper/myid
echo 2 > /data01/tmp/zookeeper/myid
echo 3 > /data01/tmp/zookeeper/myid
다음으로 /data01/sw/zookeeper/conf에서 zoo_sample.cfg를 복사해서 zoo.cfg를 만들었다.
아래처럼 수정한다.
dataDir=/data01/tmp/zookeeper
(생략)
initLimit=5
syncLimit=2
server.1=kafka-srv01:2888:3888
server.2=kafka-srv02:2888:3888
server.3=kafka-srv03:2888:3888
server.myid=호스트:통신용포트1:통신용포트2만 작성해주면 되서 쉽다.
myid는 주키퍼 클러스터에서 각 서버에 부여하는 고유한 서버 번호이다. 따라서 포스팅 내용처럼 tmp 디렉토리가 아닌 다른 디렉토리를 만들어서 보관하자.
다음은 카프카 설정파일을 수정한다. (vi server.properties)
mkdir -p /data01/tmp/kafka-logs
log.dirs=/data01/tmp/kafka-logs
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://kafka-srv01:9092
zookeeper.connect=kafka-srv01:2181,kafka-srv02:2181,kafka-srv03:2181
broker.id=2
log.dirs=/data01/tmp/kafka-logs
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://kafka-srv02:9092
log.dirs=/data01/tmp/kafka-logs
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://kafka-srv02:9092
zookeeper.connect=kafka-srv01:2181,kafka-srv02:2181,kafka-srv03:2181
broker.id=3
log.dirs=/data01/tmp/kafka-logs
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://kafka-srv03:9092
log.dirs=/data01/tmp/kafka-logs
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://kafka-srv03:9092
zookeeper.connect=kafka-srv01:2181,kafka-srv02:2181,kafka-srv03:2181
브로커id는 안주면 카프카가 자동으로 세팅해주고 굳이 주키퍼랑 맞출 필요도 없긴 하지만 이쁘게 통일하기 위해 주키퍼-카프카 id를 맞췄다.
그리고 zookeeper.connect는 브로커가 주키퍼에 접속할 때의 접속정보다.
설정은 끝났다.
이제 구동해보자. 주키퍼를 먼저 띄우고 그다음 카프카(브로커)를 띄워야한다. 각 노드의 순서는 상관 없다. 반대로 서비스를 내릴 때에는 카프카(브로커)를 내린 후에 주키퍼를 내린다.
주키퍼 실행
zkServer.sh start
카프카 실행
bin/kafka-server-start.sh -daemon config/server.properties
카프카 서버 로그를 보니 잘 올라온 것을 확인할 수 있었다.
이제 토픽을 만들고 테스트를 해보자.
test-topic을 만들었다. 파티션은 3을 주고 레플리카도 3을 줬다.
test-topic을 만들었다. 파티션은 3을 주고 레플리카도 3을 줬다.
./kafka-topics.sh --zookeeper kafka-srv01:2181,kafka-srv02:2181,kafka-srv03:2181 --create --topic test-topic --partitions 3 --replication-factor 3
잘 만들어졌는지 확인해본다.
./kafka-topics.sh --zookeeper kafka-srv01:2181,kafka-srv02:2181,kafka-srv03:2181 --describe --topic test-topic
describe만 살펴보자.
Leader는 각 파티션의 현재 Leader 복제본이 어떤 브로커에 있는지 알려준다. Replicas는 각 파티션의 복제본을 보유하고 있는 브로커의 리스트이다.
Isr은 In-Sync Replicas의 약자로 복제본 중에서 Leader Replica랑 동기화가 되고 있는 복제본을 소유하고 있는 브로커의 리스트이다. 브로커가 장애가 나거나 동기화가 안됐을 때 Isr에 포함되지 않는다.
테스트 토픽에 잘 던지고 받는지 console로 확인해보자.
나중에 이것저것 더 테스트해보려고 별도로 두대의 서버를 만들고 카프카를 설치했다.
각각 서버에서 프로듀서와 컨슈머를 띄웠다.
./kafka-console-producer.sh --broker-list kafka-srv01:9092,kafka-srv02:9092,kafka-srv03:9092 --topic test-topic
./kafka-console-consumer.sh --bootstrap-server kafka-srv01:9092,kafka-srv02:9092,kafka-srv03:9092 --topic test-topic
잘 만들어졌는지 확인해본다.
./kafka-topics.sh --zookeeper kafka-srv01:2181,kafka-srv02:2181,kafka-srv03:2181 --describe --topic test-topic
describe만 살펴보자.
Isr은 In-Sync Replicas의 약자로 복제본 중에서 Leader Replica랑 동기화가 되고 있는 복제본을 소유하고 있는 브로커의 리스트이다. 브로커가 장애가 나거나 동기화가 안됐을 때 Isr에 포함되지 않는다.
테스트 토픽에 잘 던지고 받는지 console로 확인해보자.
나중에 이것저것 더 테스트해보려고 별도로 두대의 서버를 만들고 카프카를 설치했다.
각각 서버에서 프로듀서와 컨슈머를 띄웠다.
./kafka-console-producer.sh --broker-list kafka-srv01:9092,kafka-srv02:9092,kafka-srv03:9092 --topic test-topic
./kafka-console-consumer.sh --bootstrap-server kafka-srv01:9092,kafka-srv02:9092,kafka-srv03:9092 --topic test-topic
프로듀서에서 hello world, park, su, seong, parksuseong을 던지면 컨슈머에서 잘 받는 것을 확인할 수 있다.
kafka-producer
kafka-consumer
사실 카프카 혼자서는 서비스를 하기엔 무리이고 카프카를 이용한 생태계를 구축해야하는데 시간이 날지 모르겠다. 그래도 추후 fluentd나 logstash, elk, spark streaming 정도까지는 포스팅해보고싶은데 일단 노력해봐야지.. 집 컴퓨터라서 혼자서 세팅하는데 엄청난 인내심이 필요하기 때문이다.