Obteniendo un error OutOfMemory para el siguiente código de PySpark: (falla después de que se escribe un cierto número de filas. Esto no sucede si intento escribir en el sistema de archivos hadoop en lugar de usar s3a, así que creo que lo he reducido al problema es s3a.) - objetivo final para escribir en s3a. Me preguntaba si había una configuración óptima de s3a en la que no me quedara sin memoria para tablas extremadamente grandes.

df = spark.sql("SELECT * FROM my_big_table")
df.repartition(1).write.option("header", "true").csv("s3a://mycsvlocation/folder/")

Mis configuraciones de s3a (emr predeterminado):

('fs.s3a.attempts.maximum', '10')
('fs.s3a.buffer.dir', '${hadoop.tmp.dir}/s3a')
('fs.s3a.connection.establish.timeout', '5000')
('fs.s3a.connection.maximum', '15')
('fs.s3a.connection.ssl.enabled', 'true')
('fs.s3a.connection.timeout', '50000')
('fs.s3a.fast.buffer.size', '1048576')
('fs.s3a.fast.upload', 'true')
('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
('fs.s3a.max.total.tasks', '1000')
('fs.s3a.multipart.purge', 'false')
('fs.s3a.multipart.purge.age', '86400')
('fs.s3a.multipart.size', '104857600')
('fs.s3a.multipart.threshold', '2147483647')
('fs.s3a.paging.maximum', '5000')
('fs.s3a.threads.core', '15')
('fs.s3a.threads.keepalivetime', '60')
('fs.s3a.threads.max', '256')
('mapreduce.fileoutputcommitter.algorithm.version', '2')
('spark.authenticate', 'true')
('spark.network.crypto.enabled', 'true')
('spark.network.crypto.saslFallback', 'true')
('spark.speculation', 'false')

Base de la traza de la pila:

Caused by: java.lang.OutOfMemoryError
        at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
        at org.apache.hadoop.fs.s3a.S3AFastOutputStream.write(S3AFastOutputStream.java:194)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:60)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
        at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
        at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
        at java.io.OutputStreamWriter.write(OutputStreamWriter.java:207)
        at com.univocity.parsers.common.input.WriterCharAppender.writeCharsAndReset(WriterCharAppender.java:152)
        at com.univocity.parsers.common.AbstractWriter.writeRow(AbstractWriter.java:808)
        ... 16 more
2
ProgrammingUnicorn 29 ago. 2020 a las 21:29

1 respuesta

La mejor respuesta

El problema aquí es que la carga s3a predeterminada no admite la carga de un archivo grande singular de más de 2 GB o 2147483647 bytes.

('fs.s3a.multipart.threshold', '2147483647')

Mi versión de EMR es más antigua que las más recientes, por lo que el parámetro multipart.threshold es solo un número entero, por lo que el límite es 2147483647 bytes, para una sola "parte" o archivo. Las versiones más recientes usan long en lugar de int y pueden admitir un límite de tamaño de archivo único más grande.

Usaré una solución alternativa para escribir el archivo en hdfs locales y luego moverlo a s3 a través de un programa java separado.

1
ProgrammingUnicorn 1 sep. 2020 a las 03:07