2020년 7월 6일 월요일

Spark to Cassandra SSTable로 bulk insert할 때 throughput 조절하기

최근 Spark에서 Cassandra로 데이터를 부을 때 SSTable을 활용해서 Bulk Insert를 하였다.

분산환경에서 여러 테스트를 거치며 노하우를 공유차 포스팅을 남긴다.

Spark executor들에서 병렬로 Cassandra에 부을 때 주의할 점은 초당 처리율이다.
아무래도 실 운영환경에서 Bulk를 쓴다는 것은 데이터가 10~100GB이상의 데이터일 것이고 이 때 주의할 점은 Network Traffic이다.

카산드라 서버 네트워크 카드의 limit을 고려하며, 즉 인프라 환경을 반드시 고려하면서 분산처리를 해야한다.

필자의 경우에는 각 카산드라 노드가 초당 1Gbps를 받을 수 있고 나름대로 base line을 max 35%인 350Mbps만 사용하기로 정했다. 동시에 2개의 배치를 돌리더라도 70%사용이라면, 유사시에 충분히 대비할 수 있기 때문이다. (30%인 300Mbps를 여분으로 남김.)

따라서 목표는 배치당 350Mbps을 limit으로 잡았고 이때 변수는 파티션갯수와 각 파티션에서의 전송속도이다. 변수를 1개로 줄이기 위해 file size가 1G당 파티션은 1개로 고정하였다. file size가 커질 수록 partition이 너무 많이 생길까봐 개인적으로 1~2를 추천한다.

즉 10G파일을 insert할 때에는 파티션이 10개가 되어 10개가 병렬로 insert가 된다.
(파티션갯수는 repartition으로 조절하며 전송속도는 mapreduce.output.bulkoutputformat.streamthrottlembits 옵션으로 정할 수 있다.)


다음은 예시 코드이다.
val fs: FileSystem = FileSystem.get(sc.hadoopConfiguration)
val file_size = fs.getContentSummary(new Path(outputFilePath)).getLength
var partition_num = (file_size/1024/1024/1024) //1기가당 파티션2
if ( partition_num == 0 ) { partition_num=1 }
var streamthrottlembits = (350 / partition_num) + ""
if ( streamthrottlembits == "0" ) { streamthrottlembits="1" }
file size로 partition 갯수를 정하고 streamthrottlembits를 정했다. 나뉜 파티션으로 foreachPartition을 돌릴 것이다.

try {
val conf = new Configuration()
val cassHosts = "ip1,ip2,ip3,ip4,ip5"
val shuffledCassHosts=Random.shuffle(cassHosts.split(",").toList)
val selectedCassHost=shuffledCassHosts.head
conf.set("cassandra.output.thrift.address", selectedCassHost)
conf.set("mapreduce.output.bulkoutputformat.streamthrottlembits", streamthrottlembits)
new SSTableLoader(new File(dir), new ExternalClient(conf), new OutputHandler.LogOutput).stream().get()
} finally {
var deleteFolderList = dir_file.listFiles();
for ( i <- 0 to deleteFolderList.length -1 ) {
deleteFolderList(i).delete();
}
dir_file.delete();
}
이후 sstable을 만들고 전송을 할 때 위에서 구한 streamthrottlembits값으로 속도를 조절하였다.

또한 코드상으로는 카산드라 클러스터 ip들 중에서 랜덤으로 1개를 골라서 insert를 하였는데  이는 인프라마다 다르겠지만 어쨌든 한쪽 노드로 traffic이 몰리지 않게하기 위함이다.

사실 더 좋은 구성은 카산드라를 1개의 스위치에 모두 물리지 않고 2개 이상의 스위치로 분리해서 구성하였다면 더욱 안전한 아키텍처가 될 것이다.



network traffic이 원하는데로 max 35%만 사용하고 있다.


cpu 상태도 안정적이다.






댓글 없음:

댓글 쓰기

2022년 회고

 올해는 블로그 포스팅을 열심히 못했다. 개인적으로 지금까지 경험했던 내용들을 리마인드하자는 마인드로 한해를 보낸 것 같다.  대부분의 시간을 MLOps pipeline 구축하고 대부분을 최적화 하는데 시간을 많이 할애했다. 결국에는 MLops도 데이...