La mayoría de mis guiones hacen algo como lo siguiente.

spark.read().csv("s3://")
  .filter(..).map(...)
  .write().parquet("s3://");

¿Hay alguna forma de especificar que Spark, quiero que todo este trabajo se haga en la memoria, ya que no hay agregación, agrupación dentro de mi procesamiento? Debe ser un registro simple por procesador de flujo de registros que no toque el disco en absoluto.

0
ForeverConfused 30 oct. 2017 a las 02:34

2 respuestas

La mejor respuesta

No puedo hablar por EMR y su conector s3. Puedo hablar en nombre de Apache Hadoop y del conector S3A

Necesitamos almacenar en búfer los datos generados antes de cargarlos en S3. No puede hacer una transmisión () seguida de un cierre (), porque para archivos grandes necesita dividir la carga en archivos de 4 + GB, e incluso para cargas más pequeñas, debe lidiar con la condición común de generación de aplicaciones datos más rápido de lo que puede cargar en S3.

El uso del almacenamiento temporal local transitorio le brinda la capacidad de generar datos más rápido de lo que puede manejar el ancho de banda de carga de su S3 y hacer frente a los errores de red reenviando bloques.

Los clientes s3: y s3n: originales de Apache (y s3a antes de Hadoop 2.8) escribieron todo el archivo en el disco duro antes de iniciar la carga. El almacenamiento que necesitaba era el mismo que el número de bytes generados, y como solo se cargó en close (), el tiempo para esa llamada cercana es datos / ancho de banda.

S3A en Hadoop 2.8+ admite carga rápida (2.8+ opcional, automático en 3.0), donde los datos se almacenan en búfer al tamaño de un solo bloque (5+ MB, por defecto 64 MB), y la carga comienza tan pronto como se alcanza el tamaño del bloque. Esto hace que las escrituras sean más rápidas, con suficiente ancho de banda casi no hay retraso de close () (máximo: último tamaño de bloque / ancho de banda). Todavía necesita almacenamiento para hacer frente a la falta de coincidencia entre las tasas de generación y carga, aunque puede configurarlo para usarlo en matrices de bytes de pila o búferes de bytes fuera de pila. Haga eso y tendrá que jugar con mucho cuidado con las asignaciones de memoria y los tamaños de las colas: debe configurar el cliente para bloquear los escritores cuando la cola de cargas pendientes sea lo suficientemente grande.

Actualización Johnathan Kelly @ AWS ha confirmado que hacen lo mismo por bloque de búfer y carga como el conector ASF S3A. Esto significa que si su tasa de generación de datos en bytes / seg <= cargar ancho de banda desde la VM, entonces la cantidad de disco local necesaria es mínima ... si genera datos más rápido, entonces necesitará más (y eventualmente se quedará sin disco o alcanzar algunos límites de cola para bloquear los hilos del generador). No voy a citar ningún número sobre el ancho de banda real, ya que invariablemente mejora año tras año y cualquier declaración pronto quedará obsoleta. Por esa razón, mire la edad de cualquier publicación de puntos de referencia antes de creer. Realice sus propias pruebas con sus propias cargas de trabajo.

2
stevel 31 oct. 2017 a las 10:08

Sí, spark.read (). csv ("s3: //") funciona en emr spark.

Tuve problemas cuando intento escribir directamente en s3 teniendo activa una vista coherente de clúster.

http://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html

1
dnocode 23 nov. 2017 a las 19:22