Core error: java.lang.IllegalArgumentException: Option 'basePath' must be a directory
This post explains the situation in which the above error occurred and presents a solution.
At the time, I had written my json data straight to /streaming_data on HDFS (so /streaming_data itself was the data, not a directory), and I tried to load /streaming_data as the path, like the code below.
val schema = spark.read.format("json").load("/streaming_data").schema   // one-off batch read just to infer the schema
val df = spark.readStream.format("json").schema(schema).load("/streaming_data")   // file stream source over the same path
val group = df.groupBy("gt").count()   // running count per "gt" value
val strm = group.writeStream.queryName("count").format("console").outputMode("complete").start().awaitTermination()   // print the full counts each batch; blocks the shell
But the error above popped up and stopped me in my tracks.
Some googling told me that the path given to load has to be a directory, not the data file itself.
So I moved the data inside HDFS to /data/streaming_data and passed /data to load instead.
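(The move itself can be done with hdfs dfs -mv. Below is a minimal sketch of the equivalent from the spark-shell, using the Hadoop FileSystem API with the paths from this post.)
import org.apache.hadoop.fs.{FileSystem, Path}
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)   // HDFS handle from the active session
fs.mkdirs(new Path("/data"))                                      // create the parent directory
fs.rename(new Path("/streaming_data"), new Path("/data/streaming_data"))   // move the data under it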
The code then becomes:
val schema = spark.read.format("json").load("/data").schema   // infer the schema from the directory
val df = spark.readStream.format("json").schema(schema).load("/data")   // load now points at a directory
val group = df.groupBy("gt").count()
val strm = group.writeStream.queryName("count").format("memory").outputMode("complete").start().awaitTermination()   // note: memory sink this time
This time it started without the error.
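One note on the second version: it uses the memory sink instead of the console, so the results land in an in-memory table named after queryName ("count") rather than being printed. Because awaitTermination() blocks the shell, a sketch like the following (same query as above, but holding on to the handle instead of blocking) lets you poll that table interactively:
val query = group.writeStream
  .queryName("count")        // name of the in-memory result table
  .format("memory")          // memory sink: results accumulate in a local table
  .outputMode("complete")
  .start()                   // no awaitTermination(), so the shell stays usable
spark.sql("SELECT * FROM count").show()   // inspect the current counts at any time
query.stop()                              // stop the stream when done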
Explanation:
Whenever data with the same schema is added to that directory (/data above), Spark Streaming notices it, turns it into a micro-batch, and processes it as part of the stream.
For example, say a file aaa already sits in /data/ when the stream starts; aaa is read and processed first. If, while the stream is still running, I drop another file bbb into /data/ by hand (bbb having the same schema as aaa), Spark picks bbb up and processes it too.
(Figure: the streaming output after the new file arrives; the 'eyeballs' entry is the bbb I added by hand.)
So as data files land in the /data/ directory one by one, Spark automatically picks them up and streams them.
(Of course, what gets emitted depends on the output mode.)
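To try this by hand, you can append one more JSON file to /data from a second spark-shell while the stream is running. This is only a sketch: the gt column comes from the groupBy above, while the value field is a made-up placeholder for whatever other columns the real data has.
import spark.implicits._
// Hypothetical rows; only "gt" is known from the groupBy in this post.
Seq(("stand", 42L), ("walk", 7L))
  .toDF("gt", "value")
  .write
  .mode("append")   // add new part files alongside the existing ones
  .json("/data")
// The running stream picks the new files up as a fresh micro-batch and,
// in complete mode, re-emits the full updated counts.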
Below is the full error log from running the first version of the code.
scala> val strm = group.writeStream.queryName("count").format("console").outputMode("complete").start().awaitTermination()
2019-09-03 06:47:50 ERROR MicroBatchExecution:91 - Query count [id = 3f4ff5b7-0510-494b-a64f-889e69ea3dfe, runId = 6cad0949-8d1c-4697-94fa-03aec75f2b00] terminated with error
java.lang.IllegalArgumentException: Option 'basePath' must be a directory
    at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.basePaths(PartitioningAwareFileIndex.scala:218)
    at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning(PartitioningAwareFileIndex.scala:132)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.partitionSpec(InMemoryFileIndex.scala:71)
    at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.partitionSchema(PartitioningAwareFileIndex.scala:50)
    at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:144)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:373)
    at org.apache.spark.sql.execution.streaming.FileStreamSource.getBatch(FileStreamSource.scala:174)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$10.apply(MicroBatchExecution.scala:438)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$10.apply(MicroBatchExecution.scala:434)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:434)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:434)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:433)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
org.apache.spark.sql.streaming.StreamingQueryException: Query count [id = 3f4ff5b7-0510-494b-a64f-889e69ea3dfe, runId = 6cad0949-8d1c-4697-94fa-03aec75f2b00] terminated with exception: Option 'basePath' must be a directory
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.IllegalArgumentException: Option 'basePath' must be a directory
    at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.basePaths(PartitioningAwareFileIndex.scala:218)
    at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning(PartitioningAwareFileIndex.scala:132)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.partitionSpec(InMemoryFileIndex.scala:71)
    at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.partitionSchema(PartitioningAwareFileIndex.scala:50)
    at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:144)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:373)
    at org.apache.spark.sql.execution.streaming.FileStreamSource.getBatch(FileStreamSource.scala:174)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$10.apply(MicroBatchExecution.scala:438)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$10.apply(MicroBatchExecution.scala:434)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:434)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:434)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:433)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
    ... 1 more