Actualmente tengo problemas con Joblib al ejecutar un programa en paralelo (multiprocesamiento). Antes logré que funcionara y el tiempo total rondaba 1 minuto; sin embargo, después cambié muchas cosas y rompí algo. Publico el código básico, ya que con él obtengo el mismo error. Estoy tratando de recorrer los 150 símbolos bursátiles y usar Yahoo Finance para obtener la cadena de opciones de cada uno, todo ello en aproximadamente un minuto. También he probado otras bibliotecas, como asyncio, sin éxito. Cualquier recomendación sería muy apreciada.

import asyncio

import pandas as pd
import yfinance as yf


def background(f):
    """Decorate ``f`` so that calling it schedules the call on an executor.

    The decorated function returns immediately with the future produced by
    ``loop.run_in_executor``; the wrapped call itself runs on the event
    loop's default executor (``None`` is passed as the executor).

    Args:
        f: The blocking callable to run in the background.

    Returns:
        A wrapper that accepts the same arguments as ``f`` and returns a
        future for its result.
    """
    import functools

    @functools.wraps(f)
    def wrapped(*args, **kwargs):
        # run_in_executor() only forwards positional arguments, so bind all
        # arguments (keyword ones included) into a partial before submitting.
        call = functools.partial(f, *args, **kwargs)
        return asyncio.get_event_loop().run_in_executor(None, call)

    return wrapped

# Tickers whose download has finished (successfully or not); other code polls
# its length to show progress.
done = []


@background
def downloadChain(ticker, daysOut=100):
    """Download the option chains of ``ticker`` expiring within ``daysOut`` days.

    Because of ``@background``, calling this function schedules the download
    and returns a future; the value described below is the future's result.

    Args:
        ticker: Symbol to query through yfinance.
        daysOut: Only expirations at most this many days from now are fetched.

    Returns:
        A DataFrame with the calls and puts of every expiration that could be
        downloaded, or an empty DataFrame when nothing was retrieved.
        Download errors are reported with ``print`` and never raised, so one
        bad symbol cannot abort a whole batch.
    """
    # Local import: this snippet's import block does not provide ``datetime``.
    from datetime import datetime

    print(ticker)
    frames = []
    try:
        yf_ticker = yf.Ticker(ticker)
        now = datetime.now()
        for expiration_date in yf_ticker.options:
            if (datetime.fromisoformat(expiration_date) - now).days > daysOut:
                continue
            try:
                chain = yf_ticker.option_chain(expiration_date)
            except Exception as e:
                # Best effort: skip this expiration but say why.
                print(f"{ticker}: could not download chain for {expiration_date}: {e!r}")
                continue
            # option_chain() yields separate calls/puts frames; keep both.
            frames.extend([chain.calls, chain.puts])
    except Exception as e:
        print(f"{ticker}: download failed: {e!r}")
    done.append(ticker)
    # DataFrame.append was removed in pandas 2.0; concatenate once at the end.
    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

Función principal:

symbols = ["WATT","TSLA","UVXY","VXX","KEYS","EGO","GLD","WORK","BYND","BLK","PINS","LYFT","SPCE","PAYC","WDAY","UBER","CHGG","SHAK","CMG","CTL","ACB","TLRY","CGC","MJ","ORCL","GRUB","RNG","JWN","TTWO","ADI","ATVI","EA","SNE","GAMR","TXN","TMUS","MCHP","TSM","XBI","ETFC","MS","IWM","EXPD","RCL","CCL","MOMO","BABA","VMW","CRM","ULTA","SKYY","SPLK","FLWS","AVGO","TWTR","PANW","RJF","SABR","LOW","RS","ON","VEEV","DOCU","FB","SNAP","HPQ","RACE","F","AMAT","MRO","STM","AAL","DAL","VICR","XLC","CRON","DELL","T","VZ","S","MELI","CVM","REGN","NVAX","APT","CODX","LAKE","MRNA","EBS","INO", "SPY","SH","QQQ","XLF","KRE","XLV","HYG","LQD","NET","NFLX","ROKU","SHOP","AMZN","AAPL","MSFT","GOOGL","GOOG","NVDA","MU","AMD","INTC","MRVL","QCOMM","SQ","PYPL","TTD","TSLA","ZM","TDOC","LVGO","MDB","HD","VNQ","ARI","ACC","IIPR","EQR","EPR","SPG","PLD","ACB","WHR","NVAX","APT","MDT","CLRX","COST","SDC","LK","PVH","KSS","M","LULU","NKE","KO","BAC","JPM","CS","WFC","ARKW","ARKK","MGM","AMAT","WYNN","TGT","ITT","FXI"]   


