최근 Spark에서 Cassandra로 데이터를 부을 때 SSTable을 활용해서 Bulk Insert를 하였다.
분산환경에서 여러 테스트를 거치며 노하우를 공유차 포스팅을 남긴다.
Spark executor들에서 병렬로 Cassandra에 부을 때 주의할 점은 초당 처리율이다.
아무래도 실 운영환경에서 Bulk를 쓴다는 것은 데이터가 10~100GB이상의 데이터일 것이고 이 때 주의할 점은 Network Traffic이다.
카산드라 서버 네트워크 카드의 limit을 고려하며, 즉 인프라 환경을 반드시 고려하면서 분산처리를 해야한다.
필자의 경우에는 각 카산드라 노드가 초당 1Gbps를 받을 수 있고 나름대로 base line을 max 35%인 350Mbps만 사용하기로 정했다. 동시에 2개의 배치를 돌리더라도 70%사용이라면, 유사시에 충분히 대비할 수 있기 때문이다. (30%인 300Mbps를 여분으로 남김.)
따라서 목표는 배치당 350Mbps을 limit으로 잡았고 이때 변수는 파티션갯수와 각 파티션에서의 전송속도이다. 변수를 1개로 줄이기 위해 file size가 1G당 파티션은 1개로 고정하였다. file size가 커질 수록 partition이 너무 많이 생길까봐 개인적으로 1~2를 추천한다.
즉 10G파일을 insert할 때에는 파티션이 10개가 되어 10개가 병렬로 insert가 된다.
(파티션갯수는 repartition으로 조절하며 전송속도는 mapreduce.output.bulkoutputformat.streamthrottlembits 옵션으로 정할 수 있다.)
다음은 예시 코드이다.
val fs: FileSystem = FileSystem.get(sc.hadoopConfiguration)
val file_size = fs.getContentSummary(new Path(outputFilePath)).getLength
var partition_num = (file_size/1024/1024/1024) //1기가당 파티션2개
if ( partition_num == 0 ) { partition_num=1 }
var streamthrottlembits = (350 / partition_num) + ""
if ( streamthrottlembits == "0" ) { streamthrottlembits="1" }
file size로 partition 갯수를 정하고 streamthrottlembits를 정했다. 나뉜 파티션으로 foreachPartition을 돌릴 것이다.
try {
val conf = new Configuration()
val cassHosts = "ip1,ip2,ip3,ip4,ip5"
val shuffledCassHosts=Random.shuffle(cassHosts.split(",").toList)
val selectedCassHost=shuffledCassHosts.head
conf.set("cassandra.output.thrift.address", selectedCassHost)
conf.set("mapreduce.output.bulkoutputformat.streamthrottlembits", streamthrottlembits)
new SSTableLoader(new File(dir), new ExternalClient(conf), new OutputHandler.LogOutput).stream().get()
} finally {
var deleteFolderList = dir_file.listFiles();
for ( i <- 0 to deleteFolderList.length -1 ) {
deleteFolderList(i).delete();
}
dir_file.delete();
}
이후 sstable을 만들고 전송을 할 때 위에서 구한 streamthrottlembits값으로 속도를 조절하였다.
또한 코드상으로는 카산드라 클러스터 ip들 중에서 랜덤으로 1개를 골라서 insert를 하였는데 이는 인프라마다 다르겠지만 어쨌든 한쪽 노드로 traffic이 몰리지 않게하기 위함이다.
사실 더 좋은 구성은 카산드라를 1개의 스위치에 모두 물리지 않고 2개 이상의 스위치로 분리해서 구성하였다면 더욱 안전한 아키텍처가 될 것이다.
network traffic이 원하는데로 max 35%만 사용하고 있다.
cpu 상태도 안정적이다.