Durante un par de días he estado luchando sobre cómo optimizar (no solo hacer que se vea mejor) los 3 bucles anidados que contienen un condicional y una llamada de función dentro. Lo que tengo ahora es lo siguiente:

def build_prolongation_operator(p,qs):
    '''
    p: dimension of the coarse basis
    q: dimension of the fine basis

    The prolongation operator describes the relationship between
    the coarse and fine bases:    
    V_coarse = np.dot(V_fine, I)
    '''

    q = sum(qs)

    I = np.zeros([q, p])

    for i in range(0, q):
        for j in range(0, p):
            for k in range(0, qs[j]):
                # if BV i is a child of j, we set I[i, j] = 1
                if i == f_map(j, k, qs):
                    I[i, j] = 1
                    break

    return I

Donde f_map es:

def f_map(i, j, q):
    '''
    Mapping which returns the index k of the fine basis vector which
    corresponds to the jth child of the ith coarse basis vector.    
    '''

    if j < 0 or j > q[i]:
        print('ERROR in f_map')
        return None

    result = j

    for k in range(0, i):
        result += q[k]

    return result

Al perfilar todo mi código, obtengo que build_prolongation_operator se llama 45 veces y f_map ¡aproximadamente 8,5 millones de veces!

Aquí está la foto:

picture

He tratado de hacer lo mismo con la comprensión de la lista y un mapa, pero sin suerte.

Aquí hay una muestra de las entradas que build_prolongation_operator espera:

p = 10
qs = randint(3, size=p)
2
Guillermo Alejandro Barraza Mo 13 sep. 2018 a las 13:26

4 respuestas

La mejor respuesta

Por un lado, realmente no necesita p como parámetro para su función: len(qs) solo necesita ser llamado una vez, y es extremadamente barato. Si su entrada siempre es una matriz numpy (y bajo las circunstancias no hay razón para que no lo sea), qs.size también lo hará.

Comencemos reescribiendo f_map. El ciclo allí es solo la suma acumulativa de qs (pero comenzando con cero), que puede calcular previamente una vez (o al menos una vez por llamada a la función externa).

def f_map(i, j, cumsum_q):
    return j + cumsum_q[i]

Donde cumsum_q se definiría en build_prolongation_operator como

cumsum_q = np.roll(np.cumsum(qs), 1)
cumsum_q[0] = 0

Estoy seguro de que podría apreciar la utilidad de tener el mismo conjunto de nombres de variables dentro de f_map que tiene en build_prolongation_operator. Para hacerlo aún más fácil, podemos eliminar f_map por completo y usar la expresión que representa en su condición:

if i == k + cumsum_q[j]:
    I[i, j] = 1

El bucle sobre k significa "si i es k + cumsum[j] para any k", establezca el elemento en 1. Si reescribimos la condición como i - cumsum_q[j] == k, puede ver que no necesitamos un bucle sobre k en absoluto. i - cumsum_q[j] será igual a algunos k en el rango [0, qs[j]) si no es negativo y estrictamente menor que qs[j]. Solo puedes verificar

if i >= cumsum_q[j] and i - cumsum_q[j] < qs[j]:
    I[i, j] = 1

Esto reduce su ciclo a una iteración por elemento de la matriz. No puedes hacerlo mejor que eso:

def build_prolongation_operator_optimized(qs):
    '''
    The prolongation operator describes the relationship between
    the coarse and fine bases:    
    V_coarse = np.dot(V_fine, I)
    '''
    qs = np.asanyarray(qs)
    p = qs.size
    cumsum_q = np.roll(np.cumsum(qs), 1)
    q = cumsum_q[0]
    cumsum_q[0] = 0

    I = np.zeros([q, p])

    for i in range(0, q):
        for j in range(0, p):
            # if BV i is a child of j, we set I[i, j] = 1
            if 0 <= i - cumsum_q[j] < qs[j]:
                I[i, j] = 1
    return I

Ahora que ya sabe la fórmula para cada celda, puede hacer que Numpy calcule la matriz completa por usted esencialmente en una línea usando la transmisión:

def build_prolongation_operator_numpy(qs):
    qs = np.asanyarray(qs)
    cumsum_q = np.roll(np.cumsum(qs), 1)
    q = cumsum_q[0]
    cumsum_q[0] = 0
    i_ = np.arange(q).reshape(-1, 1)  # Make this a column vector
    return (i_ >= cumsum_q) & (i_ - cumsum_q < qs)

Ejecuté una pequeña demostración para asegurar que (A) Las soluciones propuestas obtengan el mismo resultado que su original, y (B) funcionen más rápido:

In [1]: p = 10
In [2]: q = np.random.randint(3, size=p)

In [3]: ops = (
...     build_prolongation_operator(p, qs),
...     build_prolongation_operator_optimized(qs),
...     build_prolongation_operator_numpy(qs),
...     build_prolongation_operator_RaunaqJain(p, qs),
...     build_prolongation_operator_gboffi(p, qs),
... )

