Mi canalización almacena los datos de salida, archivo, a GCS. Me gustaría comprimir este archivo. TextIO ha descomprimido el archivo que está comprimido, pero supongo que no tiene comprimir el archivo. ¿Cómo puedo comprimir el archivo de salida?

1
Tadayasu Yotsu 28 dic. 2016 a las 03:31

3 respuestas

La mejor respuesta

Actualmente se trata de una solicitud de función abierta para DataFlow, sin embargo, el trabajo ya se ha realizado en Haz. Una vez que se lanza Dataflow 2.0 (que se basará en Beam), esto debería ser oficialmente compatible.

Dicho esto, he podido escribir archivos GZIP comprimidos extendiendo el clase FileBasedSink y utilizando el trabajo de Jeff Payne sobre esta función en Beam.

public class GZIPSink<T> extends FileBasedSink<T>  {
    private final Coder<T> coder;

    GZIPSink(String baseOutputFilename, Coder<T> coder) {
        super(baseOutputFilename, ".gz");
        this.coder = coder;
    }

    @Override
    public FileBasedWriteOperation createWriteOperation(PipelineOptions pipelineOptions) {
        return new GZIPWriteOperation(this, coder);
    }

    static class GZIPWriteOperation<T> extends FileBasedSink.FileBasedWriteOperation<T> {
        private final Coder<T> coder;

        private GZIPWriteOperation(GZIPSink<T> sink, Coder<T> coder) {
            super(sink);
            this.coder = coder;
        }

        @Override
        public FileBasedWriter createWriter(PipelineOptions pipelineOptions) throws Exception {
            return new GZIPBasedWriter(this, coder);
        }
    }

    static class GZIPBasedWriter<T> extends FileBasedSink.FileBasedWriter <T> {
        private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
        private final Coder<T> coder;
        private GZIPOutputStream out;

        public GZIPBasedWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder) {
            super(writeOperation);
            this.mimeType = MimeTypes.BINARY;
            this.coder = coder;
        }

        @Override
        protected void prepareWrite(WritableByteChannel channel) throws Exception {
            out = new GZIPOutputStream(Channels.newOutputStream(channel), true) {{
                def.setLevel(def.BEST_COMPRESSION);
            }};
        }

        @Override
        public void write(T value) throws Exception {
            coder.encode(value, out, Coder.Context.OUTER);
            out.write(NEWLINE);
        }

        @Override
        public void writeFooter() throws IOException {
            out.finish();
        }
    }
}     

Y luego hacer la escritura:

aStringPCollection.apply(Write.to(new GZIPSink("gs://path/sharded-filename", StringUtf8Coder.of()));

1
Thang 4 ene. 2017 a las 21:54

TextIO solo admite la lectura de archivos comprimidos. No admite escribir archivos con compresión.

https://cloud.google.com/dataflow/model/text-io#reading-from-compressed-text-files

Actualmente, TextIO no admite la escritura en archivos comprimidos.

Más información:

1
Community 23 may. 2017 a las 12:17

Como Thang mencionó, ahora esto es posible en la versión 2 de SDK de la viga agregando .withCompression(Compression.GZIP):

// Without Compression:
pcollection.apply(TextIO.write().to("/path/to/file.txt"));

// With Compression:
pcollection.apply(TextIO.write().to("/path/to/file.txt"))
      .withSuffix(".txt")
      .withCompression(Compression.GZIP));

El ejemplo completo proporcionado se puede encontrar en los documentos

0
Will 22 ago. 2018 a las 09:40