2020년 7월 26일 일요일

Cassandra can not partition switch

Oracle, MSSQL 등 RDBMS에서는 PARTITION SWITCH를 통해 순간적인 데이터 바꿔끼우기가 가능하다. 이는 배치를 개발할 때 매우 유용하게 사용할 수 있다. 

현재 유효 파티션을 저장한 테이블을 두고 이 테이블 조회를 한번 거치고 본 테이블을 조회하는 방식으로 운영할 수 있다. 가령 PARTITION이 10, 20으로 나뉘어져 있다면 현재 유효 파티션 번호는 10을 기록하고 있는다. 이후 다음 배치에서는 파티션 번호 20으로 데이터를 적재한 후 PARTITION SWITCH를 하고 유효 파티션을 20으로 UPDATE하는 방식이다. 
(ex: ALTER TABLE… EXCHANGE PARTITION)

하지만 카산드라에서는 이러한 기능이 없다. 따라서 동일한 스키마 테이블 2벌을 생성하고 핑퐁하는 방식으로 구현해야한다. (모든 테이블이 2벌씩 나오므로 비효율적일 수 있는데 현재로서는 사용하기 쉽기도 하고 대체할 방법이 딱히 없어보인다.)

먼저 본 테이블을 조회하기전에 조회해야할 유효테이블마스터 테이블이다.


그리고 본 테이블인 table_A, table_B, table_C을 각각 두벌씩(_10, _20) 생성하자.
table_A_10, table_A_20, table_B_10, table_B_20, table_C_10, table_C_20


table_A 테이블을 조회하기 위해 eff_tbl_mst 테이블을 조회해서 유효코드값 10을 알아내고 이후 table_A_10을 조회하는 방식이다. 이렇게 하면 데이터 적재 배치후에 eff_tbl_mst 테이블에서 div 컬럼 값을 10<->20으로 update해주면 된다. 이후 유효하지 않은 테이블은 truncate.

현재 많이 사용하는 방법이고 쉬우면서도 운영 중에 데이터 전체를 갈아끼우기 편해서 좋다.
하지만 단점도 존재한다. 매번 동일한 테이블이 생성되고 db입장에서는 table을 조회하기 위해서 2번의 쿼리를 수행해야한다는 단점이 있다.

2020년 7월 22일 수요일

Cassandra SSTable에 의한 Disk I/O 영향

최근 Spark에서 Cassandra에 데이터를 적재할 때 SSTable Write 형식으로 전환했다.

변환 후 Spark excutor에서 카산드라 노드에 데이터를 전송할 때 네트워크 트래픽 문제가 발생하여 이를 대역폭의 35%만 사용하도록 동적 세팅을 해놓았었다.

이후 대역폭을 1G에서 10G로 인프라를 업그레이드했지만 이번엔 다른 문제가 발생했다.

문제
전송하고자 하는 데이터 size는 50G, SSTable 갯수는 약 3천개이다.
(실시간으로 read, write가 이루어지는)운영중인 카산드라에 SSTable을 복사하고 해당 테이블을 바라보게 하였다.
insert 직후라서 key cache가 없고 sstable은 약 3천개이며 동시에 compaction도 진행이 되었다.

결과적으로 트래픽이 몰리면서 read request가 꾸준히 올라가면서 disk I/O에 부하를 주는 문제가 되었다. 장애가 난 것이다.

당시의 그래프이다.

sstable이 점차 줄어들고 있는 것은 compaction 때문이다.


다음 그래프를 보면 약 19시부터 disk i/o가 full 찬 것을 확인할 수 있으며 22시에 1/3토막이 난 것은 was단에서 read request를 강제적으로 1/3으로 줄였기 때문이다.


그랬더니 disk i/o가 정상적으로 돌아왔으며 두개의 그래프로 추정해보건데 compaction보다도 read에서 더 큰 부하를 많이 준 것으로 판단된다. (물론 read와 compaction 모두 경합이 일어나서 복합적인 문제가 발생한 것이기도 하다.)

정리하면 2가지의 원인은 다음과 같다.
1. 트래픽이 점차 몰리면서 카산드라 read 발생수가 많아졌고 이때 데이터를 조회하기 위해 대량의 sstable을 읽어야하기 때문에 disk i/o 부하를 줌.
2. 데이터 적재 후 약 대량의 sstable이 생성되었고 카산드라 자체적으로 compaction이 돌면서 disk i/o에 부하를 줌.

문제를 해결하기 위해 SSTable 갯수를 줄이는 것을 먼저 시도하였고 withBufferSizeInMB로 조절이 가능했다. 하지만 이 옵션은 oom이 발생할 수 있어 꾸준히 모니터링을 해야하지만 default 128에서 현재 256으로 올린 상태에서는 안정적인 disk io를 유지했다.
점차 테스트해보면서 이를 더 올릴 생각이다.

