ingrese la descripción de la imagen aquí  ingrese la descripción de la imagen aquí Tengo una aplicación que monitorea la carpeta FTP para un archivo CSV específico foo.csv, una vez que se encuentra el archivo Lo tira de mi local y genera un nuevo formato de salida bar.csv, la aplicación luego enviará el nuevo archivo bar.csv a la carpeta FTP y borrarálo de local.

Ahora quiero introducir un proceso que lea bar.csv e insértelo en una tabla de base de datos antes de enviarlo nuevamente en el servidor FTP.

Supongo que esto se podría hacer usando la integración por lotes de primavera, pero no pude encontrar cómo hacerlo.

A continuación se muestra mi código de solicitud de referencia y sugerencias.

public IntegrationFlow localToFtpFlow(Branch myBranch) {

    return IntegrationFlows.from(Files.inboundAdapter(new File(myBranch.getBranchCode()))
                    .filter(new ChainFileListFilter<File>()
                            .addFilter(new RegexPatternFileListFilter("final" + myBranch.getBranchCode() + ".csv"))
                            .addFilter(new FileSystemPersistentAcceptOnceFileListFilter(metadataStore(dataSource), "foo"))),//FileSystemPersistentAcceptOnceFileListFilter
            e -> e.poller(Pollers.fixedDelay(10_000)))
            .enrichHeaders(h ->h.headerExpression("file_originalFile", "new java.io.File('"+ myBranch.getBranchCode() +"/FEFOexport" + myBranch.getBranchCode() + ".csv')",true))
            .transform(p -> {
                LOG.info("Sending file " + p + " to FTP branch " + myBranch.getBranchCode());
                return p;
            })

            .log()
            .transform(m -> {
                        this.defaultSessionFactoryLocator.addSessionFactory(myBranch.getBranchCode(),createNewFtpSessionFactory(myBranch));
                        LOG.info("Adding factory to delegation");
                        return m;
            })

            .publishSubscribeChannel(s ->
                    s.subscribe(f -> f.transform(fileMessageToJobRequest()).handle(jobLaunchingGateway()).channel("nullChannel"))
                    .subscribe(h -> h.handle(Ftp.outboundAdapter(createNewFtpSessionFactory(myBranch), FileExistsMode.REPLACE)
                                     .useTemporaryFileName(true)
                                     .autoCreateDirectory(false)
                                     .remoteDirectory(myBranch.getFolderPath()), e -> e.advice(expressionAdvice()))))
            /*.handle(Ftp.outboundAdapter(createNewFtpSessionFactory(myBranch), FileExistsMode.REPLACE)
                    .useTemporaryFileName(true)
                    .autoCreateDirectory(false)
                    .remoteDirectory(myBranch.getFolderPath()), e -> e.advice(expressionAdvice()))*/
            .get();
}

@Bean
public FileMessageToJobRequest fileMessageToJobRequest(){
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setFileParameterName("input.file.name");
    fileMessageToJobRequest.setJob(orderJob);
    return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
    //simpleJobLauncher.setJobRepository(jobRepository);
    simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);

    return jobLaunchingGateway;
}

Mensaje de archivo a clase de trabajo

public class FileMessageToJobRequest {

    private Job job;
    private String fileParameterName;

    public void setFileParameterName(String fileParameterName) {
        this.fileParameterName = fileParameterName;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder =
                new JobParametersBuilder();

        jobParametersBuilder.addString(fileParameterName,
                message.getPayload().getAbsolutePath());//message.getPayload().getAbsolutePath()

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }
}

Error que estoy recibiendo cuando estoy agregando un flujo en tiempo de ejecución.

