읽기 전에

* Hadoop HDFS Cluster 와 Spark Cluster 가 이미 설치되어있다는 전제 하에 아래 내용을 설명한다.

* HBase 와 Zookeeper 가 이미 실행중이라는 전제 하에 아래 내용을 설명한다.

* HBase 설치 방법

* Zookeeper 설치 방법

 

 

내가 실행한 환경

- Ubuntu 16.04

- Spark 2.4.0 

- Hadoop 3.1.1 Cluster Node 2개 (master 1개, slave 1개)
- Scala 2.11.12

- HBase 2.2.2

- Zookeeper 3.5.6

 

아래 절차를 따르기 전에 이미 HDFS, HBase, Zookeeper 가 실행중이어야 한다.

Master 와 Slave 서버에서 jps 로 현제 실행 중인지 데몬 목록으로 확인해본다.

Master 에서는 HMaster, Slave 서버에서는 HRegionServer 가 실행중이어야 한다.

 

 

< Master 서버에서 jps >

 

< Slave 서버에서 jps >

 

 

Spark 와 연동하는 방법은 총 세 가지가 있다.

1. Hortonworks Spark HBase Connector(SHC) 를 사용하는 방법 (git)

2. nerdammer Spark Hbase Connector 를 사용하는 방법 (git)

3. Apache HBase Spark Connector 를 사용하는 방법 (git)

 

여기 포스트에서는 Hortonworks 에서 제공하는 첫번째 방법을 사용한다.

각자 어떤 connecotr 를 사용할 것인지는 아래 링크들을 참고한다.

링크1

링크2

링크3

 

*공식 github에서 받은 Hortonworks shc 는 Depencency(json4s) 와 Version 문제가 심각하다.

(에러 내용 java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.parse(Lorg/json4s/JsonInput;Z)Lorg/json4s/JsonAST$JValue;)

여기 링크 답변자의 github를 대신 사용한다.

답변자의 github

 

 

 

 

1.

먼저 HBase 에 샘플 데이터를 넣는다.

HBase/bin 으로 가서 ./hbase shell 로 hbase shell 에 접속한 후에 아래 put 명령어를 넣는다.

create 'Contacts', 'Personal', 'Office'


put 'Contacts', '1000', 'Personal:Name', 'John Dole'
put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001'
put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002'
put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.'
put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji'
put 'Contacts', '8396', 'Personal:Phone', '230-555-0191'
put 'Contacts', '8396', 'Office:Phone', '230-555-0191'
put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'

scan "Contacts" 로 잘 들어갔는지 확인

 

 

 

2.

master 가 되는 노드의 적당한 곳에 상기한 github 를 clone 한다.

cd ~; git clone https://github.com/dhananjaypatkar/shc

 

 

 

 

3.

받은 shc 로 들어가서 바로 maven 으로 package를 만든다.

mvn clean package

위의 명령어에 의해 maven 으로 build 하는데 시간이 꽤 걸린다.

 

/shc/core/target/ 에 build 된 jar file 들이 있는지 확인한다.

이름 : shc-core-1.1.3-2.4-s_2.11-shaded.jar, shc-core-1.1.3-2.4-s_2.11.jar

혹은, 필자가 build 한 jar 파일을 그대로 사용한다. 다운로드

 

 

만약 maven 이 설치되어 있지 않다면, 아래 명령어로 설치한다.

sudo apt-get update; sudo apt-get upgrade -y ; sudo apt-get install maven

설치하는 데 시간이 좀 걸린다.

 

 

 

 

 

 

4.

위에서 build 한 shc jars 의 위치를 기억해둔다.

spark-shell 을 시작할 때 --jars 옵션에 상기한 두 위치를 넣기 넣는다.

이 때 쉼표(,)로 구분하고, 쉼표 다음에 white space 없이 곧바로 이어서 path를 넣는다.

spark-shell --master yarn --jars /root/shc/core/target/*.jar

 

 

 

 

 

 

 

5.

아래 코드를 통해 hbase 에 접근하여 데이터를 가져온다.

 

import org.apache.spark.sql.{SQLContext, _}

import org.apache.spark.sql.execution.datasources.hbase._

import org.apache.spark.{SparkConf, SparkContext}

import spark.sqlContext.implicits._

 

def catalog = s"""{

    |"table":{"namespace":"default", "name":"Contacts"},

    |"rowkey":"key",

    |"columns":{

    |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},

    |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"},

    |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"},

    |"personalName":{"cf":"Personal", "col":"Name", "type":"string"},

    |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"}

    |}

|}""".stripMargin

 

def withCatalog(cat: String): DataFrame = {

    spark.sqlContext

    .read

    .options(Map(HBaseTableCatalog.tableCatalog->cat))

    .format("org.apache.spark.sql.execution.datasources.hbase")

    .load()

 }

 

val df = withCatalog(catalog)

df.show()

 

마지막 df.show 코드에 의해 아래처럼 두 가지 row 가 나온다.

 

이렇게 spark 가 hbase 에서 데이터를 dataframes 형태로 가져올 수 있다.

 

 

 

 

 

 

 

shc 에 의해 데이터를 가져올 때, Spark 는 HBase 데이터의 split 된 partition 하나 하나를 읽어온다.

 

이 table 이 split 된 것은 HDFS 에서 바로 확인이 가능하다.

 

위의 설명을 따라했다면, 아래 path 상에 split 된 데이터가 보일 것이다.

 

hdfs dfs -ls /hbase/data/default/Contacts

 

위의 설명에서 만든 table 이름은 Contacts이다, 이 이미지는 테이블이 다른 이름(word_10gb_b128)을 사용하고 있을 때 찍었다.

 

보면 dot( . ) 으로 시작하는 파일 말고 이상한 hash 값으로 되어있는 값이 4개 있는 것을 볼 수 있다.

 

위의 테이블(word_10gb_b128) 은 4개의 partition 으로 split 되어있다고 볼 수 있다.

 

따라서, Spark 에서 해당 데이터를 읽게 된다면,

하나의 split 을 읽고 executor 에서 처리하고,

하나의 split 을 읽고 executor 에서 처리하고....를 반복하여,

결국에는 총 4개의 executor 를 실행하게 된다.

 

분산 처리, 병렬 처리하는 데 이 executor 실행 개수를 설정하는 것이 중요하기 때문에

 

이 HBase table의 partition 개수를 조절하는 방법을 알아야 한다.

 

여기를 참고하여 partition 개수를 조절해본다.

 

 

 

 

 

 

 

 

 

shc 에서 hbase 의 모든 데이터를 읽기 보다, filter 를 줘서 특정 row 만 읽게 할 수 있는 것 같다.

실제로 하고 나서 블로그에 포스팅 하겠음.

참고

https://github.com/hortonworks-spark/shc/blob/master/examples/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseSource.scala#L118

https://community.cloudera.com/t5/Support-Questions/Using-shc-API-how-to-fetch-only-specific-rows-on-the-basis/td-p/195049

 

 

 

 

 

 

 

 

 

참고 

 

https://www.youtube.com/watch?v=we53uiOlnLA

https://docs.microsoft.com/ko-kr/azure/hdinsight/hdinsight-using-spark-query-hbase

https://community.cloudera.com/t5/Support-Questions/SHC-on-HDP-3-0-With-spark-2-4/td-p/236356

https://blog.cloudera.com/spark-hbase-connector-a-year-in-review/

 

 

 

 

+ Recent posts