pyspark

#create an app named linuxhint
spark = SparkSession.builder.appName('linuxhint').getOrCreate()

# create student data with 5 rows and 6 attributes
students =[{'rollno':'001','name':'sravan','age':23,'height':5.79,'weight':67,'address':'guntur'},
               {'rollno':'002','name':'ojaswi','age':16,'height':3.79,'weight':34,'address':'hyd'},
               {'rollno':'003','name':'gnanesh chowdary','age':7,'height':2.79,'weight':17,'address':'patna'},
               {'rollno':'004','name':'rohith','age':9,'height':3.69,'weight':28,'address':'hyd'},
               {'rollno':'005','name':'sridevi','age':37,'height':5.59,'weight':54,'address':'hyd'}]

# create the dataframe
df = spark.createDataFrame( students)



# null 값이 포함된 dataframe 만드는 두 가지 방법
data_with_null = [{'a':None, 'b':2} , {'a':1, 'b':None}]
df = spark.createDataFrame( data_with_null )

#혹은 lit(None) 을 새로운 컬럼으로 추가
from pyspark.sql.types import StringType
from pyspark.sql.functions import lit
df = df.withColumn("null_val", lit(None).cast(StringType()))

https://stackoverflow.com/questions/33038686/add-an-empty-column-to-spark-dataframe

null 값이 포함된 column 에서 null 값을 바꾸려면 fill, fillna 함수 혹은 coalesce 함수 사용
아래 링크 참고
https://sparkbyexamples.com/pyspark/pyspark-fillna-fill-replace-null-values/
https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.coalesce.html


# 1이상 10미만 연속된 숫자를 갖는 샘플 데이터
df = spark.range(1, 10)

https://linuxhint.com/sum-pyspark/

 

 

scala spark

val rdd = spark.sparkContext.parallelize(Seq(
  Item(1, "Thingy A", "awesome thing.", "high", 0),
  Item(2, "Thingy B", "available at http://thingb.com", null, 0),
  Item(3, null, null, "low", 5),
  Item(4, "Thingy D", "checkout https://thingd.ca", "low", 10),
  Item(5, "Thingy E", null, "high", 12)))

val data = spark.createDataFrame(rdd)


https://github.com/awslabs/deequ

 

 

airflow 에서 Task 를 병렬 실행 할 때,

Task 의 수를 조절하는 옵션값들이 헷갈려서 정리함

2022.05.07

 

 

parallelism [공식 문서]

 

(모든 DAG 내에서 실행되는) 최대 task 수

예를 들어 parallelism 이 3이면, 모든 실행중인 DAG 내의 수 많은 task 들 중 단 3개의 task 만 실행됨

여기서 말하는 '실행되는 task'는 'running 상태인 task' 를 의미함

이 값은 scheduler 나 worker 수와 관계 없음

 

parallelism 이 0이면 모든 DAG 내의 task 들이 제한없이 실행됨

task 가 실행된다는 말은, executor 가 실행된다는 말이 됨

executor 는 process 를 하나 생성한 후 그 위에서 실행되는데

이 말인 즉슨, executor 가 무수히 실행될 때 시스템의 자원(cpu, mem)을 가능한대로 가져다가 사용한다는 말이 됨

따라서 parallelism 은 0 이상인 것이 좋음

병렬성을 최대로 활용하기 위해서, parallelism 은 "시스템 cpu 코어수 -1 개"로 설정해주는 것이 좋다고 함

(이것을 CPU-bound 라고 부름)

 

This defines the maximum number of task instances that can run concurrently in Airflow regardless of scheduler count and worker count. Generally, this value is reflective of the number of task instances with the running state in

 

기본값 : 32

환경변수명 : AIRFLOW__CORE__PARALLELISM

 

 

 

max_active_runs_per_dag [공식 문서]

 

최대 활성 DAG 수

이 옵션값 수 만큼의 DAG 만이 자신들의 task 를 실행할 수 있음

예를 들어 max_active_runs_per_dag 가 1이면,

DAG 가 아무리 많아도 한 타임라인에 실행되는 DAG 의 수는 1개가 됨

max_active_runs_per_dag 가 2이면,

DAG 가 아무리 많아도 한 타임라인에 실행되는 DAG 의 수는 2개가 됨

 

The maximum number of active DAG runs per DAG. The scheduler will not create more DAG runs if it reaches the limit. This is configurable at the DAG level with max_active_runs, which is defaulted as max_active_runs_per_dag.

