2020년 6월 28일 일요일

Spark to Cassandra - memtable에서 sstable write로 전환기 (cassandra bulk load)

스파크를 활용해서 카산드라에 매일 100억이상의 row, 250G 이상의 데이터를 붓고 있었다.
붓는 방식은 memory write, 즉 memtable에 insert를 하는 방식으로 사용하고 있었다. 하지만 이러한 방법은 문제가 되었다.


상황
초기에는 데이터 양이 많지 않았고, 순간적인 부하만 견디면 됐기 때문에 이 정도면 새벽시간대에 충분히 커버가 가능하다고 생각했다. 하지만 데이터셋이 많아지고 갈수록 커지는 데이터 사이즈에 따라 새벽시간대에 cpu가 100%까지 치는 일이 많아졌다.
이는 카산드라 입장에서 read, write에 영향을 주고 결국 pending threads와 db connection timeout 수치를 기하급수적으로 늘어 장애가 발생할 위험이 존재했다.
물론 클러스터 구조에 따른 어느 정도 내구성은 보장했지만 노드가 순차적으로 돌아가며 부하를 받는 것을 보고있자니 걱정이 되었다.

Spark에서 cassandra로 데이터를 넣을 때 throuput을 조정하긴 했지만 이는 근본적인 해결책이 되지 못했다.


Before Graph

write / sec

cpu usage

network traffic

특정 시점에 conection timeout & pending threads



개선 계획
memtable에 direct로 붓지 말고 sstable에 데이터를 붓는 방식으로 변경한다. 이는 disk에 쓰는 방식이다. spark에서 병렬로 write를 한다면 충분한 성능이 나올 것이다. disk I/O와 network 성능에 맞춰서 적당한 throuput을 찾는다.

여러 깃소스, 블로그 레퍼런스를 통해 대부분이 spark to cassandra에서 비슷한 문제를 겪었고 대부분이 동일한 방법으로 해결한 것 같다. 소스를 참고해서 우리에 맞는 방식으로 튜닝을 시도했다.

github Arunkumar&joswlv 등 소스를 참고해서 적용 전에 계속해서 테스트를 했다.

다음은 예제 소스이다. (라이브러리화 한 소스는 github에 올려두었다.)

현재 mesos로 resource를 관리하고 있으며 데이터 size에 따라 partition 갯수와 excutor 갯수를 조절해가며 적절한 구간을 찾아야한다. 참고 자료에서는 hdfs 블록 사이즈인 256M를 활용하는 것 같은데 아마 카산드라의 옵션 중에 sstable 관련해서 변경하지 않는다면 비슷하게 적용할 것 같다.


sstable을 작성할 directory를 랜덤하게 만들고, create와 insert문을 준비, 각각의 partition(excutor)에서 sstable을 parallel create&load, 마지막으로 작업했던 directory를 지우는 방식이다.

"mapreduce.output.bulkoutputformat.streamthrottlembits"를 1000으로 했는데 이는 적용하는 인프라에서의 네트워크 상태를 고려해서 설정해야한다.

1차적으로 모든 소스를 이렇게 바꾼 후의 모습이다.

After Graph

write / sec

cpu usage

network traffic

특정 시점에 conection timeout & pending threads



결과가 아주 다이나믹하고 아름답게 변했다. 

1. memtable에 direct로 write를 할 때에는 write/sec가 60k 수준으로 올라갔지만 이는 cpu가 튀는 원인이 되었다. 하지만 sstable write를 할 경우에는 cpu가 튀지 않았고 오히려 더 빠르게 insert가 되었다. 
2. cpu 사용량은 compaction에도 영향을 주는데 이는 결국 실제 운영상황에서의 response from cassandra에 영향을 준다.
3. cpu가 안정화되었으니 network traffic을 더 dynamic하게 사용할 수 있다. 현재 포스팅에서는 속도를 더 높여 처리율을 올릴 수 있다.
4. cpu에 따른 connection timeout과 pending threads가 안정적으로 변했다.
5. 카산드라의 disk는 ssd로 되어있는데 이는 sstable write를 사용할 때 성능을 극대화할 수 있었다.
6. 포스팅에는 없지만 단순히 spark to cassandra 배치 중에 memtable에 insert하던 배치가 2시간정도 걸렸었는데 sstable insert로 바꾼 뒤 약 15분으로 단축이 되었다. (2h -> 15m)

주의할점
만약 대량의 데이터를 sstable write를 할 경우에는 네트워크 트래픽을 다 잡아먹을 수 있다. 이는 결국 카산드라의 response에도 영향을 준다. 따라서 사용자가 적은 시간대를 찾거나(새벽), 처리율을 조절하며 적절한 구간을 찾아야한다. 
sstable insert는 수 많은 sstable이 생성되고 이는 쌓이면 쌓일수록 compaction에 부담이 된다. 따라서 적절한 compaction 전략을 구상하여 적용하도록 한다.
partition 갯수에 따른 병렬처리에 따라 성능을 좌우한다. 적절한 partition 갯수를 조절하며 병령처리 성능을 올려야한다. 이는 테스트 중이고 추후 성능테스트 포스팅을 할 예정이다.

distribution 환경에서의 insert 성능 테스트 포스팅을 할 것이며, sstable을 통한 bulk insert 소스는 github에 정리해서 올려놓도록 하겠다.






2022년 회고

 올해는 블로그 포스팅을 열심히 못했다. 개인적으로 지금까지 경험했던 내용들을 리마인드하자는 마인드로 한해를 보낸 것 같다.  대부분의 시간을 MLOps pipeline 구축하고 대부분을 최적화 하는데 시간을 많이 할애했다. 결국에는 MLops도 데이...