2019년 9월 30일 월요일

Polybase를 활용한 ETL(mssql -> hadoop) 라이브러리 개발

Polybase를 활용한 ETL(mssql -> hadoop) 라이브러리 개발

ssis, sqoop 등 etl 툴이 많지만 db to db가 아닌 mssql에서 hadoop으로 데이터를 내린다면 polybase를 활용하여 쉽게 해결할 수 있다.

구조(준비물)은 다음과 같다.

1. EtlSqlOperator.java
- Connect mssql, Read sql query file, Run sql, Delete/Move hdfs path
2. EtlSqlOperator.sh
- Run shell script
3. DelHdfsPath.sh
- Delete path if exists
4. MoveHdfsPath.sh
- Move path if exists
5. sql query file
- CETAS query
6. sqljdbc.jar
- For db connection


사용방법
원하는 경로에 etl 디렉토리(상위)를 만들었고 etl 디렉토리 하위에는 sql, lib 디렉토리가 있다. lib에는 jdbc driver를 넣어놨고 sql 디렉토리에는 실제로 날릴 CETAS 쿼리들이 들어있다. 나머지 쉘 스크립트나 java 파일은 etl 디렉토리에 넣어놨다.

1. EtlSqlOperator.java를 자신의 상황에 맞게 수정해서 컴파일 한다.
2. 쿼리 구문을 작성해서 sql 디렉토리 하위에 넣는다.
3. EtlSqlOperator.sh에 2번에서 작성한 sql 쿼리 파일을 추가해서 실행한다.



소스, 드라이버 등 파일은 깃헙에 올려놓았다.
깃헙 주소 : https://github.com/parksuseong/ETL_Library



파일 설명 부분

EtlSqlOperator.java 

import java.sql.Connection;
import java.io.File;
import java.io.FileReader;
import java.io.FileNotFoundException;
import java.io.IOException;

import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.ResultSet;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

public class EtlSqlOperator {

public static void DelPath(String str){

try {
Runtime runtime = Runtime.getRuntime();
Process process = runtime.exec("path/etl/DelHdfsPath.sh " + str );

InputStream is = process.getInputStream();
InputStreamReader isr = new InputStreamReader(is);

BufferedReader br = new BufferedReader(isr);

String line;
   // this is trick
while((line = br.readLine()) != null) {

System.out.println(line);
}


} catch (IOException e1) {
System.out.println("del error..........");
// TODO Auto-generated catch block
e1.printStackTrace();
}

}


public static void MovePath(String str1, String str2){

try {
Runtime runtime = Runtime.getRuntime();
System.out.println("------------------------------------------------------------");
System.out.println("from " + str2 + " to " + str1);
System.out.println("------------------------------------------------------------");
String tmptmp = "path/etl/MoveHdfsPath.sh " + str1 + " " + str2+"*";
Process process = runtime.exec(tmptmp);

InputStream is = process.getInputStream();
InputStreamReader isr = new InputStreamReader(is);

BufferedReader br = new BufferedReader(isr);

String line;
   // this is trick
while((line = br.readLine()) != null) {

System.out.println(line);

}


} catch (IOException e1) {
System.out.println("move error.........");
// TODO Auto-generated catch block
e1.printStackTrace();
}

}


public static void main (String args []) {

String sql = "";
System.out.println("ETL Start...");
System.out.println("PARAMETER FILE NAME IS " + args[0]);

// sql file read
System.out.println("------------------------ READ SQL FILE ----------------------");
try{
File file = new File(args[0]);
FileReader filereader = new FileReader("path/etl/sql/"+file);
int singleCh = 0;
while((singleCh = filereader.read()) != -1){
sql += (char)singleCh;
}
filereader.close();
}catch (FileNotFoundException e) {
// TODO: handle exception
System.out.println(e);
}catch(IOException e){
System.out.println(e);
}


Connection conn = null;
Statement stmt = null;
ResultSet rs = null;

try {

// connect db
Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
conn = DriverManager.getConnection("jdbc:sqlserver://xx.xxx.x.xx:1433","userid","password");
stmt = conn.createStatement();



int start_idx = sql.indexOf("LOCATION='/")+10;
int end_idx = sql.indexOf(",",10);
// create destination path
String path_str = sql.substring(start_idx,end_idx-1);
// create temp path
String tmp_path = "/user/BIGDATA/DW/STG"+path_str;
// delete temp path if exists
System.out.println("------------------------------ destination path ------------------------------ \n" + path_str);
System.out.println("------------------------------ tmp path ------------------------------ \n" + tmp_path);
System.out.println("--------------------------------------------------");
DelPath(tmp_path);
// create tmp_sql
String tmp_sql = sql.replace(path_str, tmp_path);
System.out.println("------------------------------ tmp sql ------------------------------\n" + tmp_sql);
// run tmp_sql
System.out.println("------------------------------ tmp_sql run ------------------------------");
int result = stmt.executeUpdate(tmp_sql);
// delete destination path if exists
System.out.println("------------------------------ del func ------------------------------");
DelPath(path_str);
System.out.println("------------------------------ del complete ------------------------------");
//move to destination from temp path
MovePath(path_str,tmp_path);
System.out.println("------------------------------ move succ ------------------------------");
} catch (SQLException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} finally{
try {
if (rs!=null) rs.close();
if (stmt!=null) stmt.close();
if (conn!=null) conn.close();
} catch (SQLException e) {}
}


}

}


EtlSqlOperator.sh



DelHdfsPath.sh


MoveHdfsPath.sh



SQL_TEST_TABLE (example)




주의할 점은 지우고 쓰는 것을 sync를 맞춰야되서 나름대로의 trick을 넣어뒀다.
// this is trick
그렇지 않으면 쓰는동안 지워버리는 문제가 생길 수 있기 때문이다. 그때의 에러는 해당 경로가 존재하지 않는다고 나올 것이다.

댓글 2개:

  1. Thank you for those kind words. It's not enough, but I'll try to post it often.

    답글삭제
  2. The article is so appealing. You should read this article before choosing the Google cloud big data services you want to learn.


    답글삭제

2022년 회고

 올해는 블로그 포스팅을 열심히 못했다. 개인적으로 지금까지 경험했던 내용들을 리마인드하자는 마인드로 한해를 보낸 것 같다.  대부분의 시간을 MLOps pipeline 구축하고 대부분을 최적화 하는데 시간을 많이 할애했다. 결국에는 MLops도 데이...