기본값 : 16
환경변수명 : AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG

 

 

max_active_tasks_per_dag [공식 문서]

 

(하나의 DAG 내에서 실행되는) 최대 task 수

예를 들어 max_active_tasks_per_dag 값이 2면,

하나의 DAG 내에 아무리 많은 task 들이 있다고 해도 단 2개의 task 만 실행 가능

(DAG 가 5개인 상황에서 max_active_tasks_per_dag 값이 2면,

모든 DAG 에서 실행중인 task 는 10개 이하가 됨. 5*2=10

'이하'인 이유는, 어떤 DAG 는 task 가 1개 이하일지도 모르기 때문)

 

2.2.0 버전 이후부터 새롭게 등장

dag_concurrency 옵션값이 deprecated 되고, 대신 max_active_tasks_per_dag 가 사용됨

 

The maximum number of task instances allowed to run concurrently in each DAG. To calculate the number of tasks that is running concurrently for a DAG, add up the number of running tasks for all DAG runs of the DAG. This is configurable at the DAG level with max_active_tasks, which is defaulted as max_active_tasks_per_dag.

An example scenario when this would be useful is when you want to stop a new dag with an early start date from stealing all the executor slots in a cluster.

 

기본값 : 16

환경변수값 : AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG

 

 

 

 

 

 

select reflect("java.net.URLDecoder", "decode", [컬럼명]) from table;

SELECT reflect("java.net.URLDecoder", "decode", field_name) FROM table;

 

https://stackoverflow.com/a/15475187

 

 

 

 

'SQL' 카테고리의 다른 글

[PostgreSQL] HA 구성하는 방법  (0) 2022.07.25
[SQL] hackerrank New Company 쿼리  (0) 2022.03.09
[MySQL] 여러가지 함수 예제 모음  (2) 2022.02.13
[SQL] WAL 이란 무엇인가?  (0) 2022.02.08
[Phoenix] update values 하는 방법  (0) 2021.11.29

 

 

CentOS 에서

/etc/resolv.conf 를 아래와 같이 바꾸거나 추가하면 됨

 

nameserver 8.8.8.8
nameserver 168.126.63.1

 

8.8.8.8 은 google 의 dns 서버이고

168.126.63.1 은 KT 의 dns 서버임

 

 

 

https://www.lesstif.com/system-admin/linux-dns-domain-name-server-100205650.html

 

Linux에 DNS(Domain Name Server) 설정 방법

최근 Linux 배포판은 resolv.conf 를 직접 수정하면 재부팅시 정보가 사라질 수 있으니 nmcli나 netplan 을 사용해서 설정해야 합니다.

www.lesstif.com

 

 

 

 

 

HDFS 에 디렉터리/파일들이 너무 많아서

NameNode 의 memory 가 위험하다 싶을 때

HDFS 내의 디렉터리/파일들을 하나의 har 파일로 아카이빙하여

디렉터리/파일 개수를 줄일 수 있음

디렉터리/파일 개수가 줄어들면 파일의 메타데이터가 줄어들어서

NameNode 의 memory 를 절약할 수 있음

 

참고로 HDFS 상의 파일, 디렉터리 개수를 알고 싶다면 -count 옵션을 주면 됨

 