# downloadChain is wrapped by @background, so each call returns a future right
# away. Keep the futures so results and exceptions can be retrieved later
# (e.g. ``await asyncio.gather(*futures)``) instead of being dropped.
futures = [downloadChain(ticker) for ticker in symbols]

Agregué un bucle separado para ver el tamaño de la lista `done`, que contiene todos los símbolos que ya se han completado. No estoy seguro de qué cambié, pero ahora este proceso tarda unos 10-15 minutos en completarse cuando lo esperado es 1 minuto.

import time

# Poll the progress counter until every submitted symbol has been recorded.
# Sleeping between polls matters: a tight ``while True`` loop keeps the main
# thread busy and competes with the download threads for CPU time.
while len(done) < len(symbols):
    clear_output(wait=True)
    print(len(done))
    time.sleep(0.5)

# Show the final count once everything has finished.
clear_output(wait=True)
print(len(done))
1
Drew Wood 25 abr. 2020 a las 19:41

2 respuestas

Puede probar un paquete llamado yahooquery. Permite recuperar los datos de la cadena de opciones y hacerlo de forma asíncrona. Puede pasar los 150 símbolos de una sola vez o recorrerlos por lotes:

from yahooquery import Ticker

symbols = ["WATT","TSLA","UVXY","VXX","KEYS","EGO","GLD","WORK","BYND","BLK","PINS","LYFT","SPCE","PAYC","WDAY","UBER","CHGG","SHAK","CMG","CTL","ACB","TLRY","CGC","MJ","ORCL","GRUB","RNG","JWN","TTWO","ADI","ATVI","EA","SNE","GAMR","TXN","TMUS","MCHP","TSM","XBI","ETFC","MS","IWM","EXPD","RCL","CCL","MOMO","BABA","VMW","CRM","ULTA","SKYY","SPLK","FLWS","AVGO","TWTR","PANW","RJF","SABR","LOW","RS","ON","VEEV","DOCU","FB","SNAP","HPQ","RACE","F","AMAT","MRO","STM","AAL","DAL","VICR","XLC","CRON","DELL","T","VZ","S","MELI","CVM","REGN","NVAX","APT","CODX","LAKE","MRNA","EBS","INO", "SPY","SH","QQQ","XLF","KRE","XLV","HYG","LQD","NET","NFLX","ROKU","SHOP","AMZN","AAPL","MSFT","GOOGL","GOOG","NVDA","MU","AMD","INTC","MRVL","QCOMM","SQ","PYPL","TTD","TSLA","ZM","TDOC","LVGO","MDB","HD","VNQ","ARI","ACC","IIPR","EQR","EPR","SPG","PLD","ACB","WHR","NVAX","APT","MDT","CLRX","COST","SDC","LK","PVH","KSS","M","LULU","NKE","KO","BAC","JPM","CS","WFC","ARKW","ARKK","MGM","AMAT","WYNN","TGT","ITT","FXI"]   

import pandas as pd  # needed for pd.concat below; this snippet did not import it

# Option 1: pass every symbol at once (probably want to use a proxy)
ticker = Ticker(symbols, asynchronous=True)
df = ticker.option_chain

# Option 2: query the list in batches of n symbols
ticker = Ticker('aapl', asynchronous=True)  # placeholder symbol; replaced inside the loop
n = 10
dataframes = []
for i in range(0, len(symbols), n):
    batch = symbols[i:i+n]
    ticker.symbols = batch
    chunk = ticker.option_chain
    # NOTE(review): presumably a batch with no data yields something other
    # than a DataFrame; skip it so pd.concat does not fail - confirm.
    if isinstance(chunk, pd.DataFrame):
        dataframes.append(chunk)
    else:
        print(f"No option chain for {batch}: {chunk!r}")
# pd.concat raises ValueError on an empty list, so fall back to an empty frame.
df = pd.concat(dataframes) if dataframes else pd.DataFrame()
0
putty 25 abr. 2020 a las 18:35

Hay dos versiones de la "corrección". Las agrego como respuesta en lugar de usar los comentarios como chat :)

import asyncio
import pandas as pd
import yfinance as yf
from concurrent.futures import ThreadPoolExecutor