추후에 더 문제가 발생하면 table을 나누던지, compaction strategy를 바꿔봐야하겠다. 시계열 데이터가 아니지만 LCS가 도움이 된다는 글이 있기에 시도해봐야겠다.

아무튼 정리하면
운영중인 카산드라에 bulk load를 할 때 disk I/O도 고려해야한다는 것이다.



2020년 7월 6일 월요일

Spark to Cassandra SSTable로 bulk insert할 때 throughput 조절하기

최근 Spark에서 Cassandra로 데이터를 부을 때 SSTable을 활용해서 Bulk Insert를 하였다.

분산환경에서 여러 테스트를 거치며 노하우를 공유차 포스팅을 남긴다.

Spark executor들에서 병렬로 Cassandra에 부을 때 주의할 점은 초당 처리율이다.
아무래도 실 운영환경에서 Bulk를 쓴다는 것은 데이터가 10~100GB이상의 데이터일 것이고 이 때 주의할 점은 Network Traffic이다.

카산드라 서버 네트워크 카드의 limit을 고려하며, 즉 인프라 환경을 반드시 고려하면서 분산처리를 해야한다.

필자의 경우에는 각 카산드라 노드가 초당 1Gbps를 받을 수 있고 나름대로 base line을 max 35%인 350Mbps만 사용하기로 정했다. 동시에 2개의 배치를 돌리더라도 70%사용이라면, 유사시에 충분히 대비할 수 있기 때문이다. (30%인 300Mbps를 여분으로 남김.)

따라서 목표는 배치당 350Mbps을 limit으로 잡았고 이때 변수는 파티션갯수와 각 파티션에서의 전송속도이다. 변수를 1개로 줄이기 위해 file size가 1G당 파티션은 1개로 고정하였다. file size가 커질 수록 partition이 너무 많이 생길까봐 개인적으로 1~2를 추천한다.

즉 10G파일을 insert할 때에는 파티션이 10개가 되어 10개가 병렬로 insert가 된다.
(파티션갯수는 repartition으로 조절하며 전송속도는 mapreduce.output.bulkoutputformat.streamthrottlembits 옵션으로 정할 수 있다.)


다음은 예시 코드이다.
val fs: FileSystem = FileSystem.get(sc.hadoopConfiguration)
val file_size = fs.getContentSummary(new Path(outputFilePath)).getLength
var partition_num = (file_size/1024/1024/1024) //1기가당 파티션2
if ( partition_num == 0 ) { partition_num=1 }
var streamthrottlembits = (350 / partition_num) + ""
if ( streamthrottlembits == "0" ) { streamthrottlembits="1" }
file size로 partition 갯수를 정하고 streamthrottlembits를 정했다. 나뉜 파티션으로 foreachPartition을 돌릴 것이다.

try {
val conf = new Configuration()
val cassHosts = "ip1,ip2,ip3,ip4,ip5"
val shuffledCassHosts=Random.shuffle(cassHosts.split(",").toList)
val selectedCassHost=shuffledCassHosts.head
conf.set("cassandra.output.thrift.address", selectedCassHost)
conf.set("mapreduce.output.bulkoutputformat.streamthrottlembits", streamthrottlembits)
new SSTableLoader(new File(dir), new ExternalClient(conf), new OutputHandler.LogOutput).stream().get()
} finally {
var deleteFolderList = dir_file.listFiles();
for ( i <- 0 to deleteFolderList.length -1 ) {
deleteFolderList(i).delete();
}
dir_file.delete();
}
이후 sstable을 만들고 전송을 할 때 위에서 구한 streamthrottlembits값으로 속도를 조절하였다.

또한 코드상으로는 카산드라 클러스터 ip들 중에서 랜덤으로 1개를 골라서 insert를 하였는데  이는 인프라마다 다르겠지만 어쨌든 한쪽 노드로 traffic이 몰리지 않게하기 위함이다.

사실 더 좋은 구성은 카산드라를 1개의 스위치에 모두 물리지 않고 2개 이상의 스위치로 분리해서 구성하였다면 더욱 안전한 아키텍처가 될 것이다.



network traffic이 원하는데로 max 35%만 사용하고 있다.


cpu 상태도 안정적이다.






2022년 회고

 올해는 블로그 포스팅을 열심히 못했다. 개인적으로 지금까지 경험했던 내용들을 리마인드하자는 마인드로 한해를 보낸 것 같다.  대부분의 시간을 MLOps pipeline 구축하고 대부분을 최적화 하는데 시간을 많이 할애했다. 결국에는 MLops도 데이...