hdfs dfs -count -h /user/eyeballs/*

결과가 총 4개의 열로 나오는데, 다음과 같은 뜻임.

DIR_COUNT FILE_COUNT CONTENT_SIZE FILE_NAME

따라서, 첫번째 두번째 열의 크기를 통해 어떤 path 에 파일, 디렉터리가 많이 있구나 를 알 수 있음

 

 

 

예제를 들어, har 파일로 아카이빙하는 방법을 설명해 봄

 

나의 HDFS 에 다음과 같은 파일이 1000개 있음

 


hadoop fs -ls /user/eyeballs/data

/user/eyeballs/data/1
/user/eyeballs/data/2
...
/user/eyeballs/data/1000

 

 

나는 이 1000 개의 파일들을 하나의 har 파일로 묶고 싶음(archiving) 

hadoop archive 명령어를 이용하여

/user/eyeballs/data 디렉터리 자체를

test.har 라는 파일로 묶음

 


hadoop archive -archiveName test.har -p /user/eyeballs data /user/eyeballs/

 

명령어들을 하나씩 뜯어보자.

 

hadoop archive : 하둡에서 제공하는 archiving 기능을 실행하는 명령어

  map-reduce job 으로 실행됨

 

-archiveName : 아카이빙 할 디렉터리가 저장될 har 파일의 이름

 

test.har : 'test.har' 라는 이름으로 아카이빙

 

-p : 아카이빙 할 디렉터리의 부모 path

  여기 예제에서는, /user/eyeballs 가 되는데

  왜냐하면 내가 아카이빙 할 디렉터리가 /user/eyeballs/data 이고

  data 의 부모 path 는 /user/eyeballs/ 이기 때문


data : 아카이빙 할 디렉터리

 

/user/eyeballs : 아카이빙하여 생성될 har 파일이 저장될 path

 

 

 

위의 명령어를 실행한 후 ls 명령으로 확인해보면

test.har 파일이 잘 생성된 것을 볼 수 있음


hadoop fs -ls /user/eyeballs/

/user/eyeballs/data
/user/eyeballs/test.har



생성된 test.har 는 파일이 아니라 dir 로 분류됨
test.har 내부로 들어가보면 다음과 같이 나타남

 


hdfs dfs -ls /user/eyeballs/test.har/


/user/eyeballs/test.har/_SUCCESS
/user/eyeballs/test.har/_index
/user/eyeballs/test.har/_masterindex
/user/eyeballs/test.har/part-0

 

 

test.har 내부 아카이빙된 data 디렉터리를 보고싶다?

'har' 스키마를 이용해서 test.har 내부에

내가 아카이빙 한 data 디렉터리를 볼 수 있음

 


hadoop fs -ls har:///user/eyeballs/test.har


har:///user/eyeballs/test.har/data

 

 

test.har 내부의 data 디렉터리도 열어볼 수 있음

 


hadoop fs -ls har:///user/eyeballs/test.har/data



har:///user/eyeballs/test.har/data/1
har:///user/eyeballs/test.har/data/2
...
har:///user/eyeballs/test.har/data/1000

 

 

 

 

생성한 test.har 는

HDFS 내의 디렉터리를 지우는 방법과 동일한 방법으로 지울 수 있음

 


hadoop fs -rm -r /user/eyeballs/test.har






위의 예제에서는 /user/eyeballs/data 하나만을 아카이빙 했는데,

만약 여러 디렉터리를 아카이빙 하고싶다면?

예를 들어 /user/eyeballs 에 data, data2, data3 디렉터리가 있다고 하자.

 


hadoop fs -ls /user/eyeballs/


/user/eyeballs/data
/user/eyeballs/data2
/user/eyeballs/data3

 

data, data2, data3 를 모두 아카이빙 하려면,

다음과 같이 아카이빙 할 디렉터리를 위치에 모든 이름을 넣어주면 됨



hadoop archive -archiveName test.har -p /user/eyeballs data data2 data3 /user/eyeballs/

 

 

위에서 본 것 처럼,

har 스키마를 통해서 test.har 아카이빙 내부의 data, data2, data3 를 볼 수 있음

 

 


hadoop fs -ls har:///user/eyeballs/test.har


har:///user/eyeballs/test.har/data
har:///user/eyeballs/test.har/data2
har:///user/eyeballs/test.har/data3

 

 

 

 

test.har 로 아카이빙 한 har 파일을 다시 HDFS 에 되돌리는 복구 작업은 

hadoop distcp 명령어를 사용하여 진행됨

 

다음과 같이 test.har 파일이 있다고 하자.

 


hadoop fs -ls /user/eyeballs/


/user/eyeballs/test.har

 

 

위의 test.har 를 복구하려면 아래 명령어를 사용

 


hadoop distcp har:///user/eyeballs/test.har/ /user/eyeballs/decompress/

 

 

마지막 인자값 (/user/eyeballs/decompress/) 에

test.har 내부 디렉터리들이 모두 복구됨

 


hadoop fs -ls /user/eyeballs/decompress


/user/eyeballs/decompress/data
/user/eyeballs/decompress/data2
/user/eyeballs/decompress/data3



참고로 복구 작업을 마친 test.har는 사라짐



추가)

 

archive map-reduce 작업의 큐는 다음과 같이 지정 가능

 


hadoop archive -archiveName test.har -Dmapred.job.queue.name=queue_name -p /user/eyeballs data data2 data3 /user/eyeballs/

 

 

크기가 너무 큰 파일/디렉터리를 아카이빙 하는 경우

OOM 에러가 남

 

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded

 

 

 

 

 

참고

https://hadoop.apache.org/docs/current/hadoop-archives/HadoopArchives.html

https://wikidocs.net/27605

 

 

 

+ Recent posts