def background(f):
    """Decorate ``f`` so that calling it schedules the call on ``executor``.

    The decorated function returns immediately with the future produced by
    ``loop.run_in_executor``. ``executor`` is not a parameter: it is looked
    up as a module-level name each time the wrapper is called, so it must be
    defined before the first call.

    Args:
        f: The blocking callable to run in the background.

    Returns:
        A wrapper that accepts the same arguments as ``f`` and returns a
        future for its result.
    """
    import functools

    @functools.wraps(f)
    def wrapped(*args, **kwargs):
        # run_in_executor() only forwards positional arguments, so bind all
        # arguments (keyword ones included) into a partial before submitting.
        call = functools.partial(f, *args, **kwargs)
        return asyncio.get_event_loop().run_in_executor(executor, call)

    return wrapped

# Tickers whose download has finished (successfully or not).
done = []


@background
def downloadChain(ticker, daysOut=100):
    """Download the option chains of ``ticker`` expiring within ``daysOut`` days.

    Because of ``@background``, calling this function schedules the download
    and returns a future; the value described below is the future's result.

    Args:
        ticker: Symbol to query through yfinance.
        daysOut: Only expirations at most this many days from now are fetched.

    Returns:
        A DataFrame with the calls and puts of every expiration that could be
        downloaded, or an empty DataFrame when nothing was retrieved.
        Download errors are reported with ``print`` and never raised, so one
        bad symbol cannot abort a whole batch.
    """
    # Local import: this snippet's import block does not provide ``datetime``,
    # and the resulting NameError used to be swallowed silently.
    from datetime import datetime

    print(ticker)
    frames = []
    try:
        yf_ticker = yf.Ticker(ticker)
        now = datetime.now()
        for expiration_date in yf_ticker.options:
            if (datetime.fromisoformat(expiration_date) - now).days > daysOut:
                continue
            try:
                chain = yf_ticker.option_chain(expiration_date)
            except Exception as e:
                # Best effort: skip this expiration but say why.
                print(f"{ticker}: could not download chain for {expiration_date}: {e!r}")
                continue
            # option_chain() yields separate calls/puts frames; keep both.
            frames.extend([chain.calls, chain.puts])
    except Exception as e:
        print(f"{ticker}: download failed: {e!r}")
    done.append(ticker)
    # DataFrame.append was removed in pandas 2.0; concatenate once at the end.
    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()


symbols = ["WATT","TSLA","UVXY","VXX","KEYS","EGO","GLD","WORK","BYND","BLK","PINS","LYFT","SPCE","PAYC","WDAY","UBER","CHGG","SHAK","CMG","CTL","ACB","TLRY","CGC","MJ","ORCL","GRUB","RNG","JWN","TTWO","ADI","ATVI","EA","SNE","GAMR","TXN","TMUS","MCHP","TSM","XBI","ETFC","MS","IWM","EXPD","RCL","CCL","MOMO","BABA","VMW","CRM","ULTA","SKYY","SPLK","FLWS","AVGO","TWTR","PANW","RJF","SABR","LOW","RS","ON","VEEV","DOCU","FB","SNAP","HPQ","RACE","F","AMAT","MRO","STM","AAL","DAL","VICR","XLC","CRON","DELL","T","VZ","S","MELI","CVM","REGN","NVAX","APT","CODX","LAKE","MRNA","EBS","INO", "SPY","SH","QQQ","XLF","KRE","XLV","HYG","LQD","NET","NFLX","ROKU","SHOP","AMZN","AAPL","MSFT","GOOGL","GOOG","NVDA","MU","AMD","INTC","MRVL","QCOMM","SQ","PYPL","TTD","TSLA","ZM","TDOC","LVGO","MDB","HD","VNQ","ARI","ACC","IIPR","EQR","EPR","SPG","PLD","ACB","WHR","NVAX","APT","MDT","CLRX","COST","SDC","LK","PVH","KSS","M","LULU","NKE","KO","BAC","JPM","CS","WFC","ARKW","ARKK","MGM","AMAT","WYNN","TGT","ITT","FXI"]   

# `executor` must keep this exact name: the wrapper created by `background`
# looks it up as a module-level global each time a download is submitted.
with ThreadPoolExecutor() as executor:
    for ticker in symbols:
        # Returns right away with a future; the download runs on a pool thread.
        downloadChain(ticker)
# Leaving the `with` block shuts the pool down and waits for pending downloads.

La segunda es más estándar: en ella definimos una corrutina `main` asíncrona y le pedimos a asyncio que la utilice como punto de entrada principal.

import asyncio
import pandas as pd
import yfinance as yf
from concurrent.futures import ProcessPoolExecutor


