Estaba siguiendo este tutorial y pude paralelizar un bucle for donde las operaciones se realizaron de forma independiente en varios archivos. Sin embargo, ahora necesito realizar una función iterativa para extraer variables en 300 archivos desde un único conjunto de datos de matriz x (fuente). Mis funciones se ven así:

def build_data(parameters)
    extract var1 from source
    extract var2 from source
    extract var3 from source
    ..........
    return('finished')

He configurado 6 trabajadores y para ejecutar esta función en un bucle 300 veces, implementé:

source = xr.opendataset()
build_delayed = []
for i in range (0,300):
   task = dask.delayed (build_data) (params)
   build_delayed.append(task)
dask.compute(*build_delayed)

Pero esto no funciona y parece que la latencia se está acumulando ya que todos los trabajadores intentan acceder a la misma pieza de datos. Además, cuando finaliza el ciclo, se devuelve None en build_delayed [].

¿Que me estoy perdiendo aqui? ¿Cómo hago que funcione en paralelo en este caso?

(Mi pregunta detallada).

EDITAR: MVCE:

stations = []
for key in valid_stations:
    station = pd.read_csv('~/projects/LIS_verification/pyLISF/data/IFI-Observations/flow_2/'+key+'.csv')
    station.reset_index(drop = True,inplace=True)
    station.set_index('Date',inplace=True)
    station.index = pd.to_datetime(station.index)
    stations.append(station)

routedat = xr.opendataset('/LDAS/02-ILDAS/OUTPUT/EXP002/SURFACEMODEL/file1.nc')
    
def build_data(stations,routedat,i) :
    try :
        start_date = stations[i]['Streamflow (cumecs)'].first_valid_index()
        lis_date = routedat['time'].values
        return (start_date,lis_date)
    except Exception as e :
        return( start_date,str(e))
    
for i in range (0,10):
    task = dask.delayed (build_data) (stations,routedat,i)
    build_delayed.append(task)

dask.compute(*build_delayed)

Obtengo la salida requerida, pero el tiempo necesario es demasiado grande en comparación con el ciclo secuencial (508 ms frente a 31 segundos).

ACTUALIZACIÓN: pude ejecutarlo con éxito en <300 ms en paralelo usando el comando .compute(scheduler='synchronous')

1
Bhanu Magotra 20 jul. 2020 a las 10:27

1 respuesta

La mejor respuesta

Esto es solo una suposición, pero ¿intentó crear una función de reducción y calcularla?

Algo así como:

@dask.delayed
def delayed_reduce(array):
    out = ''
    for entry in array:
        print (entry)
        out = out + str(entry) + ' '
    return out

build_delayed = []
for i in range (0,300):
   task = dask.delayed (build_data) (params)
   build_delayed.append(task)
out = delayed_reduce(build_delayed) 
value = out.compute()

Esto es solo el esqueleto del código, pero entiendes la idea. Si desea depurar lo que está sucediendo, agregue un valor de retorno diferente y también imprímalo en algún archivo.

Además, no mencionó cómo ejecuta dask. ¿Estás usando distribuido? Si lo hace, asegúrese de haber inicializado los trabajadores y el programador.

Con suerte, este código básico le ayudará a localizar el problema.

0
Jacob Barhak 21 jul. 2020 a las 00:00