Estoy cargando varios archivos desde un directorio usando comodines como se muestra a continuación:

val df: DataFrame = spark.read
          .format("csv")
          .option("delimiter", ",")
          .schema(schema)
          .load(inputPath + "/*.csv*")

Esto funciona muy bien en su mayor parte. Pero cuando inputPath no tiene ningún archivo csv, obtengo:

org.apache.spark.sql.AnalysisException: Path does not exist

¿Hay alguna manera de evitar este error para que carguemos si los archivos csv están ahí pero no error si no hay nada que cargar?

0
NITS 26 ago. 2020 a las 20:01

1 respuesta

La mejor respuesta

Puede poner esto en el bloque try y catch la excepción

try {
       val df: DataFrame = spark.read
          .format("csv")
          .option("delimiter", ",")
          .schema(schema)
          .load(inputPath + "/*.csv*")
    }catch (Exception e) {
    print("Do something else here")
    e.getMessage();
  }

O si desea verificar que existe csv, primero puede verificar la existencia del archivo

import java.nio.file.{Paths, Files}
exist = Files.exists(Paths.get(inputPath +  "/*.csv*"))
if (exist){
 val df: DataFrame = spark.read
          .format("csv")
          .option("delimiter", ",")
          .schema(schema)
          .load(inputPath + "/*.csv*")

 }

Si tiene varias rutas en inputPaths, por ejemplo, puede filtrarlas como

inputPaths.filter(f => Files.exists(Paths.get(f +  "/*.csv*")))

Para el sistema de archivos hdfs, puede reemplazar la lógica anterior con

Para un solo archivo

val conf = sc.hadoopConfiguration
val fs = org.apache.hadoop.fs.FileSystem.get(conf)
val exists = fs.exists(new org.apache.hadoop.fs.Path(inputPath +  "/*.csv*"))
if (exist){
     val df: DataFrame = spark.read
              .format("csv")
              .option("delimiter", ",")
              .schema(schema)
              .load(inputPath + "/*.csv*")
    
     }

Para múltiples ubicaciones de archivos almacenados en una matriz.

val conf = sc.hadoopConfiguration
val fs = org.apache.hadoop.fs.FileSystem.get(conf)


inputPaths.filter(f => fs.exists(new org.apache.hadoop.fs.Path(f +  "/*.csv*")))
1
A.B 26 ago. 2020 a las 19:37