symbols = ["WATT","TSLA","UVXY","VXX","KEYS","EGO","GLD","WORK","BYND","BLK","PINS","LYFT","SPCE","PAYC","WDAY","UBER","CHGG","SHAK","CMG","CTL","ACB","TLRY","CGC","MJ","ORCL","GRUB","RNG","JWN","TTWO","ADI","ATVI","EA","SNE","GAMR","TXN","TMUS","MCHP","TSM","XBI","ETFC","MS","IWM","EXPD","RCL","CCL","MOMO","BABA","VMW","CRM","ULTA","SKYY","SPLK","FLWS","AVGO","TWTR","PANW","RJF","SABR","LOW","RS","ON","VEEV","DOCU","FB","SNAP","HPQ","RACE","F","AMAT","MRO","STM","AAL","DAL","VICR","XLC","CRON","DELL","T","VZ","S","MELI","CVM","REGN","NVAX","APT","CODX","LAKE","MRNA","EBS","INO", "SPY","SH","QQQ","XLF","KRE","XLV","HYG","LQD","NET","NFLX","ROKU","SHOP","AMZN","AAPL","MSFT","GOOGL","GOOG","NVDA","MU","AMD","INTC","MRVL","QCOMM","SQ","PYPL","TTD","TSLA","ZM","TDOC","LVGO","MDB","HD","VNQ","ARI","ACC","IIPR","EQR","EPR","SPG","PLD","ACB","WHR","NVAX","APT","MDT","CLRX","COST","SDC","LK","PVH","KSS","M","LULU","NKE","KO","BAC","JPM","CS","WFC","ARKW","ARKK","MGM","AMAT","WYNN","TGT","ITT","FXI"]   
# Tickers whose download has finished (successfully or not). When
# downloadChain runs in a worker *process*, the append happens in that
# worker's own copy of this list and never reaches the parent process; rely
# on the function's return value there instead.
done = []


def downloadChain(ticker, daysOut=100):
    """Download the option chains of ``ticker`` expiring within ``daysOut`` days.

    Args:
        ticker: Symbol to query through yfinance.
        daysOut: Only expirations at most this many days from now are fetched.

    Returns:
        A DataFrame with the calls and puts of every expiration that could be
        downloaded, or an empty DataFrame when nothing was retrieved.
        Download errors are reported with ``print`` and never raised, so one
        bad symbol cannot abort a whole batch.
    """
    # Local import: this snippet's import block does not provide ``datetime``,
    # and the resulting NameError used to be swallowed silently.
    from datetime import datetime

    print(ticker)
    frames = []
    try:
        yf_ticker = yf.Ticker(ticker)
        now = datetime.now()
        for expiration_date in yf_ticker.options:
            if (datetime.fromisoformat(expiration_date) - now).days > daysOut:
                continue
            try:
                chain = yf_ticker.option_chain(expiration_date)
            except Exception as e:
                # Best effort: skip this expiration but say why.
                print(f"{ticker}: could not download chain for {expiration_date}: {e!r}")
                continue
            # option_chain() yields separate calls/puts frames; keep both.
            frames.extend([chain.calls, chain.puts])
    except Exception as e:
        print(f"{ticker}: download failed: {e!r}")
    done.append(ticker)
    # DataFrame.append was removed in pandas 2.0; concatenate once at the end.
    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()


async def main():
    """Run ``downloadChain`` for every symbol in a process pool and collect results.

    Returns:
        A dict mapping each symbol to whatever ``downloadChain`` returned for
        it, or to the exception raised while running it.
    """
    loop = asyncio.get_running_loop()
    # The symbol list contains repeats; submit each symbol only once.
    unique_symbols = list(dict.fromkeys(symbols))
    with ProcessPoolExecutor() as executor:
        futures = [
            loop.run_in_executor(executor, downloadChain, ticker)
            for ticker in unique_symbols
        ]
        # Await the futures so results and errors are actually retrieved;
        # return_exceptions keeps one failing symbol from cancelling the rest.
        results = await asyncio.gather(*futures, return_exceptions=True)
    return dict(zip(unique_symbols, results))



# Entry point. The guard matters with ProcessPoolExecutor: worker processes
# may import this module, and they must not start the downloads themselves.
if __name__ == '__main__':
    asyncio.run(main())

Aquí también se tiene un control más fino sobre qué ejecutor usar. Básicamente, indicamos de forma explícita en qué bucle de eventos estamos trabajando y a qué ejecutor le enviamos el trabajo. Las pruebas locales no mostraron grandes diferencias entre ProcessPoolExecutor y ThreadPoolExecutor.

0
edd 25 abr. 2020 a las 18:42