In [4]: np.array([[(op1 == op2).all() for op1 in ops] for op2 in ops])
Out[4]: 
array([[ True,  True,  True,  True,  True],
       [ True,  True,  True,  True,  True],
       [ True,  True,  True,  True,  True],
       [ True,  True,  True,  True,  True],
       [ True,  True,  True,  True,  True]])

In [5]: %timeit build_prolongation_operator(p, qs)
321 µs ± 890 ns per loop (mean ± std. dev. of 7 runs, 1000 loops each)
In [6]: %timeit build_prolongation_operator_optimized(qs)
75.1 µs ± 1.85 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
In [7]: %timeit build_prolongation_operator_numpy(qs)
24.8 µs ± 77.7 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
In [8]: %timeit build_prolongation_operator_RaunaqJain(p, qs)
28.5 µs ± 1.55 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
In [9]: %timeit build_prolongation_operator_gboffi(p, qs)
31.8 µs ± 772 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
In [10]: %timeit build_prolongation_operator_gboffi2(p, qs)
26.6 µs ± 768 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

Como puede ver, la opción más rápida es la completamente vectorizada, pero @ RaunaqJain's y @gboffi las opciones están muy cerca en segundo lugar.

Nota

Mi solución vectorizada crea una matriz booleana. Si no desea eso, use I.astype(...) para convertir al tipo de letra deseado, o véalo como una matriz np.uint8: I.view(dtype=np.uint8).

1
Mad Physicist 13 sep. 2018 a las 14:12

No sé sobre bases y operadores de prolongación, pero debes enfocarte en el algoritmo mismo. Esto casi siempre es un buen consejo en lo que respecta a la optimización.

Probablemente este sea el quid, y si no, es algo para comenzar: el cálculo f_map no depende de i, pero lo estás volviendo a calcular para cada valor de i. Dado que i varía de cero a la suma de los valores en qs, guardará una cantidad enorme de recálculo almacenando en caché los resultados; google "python memoize" y prácticamente se escribirá solo. Solucione esto y probablemente haya terminado, sin micro optimizaciones.

Necesitará suficiente espacio para almacenar los valores de max(p) * max(qs[j]), pero a partir de la cantidad de llamadas que informe, esto no debería ser un gran obstáculo.

3
Mad Physicist 13 sep. 2018 a las 12:10

Intenta comprobar si esto funciona,

for j in range(0,p):
    for k in range(0, qs[j]):
        # if BV i is a child of j, we set I[i,j] = 1
        val = f_map(j,k,qs)
        if I[val, j] == 0:
            I[val, j] = 1
1
Mad Physicist 13 sep. 2018 a las 14:01

Aquí está el ciclo optimizado propuesto por Raunaq Jain en su respuesta

for j in range(0,p):
    for k in range(0, qs[j]):
        # if BV i is a child of j, we set I[i,j] = 1
            val = f_map(j,k,qs)
            if I[val, j] == 0:
                I[val, j] = 1

Y aquí está la función f_map, donde he editado los nombres de los argumentos para reflejar los nombres utilizados por la persona que llama

def f_map(j,k,qs):
    if k < 0 or k > qs[j]:
        print('ERROR in f_map')
        return None
    result = k
    for i in range(0, j):
        result += qs[i]
    return result

Comenzamos señalando que siempre es 0 ≤ k < qs[j], debido a la definición del bucle en k, para que podamos con seguridad eliminar el control de cordura y escribir

def f_map(j,k,qs):
    result = k
    for i in range(0, j):
        result += q[i]
    return result

Ahora, esta es la cadena de documentos del sum incorporado

Firma: suma (iterable, inicio = 0, /)
Docstring:
Devuelve la suma de un valor 'inicial' (predeterminado: 0) más un número iterable de números

Cuando el iterable está vacío, devuelve el valor inicial.
Esta función está diseñada específicamente para su uso con valores numéricos y puede rechazar tipos no numéricos.
Tipo: builtin_function_or_method

Es evidente que podemos escribir

def f_map(j,k,qs):
    return sum(qs[:j], k)

Y es evidente también que podemos hacer sin la llamada a la función

for j in range(0,p):
    for k in range(0, qs[j]):
        # if BV i is a child of j, we set I[i,j] = 1
            val = sum(qs[:j], k)
            if I[val, j] == 0:
                I[val, j] = 1

Llamar a una función integrada debería ser más eficiente que una llamada de función y un bucle, ¿no?


Abordar el comentario del físico loco

Podemos calcular previamente las sumas parciales de qs para acelerar aún más

sqs = [sum(qs[:i]) for i in range(len(qs))] # there are faster ways...
...
for j in range(0,p):
    for k in range(0, qs[j]):
        # if BV i is a child of j, we set I[i,j] = 1
            val = k+sqs[j]
            if I[val, j] == 0:
                I[val, j] = 1
1
Mad Physicist 13 sep. 2018 a las 14:06