Almaceno QuertyText dentro de un marco de datos de pandas. Una vez que haya cargado todas las consultas, quiero volver a realizar un análisis en cada consulta. Actualmente, tengo ~ 50k para evaluar. Entonces, hacerlo uno por uno, llevará mucho tiempo.

Entonces, quería implementar concurrent.futures. ¿Cómo tomo el QueryText individual almacenado en fullAnalysis para pasarlo a concurrent.futures y devolver el resultado como una variable?

Aquí está mi código completo:

import pandas as pd
import time
import gensim
import sys
import warnings

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

fullAnalysis = pd.DataFrame()

def fetch_data(jFile = 'ProcessingDetails.json'):
    print("Fetching data...please wait")

    #read JSON file for latest dictionary file name
    baselineDictionaryFileName = 'Dictionary/Dictionary_05-03-2020.json'

    #copy data to pandas dataframe
    labelled_data = pd.read_json(baselineDictionaryFileName)

    #Add two more columns to get the most similar text and score
    labelled_data['SimilarText'] = ''
    labelled_data['SimilarityScore'] = float()

    print("Data fetched from " + baselineDictionaryFileName + " and there are " + str(labelled_data.shape[0]) + " rows to be evalauted")

    return labelled_data


def calculateScore(inputFunc):
    warnings.filterwarnings("ignore", category=DeprecationWarning) 

    model = gensim.models.Word2Vec.load('w2v_model_bigdata')

    inp = inputFunc
    print(inp)
    out = dict()

    strEvaluation = inp.split("most_similar ",1)[1]

    #while inp != 'quit':
    split_inp = inp.split()

    try:
        if split_inp[0] == 'help':
            pass
        elif split_inp[0] == 'similarity' and len(split_inp) >= 3:
            pass
        elif split_inp[0] == 'most_similar' and len(split_inp) >= 2:
            for pair in model.most_similar(positive=[split_inp[1]]):
                out.update({pair[0]: pair[1]})

    except KeyError as ke:
        #print(str(ke) + "\n")
        inp = input()
    return out

def main():
    with ThreadPoolExecutor(max_workers=5) as executor:
        for i in range(len(fullAnalysis)):
            text = fullAnalysis['QueryText'][i]
            arg = 'most_similar'+ ' ' + text
            #for item in executor.map(calculateScore, arg):
            output = executor.map(calculateScore, arg)

    return output

if __name__ == "__main__":
    fullAnalysis = fetch_data()
    results = main()
    print(f'results: {results}')
0
emie 5 may. 2020 a las 13:13

2 respuestas

La mejor respuesta

El Python Global Interpreter Lock o GIL permite que solo un hilo mantenga el control del intérprete de Python. Dado que su función calculateScore puede estar vinculada a la CPU y requiere que el intérprete ejecute su código de bytes, puede ganar poco al usar subprocesos. Si, por otro lado, realizara principalmente operaciones de E / S, estaría renunciando al GIL durante la mayor parte de su tiempo de ejecución permitiendo que se ejecuten otros subprocesos. Pero ese no parece ser el caso aquí. Probablemente deberías estar usando ProcessPoolExecutor de concurrent.futures (pruébalo en ambos sentidos y mira):

def main():
    with ProcessPoolExecutor(max_workers=None) as executor:
        the_futures = {}
        for i in range(len(fullAnalysis)):
            text = fullAnalysis['QueryText'][i]
            arg = 'most_similar'+ ' ' + text
            future = executor.submit(calculateScore, arg)
            the_futures[future] = i # map future to request
        for future in as_completed(the_futures): # results as they become available not necessarily the order of submission
            i = the_futures[future] # the original index
            result = future.result() # the result

Si omite el parámetro max_workers (o especifica un valor de None) del constructor ProcessPoolExecutor, el valor predeterminado será el número de procesadores que tiene en su máquina (no es un mal valor predeterminado) . No tiene sentido especificar un valor mayor que la cantidad de procesadores que tiene.

Si no necesita vincular el futuro con la solicitud original, entonces the_futures puede ser una lista a la cual, pero lo más simple es ni siquiera molestarse en usar el método as_completed:

def main():
    with ProcessPoolExecutor(max_workers=5) as executor:
        the_futures = []
        for i in range(len(fullAnalysis)):
            text = fullAnalysis['QueryText'][i]
            arg = 'most_similar'+ ' ' + text
            future = executor.submit(calculateScore, arg)
            the_futures.append(future)
        # wait for the completion of all the results and return them all:
        results = [f.result() for f in the_futures()] # results in creation order
        return results 

Cabe mencionar que el código que inicia las funciones ProcessPoolExecutor debe estar en un bloque gobernado por un if __name__ = '__main__':. Si no es así, entrará en un ciclo recursivo con cada subproceso lanzando el ProcessPoolExecutor. Pero ese parece ser el caso aquí. ¿Quizás quiso usar ProcessPoolExecutor todo el tiempo?

Además:

No sé cuál es la línea ...

model = gensim.models.Word2Vec.load('w2v_model_bigdata')

... en la función calculateStore sí. Puede ser la única declaración de enlace de E / S. Pero esto parece ser algo que no varía de una llamada a otra. Si ese es el caso y model no se está modificando en la función, ¿no debería esta declaración salir de la función y calcularse solo una vez? Entonces, esta función se ejecutará claramente más rápido (y estará claramente vinculada a la CPU).

Además:

El bloque de excepción ...

except KeyError as ke:
    #print(str(ke) + "\n")
    inp = input()

... es desconcertante. Está ingresando un valor que nunca se usará justo antes de regresar. Si esto es para pausar la ejecución, no se emite ningún mensaje de error.

0
Booboo 5 may. 2020 a las 11:20

Con la asistencia de Booboo, pude actualizar el código para incluir ProcessPoolExecutor. Aquí está mi código actualizado. En general, el procesamiento se ha acelerado en más del 60%.

Me encontré con un problema de procesamiento y encontré este tema BrokenPoolProcess que aborda el problema.

output = {}
thePool = {}

def main(labelled_data, dictionaryRevised):

    args = sys.argv[1:]

    with ProcessPoolExecutor(max_workers=None) as executor:
        for i in range(len(labelled_data)):
            text = labelled_data['QueryText'][i]
            arg = 'most_similar'+ ' '+ text

            output = winprocess.submit(
            executor, calculateScore, arg
            )
            thePool[output] = i  #original index for future to request


        for output in as_completed(thePool): # results as they become available not necessarily the order of submission
            i = thePool[output] # the original index
            text = labelled_data['QueryText'][i]
            result = output.result() # the result

            maximumKey = max(result.items(), key=operator.itemgetter(1))[0]
            maximumValue = result.get(maximumKey)

            labelled_data['SimilarText'][i] = maximumKey
            labelled_data['SimilarityScore'][i] = maximumValue


    return labelled_data, dictionaryRevised

if __name__ == "__main__":
    start = time.perf_counter()

    print("Starting to evaluate Query Text for labelling...")

    output_Labelled_Data, output_dictionary_revised = preProcessor()

    output,dictionary = main(output_Labelled_Data, output_dictionary_revised)


    finish = time.perf_counter()
    print(f'Finished in {round(finish-start, 2)} second(s)')
0
emie 6 may. 2020 a las 20:46