El siguiente código me muestra leyendo un archivo de uno en uno en una carpeta. Quiero leer 10 archivos a la vez, ejecutar un METHOD_A y escribir en otra carpeta. Hay 5000 archivos en la carpeta. Leer el archivo 1 a la vez es extremadamente lento. Quiero acelerarlo

Estoy usando Java Spring Boot. ¿Alguna sugerencia de cómo puedo hacerlo?

for (int i = 0; i < files.length; i++){
     Object obj = parser.parse(new FileReader(files[i]));
     JSONObject obj1 = METHOD_A(obj);

        try{
            PrintWriter writer = new PrintWriter(...);
            writer.println(obj1);
            writer.close();
        } catch (IOException e) {
            // do something
    }
}

Gracias de antemano.

2
Gavy 18 oct. 2017 a las 15:21

3 respuestas

La mejor respuesta

¡Si tu puedes! , Ejemplo;

    ExecutorService executorService = Executors.newFixedThreadPool(/*Number of Threads*/);


    for (int i = 0; i < /* Number of Requests */; i++) {
        WorkerThread wt = context.getBean(WorkerThread.class, String.valueOf(i));
        tasks.add(wt);
    }

    List<Future<String>> futures = executorService.invokeAll(tasks);

    for (Future<String> future : futures) {
        String result = future.get(10000, TimeUnit.MILLISECONDS);
    }

    executorService.shutdown();
1
Habil 18 oct. 2017 a las 12:32

La gestión de subprocesos y tareas debe considerarse para el trabajo vinculado a la CPU, que no es su caso. Para la entrada / salida, debe usar IO sin bloqueo, como AsynchronousFileChannel. El RxIo simplifica el uso de AFC y puede lograr su objetivo a través del AsyncFiles API, como:

AsyncFiles
      .readAll(files[i])
      .thenApply(parser::parse)
      .thenApply(obj -> obj.toString().getBytes())
      .thenCompose(bytes -> AsyncFiles.writeBytes(..., bytes))
      .thenAccept(index ->     /* invoked on completion */)
      .exceptionally(excep -> /* invoked on error*/

Por lo general, para administrar las E / S asincrónicas, debe usar E / S sin bloqueo, lo que evita administrar explícitamente grupos de subprocesos y bloquear subprocesos. Dado que la propuesta de respuesta aceptada es el bloqueo de hilos para obtener el resultado, ¿cuántos hilos configurará en newFixedThreadPool(size) para lograr la concurrencia?

Tal vez, el tamaño del grupo de subprocesos podría ser igual al número de archivos que se están leyendo. ¿Quizás 10 en tu caso?

Pero luego, en la propuesta WorkerThread, está bloqueando el hilo al leer y analizar. No conozco el tipo de parser, pero dado que le estás dando un bloqueo FileReader, entonces parser.parse(…) se bloqueará con seguridad. Finalmente, bloqueará nuevamente en writer.println().

La siguiente imagen muestra la diferencia entre la propuesta de respuesta aceptada correspondiente a " síncrono, dos hilos de control " y mi propuesta correspondiente a " asíncrono "

eloquent javascript

0
Miguel Gamboa 8 may. 2020 a las 12:10

En función de sus requisitos, he agregado los siguientes códigos para usted. Pruebe con esto y vea si funciona para usted. Tenga en cuenta que su servidor también debe tener un número suficiente de procesadores para procesos concurrentes. Si aún confunde, vea un programa de ejemplo al final: -

    private int numberOfFileProcessed = 0;
    private int numberOfThreadAlive = 0 ;
    private int numberOfThreadAlive = 0;
    private int numberOfThreadToBeAllowed = 10;//Change this value to control number of thread to be run concurrently

    for (int i = 0; i < files.length; i++){
     Object obj = parser.parse(new FileReader(files[i]));
     JSONObject obj1 = METHOD_A(obj);
     try{
            Thread t = new Thread(new ReadFiles(obj1));
            t.start();
            numberOfThreadAlive++;
        }catch (Exception e) {
        //do something  
       }

      while(numberOfThreadAlive > numberOfThreadToBeAllowed){//This while loop will control number of thread to be not more than 10
        try{Thread.sleep(100);}catch(Exception e){}//Release the processor    
      }
  }

  private final synchronized void jobCompleted(){       
            numberOfFileProcessed++;
            numberOfThreadAlive--;      
  }

  while(numberOfFileProcessed < files.length){
    //wait till last thread complete it's task
    //I am not using thread.join() for performance 
    try{Thread.sleep(100);}catch(Exception e){}//Release the processor
  }



private class ReadFiles implements Runnable  {
        JSONObject jobj;
        public ReadFiles(JSONObject obj) {
            jobj = obj;
        }
        @SuppressWarnings("unchecked")
        public void run() {
          try{      
            PrintWriter writer = new PrintWriter(...);
            writer.println(jobj);
            writer.close();
            jobCompleted();
           } catch (IOException e) {
            // do something
          }
        }

    }

A continuación hay un archivo de prueba que puede usar para comprender

        package com.test.threadtest;

    public final class ThreadTest {
        private int numberOfFileProcessed = 0;
        private int numberOfThreadAlive = 0 ;   
        int numberOfThreadToBeAllowed = 10;

        public void processFiles(){

            for (int i = 0; i < 50; i++){
                try{
                      Thread t = new Thread(new ReadFiles(i));
                      t.start();
                      numberOfThreadAlive++;
                 }catch (Exception e) {
                    //do something  
                 }

                  while(numberOfThreadAlive > numberOfThreadToBeAllowed){//This while loop will control number of thread to be not more than 10
                    try{Thread.sleep(100);}catch(Exception e){}//Release the processor
                    System.out.println("Reached maximum");
                  }
            }  
              while(numberOfFileProcessed < 50){
                //wait till last thread complete it's task
                //I am not using thread.join() for performance 
                  System.out.println("Number of file processed :" + numberOfFileProcessed);
                try{Thread.sleep(100);}catch(Exception e){}
              }
        }

        private final synchronized void jobCompleted(){     
                numberOfFileProcessed++;
                System.out.println("numberOfFileProcessed :" + numberOfFileProcessed);
                numberOfThreadAlive--;      
        }

        public static void main(String[] args) {
            // TODO Auto-generated method stub
            ThreadTest test = new ThreadTest();
            test.processFiles();
            System.out.println("Exit from the process");
            System.exit(0);
        }

        private class ReadFiles implements Runnable  {
            int i;
            public ReadFiles(int val) {
                i = val;
            }
            @SuppressWarnings("unchecked")
            public void run() {
              try{      
                System.out.println("I am Thread : " + i);
                Thread.sleep(1000);
                jobCompleted();
               } catch (Exception e) {
                // do something
              }
            }

        }
    }
0
Abhijit Pritam Dutta 18 oct. 2017 a las 16:07