Key error message: java.lang.IllegalArgumentException: Option 'basePath' must be a directory

 

Below I describe the situation in which the above error occurred and how I fixed it.

 

At the time, I had dumped my JSON data directly into HDFS as /streaming_data (so /streaming_data was the data file itself),

 

and then tried to load /streaming_data as the path.

 

Like the code below.

 

// infer the schema once with a batch read, then reuse it for the stream
val schema = spark.read.format("json").load("/streaming_data").schema
// /streaming_data is a data file, not a directory -- this is what breaks at runtime
val df = spark.readStream.format("json").schema(schema).load("/streaming_data")
val group = df.groupBy("gt").count()
val strm = group.writeStream.queryName("count").format("console").outputMode("complete").start().awaitTermination()

 

 

But the error above popped up out of nowhere and stopped me in my tracks.

 

After some googling, it turns out that the path given to load must be a directory, not a data file.

 

So I moved the data inside HDFS to /data/streaming_data,

 

and passed /data to load instead.
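For reference, the move itself can be done with the plain HDFS CLI (hdfs dfs -mkdir /data followed by hdfs dfs -mv /streaming_data /data/streaming_data), or from the same spark-shell session through Hadoop's FileSystem API. A minimal sketch, assuming the paths above and that `spark` is the shell's session:

import org.apache.hadoop.fs.{FileSystem, Path}

// grab the FileSystem backing the cluster's default configuration
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.mkdirs(new Path("/data"))  // create the parent directory
// move the data file underneath it
fs.rename(new Path("/streaming_data"), new Path("/data/streaming_data"))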

 

The streaming code then looked like this:

 

// infer the schema from the file(s) now sitting under /data
val schema = spark.read.format("json").load("/data").schema
// this time load() gets a directory, so the file stream source is happy
val df = spark.readStream.format("json").schema(schema).load("/data")
val group = df.groupBy("gt").count()
val strm = group.writeStream.queryName("count").format("memory").outputMode("complete").start().awaitTermination()

 

 

This time it started without the error above.
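As an aside, since this version uses the memory sink, the running counts land in an in-memory table named after the query ("count" here), which you can poll with plain SQL. A small sketch; note that awaitTermination() above blocks the shell, so to poll interactively you would call start() without it:

// the memory sink registers a temp table named after queryName
spark.sql("select * from count").show()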

Explanation:

 

When data with the same schema is added to that directory (/data in the example above),

 

Spark Structured Streaming notices the newly added data, turns it into a micro-batch, and processes it as part of the stream.
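As a side note, the file source also lets you cap how many new files go into each micro-batch through the maxFilesPerTrigger option. A sketch reusing `spark` and `schema` from the snippet above, handy when you want to watch batches arrive one at a time:

// limit each micro-batch to a single new file
val throttled = spark.readStream
  .format("json")
  .schema(schema)
  .option("maxFilesPerTrigger", 1)
  .load("/data")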

 

 

For example, if a file aaa already sits in /data/ when the stream starts, Spark reads aaa first and processes it.

 

If, while the stream is still running, I drop one more file, bbb, into /data/ by hand (where bbb has the same schema as aaa),

 

Spark recognizes bbb and processes it in the stream as well.
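For reference, "dropping bbb in by hand" can be a plain hdfs dfs -put, or the equivalent from Scala. A minimal sketch, where /tmp/bbb.json is a hypothetical local file with the matching schema:

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
// /tmp/bbb.json is a made-up local path; any JSON file with aaa's schema works
fs.copyFromLocalFile(new Path("file:///tmp/bbb.json"), new Path("/data/bbb.json"))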

 

As in the figure below.

 

[Figure: the streaming output in complete mode and in append mode — the "eyeballs" rows come from bbb, the file I added by hand]

 

So, as data files land in the /data/ directory one by one, Spark automatically picks them up and streams them.
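If you want to confirm that batches really are being created as files arrive, a running query exposes its progress. A small sketch, assuming the query was started without awaitTermination():

// print the most recent micro-batch stats for every active query
spark.streams.active.foreach(q => println(q.lastProgress))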

 

(Of course, what gets emitted each trigger depends on the output mode: complete re-emits the entire aggregated result, while append only emits newly added rows.)

Below is the full error log from running the first code block.


scala> val strm = group.writeStream.queryName("count").format("console").outputMode("complete").start().awaitTermination()
2019-09-03 06:47:50 ERROR MicroBatchExecution:91 - Query count [id = 3f4ff5b7-0510-494b-a64f-889e69ea3dfe, runId = 6cad0949-8d1c-4697-94fa-03aec75f2b00] terminated with error
java.lang.IllegalArgumentException: Option 'basePath' must be a directory
at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.basePaths(PartitioningAwareFileIndex.scala:218)
at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning(PartitioningAwareFileIndex.scala:132)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.partitionSpec(InMemoryFileIndex.scala:71)
at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.partitionSchema(PartitioningAwareFileIndex.scala:50)
at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:144)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:373)
at org.apache.spark.sql.execution.streaming.FileStreamSource.getBatch(FileStreamSource.scala:174)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$10.apply(MicroBatchExecution.scala:438)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$10.apply(MicroBatchExecution.scala:434)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:434)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:434)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:433)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
org.apache.spark.sql.streaming.StreamingQueryException: Query count [id = 3f4ff5b7-0510-494b-a64f-889e69ea3dfe, runId = 6cad0949-8d1c-4697-94fa-03aec75f2b00] terminated with exception: Option 'basePath' must be a directory
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.IllegalArgumentException: Option 'basePath' must be a directory
  at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.basePaths(PartitioningAwareFileIndex.scala:218)
  at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning(PartitioningAwareFileIndex.scala:132)
  at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.partitionSpec(InMemoryFileIndex.scala:71)
  at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.partitionSchema(PartitioningAwareFileIndex.scala:50)
  at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:144)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:373)
  at org.apache.spark.sql.execution.streaming.FileStreamSource.getBatch(FileStreamSource.scala:174)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$10.apply(MicroBatchExecution.scala:438)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$10.apply(MicroBatchExecution.scala:434)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:434)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:434)
  at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:433)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
  at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
  at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
  ... 1 more

 

 

Reference: https://stackoverflow.com/questions/48357753/error-java-lang-illegalargumentexception-option-basepath-must-be-a-directory
