읽기 전에
* Hadoop HDFS Cluster 와 Spark Cluster 가 이미 설치되어있다는 전제 하에 아래 내용을 설명한다.
* HBase 와 Zookeeper 가 이미 실행중이라는 전제 하에 아래 내용을 설명한다.
내가 실행한 환경
- Ubuntu 16.04
- Spark 2.4.0
- Hadoop 3.1.1 Cluster Node 2개 (master 1개, slave 1개)
- Scala 2.11.12
- HBase 2.2.2
- Zookeeper 3.5.6
아래 절차를 따르기 전에 이미 HDFS, HBase, Zookeeper 가 실행중이어야 한다.
Master 와 Slave 서버에서 jps 로 현제 실행 중인지 데몬 목록으로 확인해본다.
Master 에서는 HMaster, Slave 서버에서는 HRegionServer 가 실행중이어야 한다.
Spark 와 연동하는 방법은 총 세 가지가 있다.
1. Hortonworks Spark HBase Connector(SHC) 를 사용하는 방법 (git)
2. nerdammer Spark Hbase Connector 를 사용하는 방법 (git)
3. Apache HBase Spark Connector 를 사용하는 방법 (git)
여기 포스트에서는 Hortonworks 에서 제공하는 첫번째 방법을 사용한다.
각자 어떤 connecotr 를 사용할 것인지는 아래 링크들을 참고한다.
*공식 github에서 받은 Hortonworks shc 는 Depencency(json4s) 와 Version 문제가 심각하다.
(에러 내용 java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.parse(Lorg/json4s/JsonInput;Z)Lorg/json4s/JsonAST$JValue;)
여기 링크 답변자의 github를 대신 사용한다.
1.
먼저 HBase 에 샘플 데이터를 넣는다.
HBase/bin 으로 가서 ./hbase shell 로 hbase shell 에 접속한 후에 아래 put 명령어를 넣는다.
create 'Contacts', 'Personal', 'Office'
|
scan "Contacts" 로 잘 들어갔는지 확인
2.
master 가 되는 노드의 적당한 곳에 상기한 github 를 clone 한다.
cd ~; git clone https://github.com/dhananjaypatkar/shc |
3.
받은 shc 로 들어가서 바로 maven 으로 package를 만든다.
mvn clean package |
위의 명령어에 의해 maven 으로 build 하는데 시간이 꽤 걸린다.
/shc/core/target/ 에 build 된 jar file 들이 있는지 확인한다.
이름 : shc-core-1.1.3-2.4-s_2.11-shaded.jar, shc-core-1.1.3-2.4-s_2.11.jar
혹은, 필자가 build 한 jar 파일을 그대로 사용한다. 다운로드
만약 maven 이 설치되어 있지 않다면, 아래 명령어로 설치한다.
sudo apt-get update; sudo apt-get upgrade -y ; sudo apt-get install maven |
설치하는 데 시간이 좀 걸린다.
4.
위에서 build 한 shc jars 의 위치를 기억해둔다.
spark-shell 을 시작할 때 --jars 옵션에 상기한 두 위치를 넣기 넣는다.
이 때 쉼표(,)로 구분하고, 쉼표 다음에 white space 없이 곧바로 이어서 path를 넣는다.
spark-shell --master yarn --jars /root/shc/core/target/*.jar |
5.
아래 코드를 통해 hbase 에 접근하여 데이터를 가져온다.
import org.apache.spark.sql.{SQLContext, _} import org.apache.spark.sql.execution.datasources.hbase._ import org.apache.spark.{SparkConf, SparkContext} import spark.sqlContext.implicits._
def catalog = s"""{ |"table":{"namespace":"default", "name":"Contacts"}, |"rowkey":"key", |"columns":{ |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"}, |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"}, |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"}, |"personalName":{"cf":"Personal", "col":"Name", "type":"string"}, |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"} |} |}""".stripMargin
def withCatalog(cat: String): DataFrame = { spark.sqlContext .read .options(Map(HBaseTableCatalog.tableCatalog->cat)) .format("org.apache.spark.sql.execution.datasources.hbase") .load() }
val df = withCatalog(catalog) df.show()
|
마지막 df.show 코드에 의해 아래처럼 두 가지 row 가 나온다.
이렇게 spark 가 hbase 에서 데이터를 dataframes 형태로 가져올 수 있다.
shc 에 의해 데이터를 가져올 때, Spark 는 HBase 데이터의 split 된 partition 하나 하나를 읽어온다.
이 table 이 split 된 것은 HDFS 에서 바로 확인이 가능하다.
위의 설명을 따라했다면, 아래 path 상에 split 된 데이터가 보일 것이다.
hdfs dfs -ls /hbase/data/default/Contacts
보면 dot( . ) 으로 시작하는 파일 말고 이상한 hash 값으로 되어있는 값이 4개 있는 것을 볼 수 있다.
위의 테이블(word_10gb_b128) 은 4개의 partition 으로 split 되어있다고 볼 수 있다.
따라서, Spark 에서 해당 데이터를 읽게 된다면,
하나의 split 을 읽고 executor 에서 처리하고,
하나의 split 을 읽고 executor 에서 처리하고....를 반복하여,
결국에는 총 4개의 executor 를 실행하게 된다.
분산 처리, 병렬 처리하는 데 이 executor 실행 개수를 설정하는 것이 중요하기 때문에
이 HBase table의 partition 개수를 조절하는 방법을 알아야 한다.
여기를 참고하여 partition 개수를 조절해본다.
shc 에서 hbase 의 모든 데이터를 읽기 보다, filter 를 줘서 특정 row 만 읽게 할 수 있는 것 같다.
실제로 하고 나서 블로그에 포스팅 하겠음.
참고
참고
https://www.youtube.com/watch?v=we53uiOlnLA
https://docs.microsoft.com/ko-kr/azure/hdinsight/hdinsight-using-spark-query-hbase
https://community.cloudera.com/t5/Support-Questions/SHC-on-HDP-3-0-With-spark-2-4/td-p/236356
https://blog.cloudera.com/spark-hbase-connector-a-year-in-review/
'HBase' 카테고리의 다른 글
[HBase] ERROR: org.apache.hadoop.hbase.PleaseHoldException: Master is initializing 에러 해결 방법 (0) | 2019.11.14 |
---|---|
[HBase] tsv, csv file load 하는 방법 (0) | 2019.11.06 |
[HBase] log 보는 방법 (0) | 2019.10.31 |
[HBase] Spark 와 연동해서 데이터 읽는 방법 (2) | 2019.10.24 |
[HBase] 기본 개념 (0) | 2019.10.21 |