2019-07-02 12:42:49.292  INFO 7476 --- [ask-scheduler-4] o.s.i.file.FileReadingMessageSource      : Created message: [GenericMessage [payload=BEY\finalBEY.csv, headers={file_originalFile=BEY\finalBEY.csv, id=38827020-c9c1-aa35-526c-cc6848ca5e11, file_name=finalBEY.csv, file_relativePath=finalBEY.csv, timestamp=1562060569292}]]
2019-07-02 12:42:49.292  INFO 7476 --- [ask-scheduler-4] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=BEY\finalBEY.csv, headers={file_originalFile=BEY\FEFOexportBEY.csv, id=abbe6169-7f46-fcb9-be9e-44533ab05ec8, file_name=finalBEY.csv, file_relativePath=finalBEY.csv, timestamp=1562060569292}]
2019-07-02 12:42:49.295 ERROR 7476 --- [ask-scheduler-4] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [jobLaunchingGateway]; nested exception is java.lang.NullPointerException, failedMessage=GenericMessage [payload=JobLaunchRequest: orderJob, parameters={input.file.name=C:\Java Programs\spring4ftpappftp\BEY\finalBEY.csv}, headers={file_originalFile=BEY\FEFOexportBEY.csv, id=f7b2560e-e435-009b-2b8e-27ef168a6767, file_name=finalBEY.csv, file_relativePath=finalBEY.csv, timestamp=1562060569293}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:184)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:175)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:224)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:180)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:220)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:277)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.lambda$run$0(AbstractPollingEndpoint.java:378)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:53)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:372)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:98)
    at org.springframework.batch.integration.launch.JobLaunchingMessageHandler.launch(JobLaunchingMessageHandler.java:50)
    at org.springframework.batch.integration.launch.JobLaunchingGateway.handleRequestMessage(JobLaunchingGateway.java:76)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    ... 100 more
0
Elias Khattar 28 jun. 2019 a las 11:22

1 respuesta

La mejor respuesta

Debe considerar usar un publishSubscribeChannel() en su definición IntegtrationFlow. Coloque como un suscriptor su .handle(Ftp.outboundAdapter()) (sería mejor como segundo). Y como el primero, debe ser algo como .handle(jobLaunchingGateway).channel("nullChannel").

Puede leer sobre JobLaunchingGateway en el lote de primavera Manual de referencia.

El punto es que desea enviar el mismo mensaje a varios lugares. Entonces, PublishSubscribeChannel es la mejor manera de ir: https://docs.spring.io/spring-integration/docs/5.1.6.release/reference/html/#java-dsl-subflows

Le sugiero que tenga aquellos suscriptores exactamente en ese orden, ya que realmente desea almacenar en DB antes de enviarlo a FTP. Sin un executor, el segundo suscriptor esperará hasta que finalice su trabajo.

Que .channel("nullChannel") es necesario allí porque JobLaunchingGateway es realmente una puerta de enlace y devuelve un JobExecution como respuesta. Ya que no estás interesado en eso, solo necesitas ignorarte. Por supuesto, puede tener otro handle() después de esa puerta de entrada para procesar este JobExecution de alguna manera. El punto no devuelve nada del suscriptor de puño como respuesta. Va a frenar su flujo principal de alguna manera.

ACTUALIZACIÓN

Creo que .transform(fileMessageToJobRequest()) debe ir al primer suscriptor a continuación, justo antes de jobLaunchingGateway:

 .publishSubscribeChannel(s ->
                s.subscribe(f -> f.transform(fileMessageToJobRequest()).handle(jobLaunchingGateway()).channel("nullChannel"))
                .subscribe(h -> h.handle(Ftp.outboundAdapter(createNewFtpSessionFactory(myBranch), ...))

El punto es que le gustaría enviar el mismo archivo a la siguiente handle(), pero después de ese transformador aguas arriba se cambiará en JobLaunchRequest, que es bueno para jobLaunchingGateway, pero no es para Ftp.outboundAdapter().

Tu excepción:

Caused by: java.lang.NullPointerException
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:98)

Conduce a esta pieza de código: Assert.notNull(jobParameters, "The JobParameters must not be null.");. Entonces, de alguna manera, que jobParametersBuilder.toJobParameters() se evalúa a null.

No puedo ayudarte con tanto código personalizado.

ACTUALIZACIÓN2

Está bien. Parece que su problema está en la definición de frijol JobLaunchingGateway. Usted hace un new SimpleJobLauncher() explícito sin una inyección JobRepository.

Parece que hay un BasicBatchConfigurer en el bota de resorte, que se puede inyectar en este JobLaunchingGateway. Por eso pasaremos un NPE. Hay otros errores después, pero esa es una historia diferente ...

2
Artem Bilan 10 jul. 2019 a las 20:15