Base de datos existente y resultado deseado:

Tengo una base de datos SQLite más grande (12 gb, tablas con más de 44 millones de filas) que me gustaría modificar usando Pandas en Python3.

Objetivo de ejemplo : espero leer una de estas tablas grandes (44 millones de filas) en un DF en fragmentos, manipular el fragmento de DF y escribir el resultado en una nueva tabla. Si es posible, me gustaría reemplazar la nueva tabla si existe, y agregarle cada fragmento.

Debido a que mis manipulaciones solo agregan o modifican columnas, la nueva tabla debe tener el mismo número de filas que la tabla original.

Problemas:

El problema principal parece provenir de la siguiente línea en el siguiente código:

df.to_sql(new_table, con=db, if_exists = "append", index=False)

  1. Cuando esta línea se ejecuta en el código siguiente, parece que constantemente obtengo un fragmento adicional de tamaño = N, más una observación de lo que esperaba.
  2. La primera vez que este código se ejecuta con un nuevo nombre de tabla, aparece un error:
 Traceback (most recent call last):
  File "example.py", line 23, in <module>
    for df in df_generator:
  File "/usr/local/lib/python3.5/site-packages/pandas/io/sql.py", line 1420, in _query_iterator
    data = cursor.fetchmany(chunksize)
sqlite3.OperationalError: SQL logic error or missing database
  1. Si luego vuelvo a ejecutar el script, con el mismo nuevo nombre de tabla, se ejecuta para cada fragmento y un fragmento adicional, +1 fila.

  2. Cuando se comenta la línea df.to_sql(), el bucle se ejecuta para el número esperado de fragmentos.

Ejemplo de prueba de problema con código completo:

Código completo: ejemplo.py

import pandas as pd
import sqlite3

#Helper Functions Used in Example
def ren(invar, outvar, df):
    df.rename(columns={invar:outvar}, inplace=True)
    return(df)

def count_result(c, table):
    ([print("[*] total: {:,} rows in {} table"
        .format(r[0], table)) 
        for r in c.execute("SELECT COUNT(*) FROM {};".format(table))])


#Connect to Data
db = sqlite3.connect("test.db")
c = db.cursor()
new_table = "new_table"

#Load Data in Chunks
df_generator = pd.read_sql_query("select * from test_table limit 10000;", con=db, chunksize = 5000)

for df in df_generator:
    #Functions to modify data, example
    df = ren("name", "renamed_name", df)
    print(df.shape)
    df.to_sql(new_table, con=db, if_exists = "append", index=False)


#Count if new table is created
try:
    count_result(c, new_table)
except:
    pass

1. Resultado cuando #df.to_sql(new_table, con=db, if_exists = "append", index=False)

(la línea del problema está comentada):

$ python3 example.py 
(5000, 22)
(5000, 22)

Lo que espero ya que el código de ejemplo limita mi tabla grande a 10k filas.

2. Resultado cuando df.to_sql(new_table, con=db, if_exists = "append", index=False)

a. la línea del problema no está comentada

si. esta es la primera vez el código se ejecuta con una nueva_tabla:

$ python3 example.py 
(5000, 22)
Traceback (most recent call last):
  File "example.py", line 23, in <module>
    for df in df_generator:
  File "/usr/local/lib/python3.5/site-packages/pandas/io/sql.py", line 1420, in _query_iterator
    data = cursor.fetchmany(chunksize)
sqlite3.OperationalError: SQL logic error or missing database

3. Resultado cuando df.to_sql(new_table, con=db, if_exists = "append", index=False)

a. la línea del problema no está comentada

si. el código anterior se ejecuta segunda vez con new_table:

$ python3 example.py 
(5000, 22)
(5000, 22)
(5000, 22)
(1, 22)
[*] total: 20,001 rows in new_table table

Por lo tanto, tengo el problema de que primero se rompe el código cuando se ejecuta la primera vez (Resultado 2) y segundo, el número total de filas cuando se ejecuta la segunda vez (Resultado 3) es más del doble de lo que esperaba.

Cualquier sugerencia sobre cómo puedo resolver este problema sería muy apreciada.

1
data-cruncher524 1 mar. 2018 a las 01:32

3 respuestas

La mejor respuesta

Puede intentar especificar:

db = sqlite3.connect("test.db", isolation_level=None)
#  ---->                        ^^^^^^^^^^^^^^^^^^^^

Además, puede intentar aumentar su tamaño de fragmento, porque de lo contrario el tiempo entre confirmaciones es demasiado corto para SQLite DB, eso está causando este error, supongo ... También recomendaría usar PostgreSQL, MySQL / MariaDB o algo similar. son mucho más confiables y apropiados para tal tamaño de DB ...

1
MaxU 28 feb. 2018 a las 23:21

Retraso de tiempo en la solución anterior

La solución de @ MaxU agregando isolation_level=None a la conexión de la base de datos es breve y agradable. Sin embargo, por cualquier motivo, ralentizó la escritura / confirmación de cada fragmento en la base de datos de manera espectacular. Por ejemplo, cuando probé la solución en una tabla de 12 millones de filas, el código tardó más de 6 horas en completarse. Por el contrario, construir la tabla original a partir de varios archivos de texto tomó unos minutos.

Esta idea condujo a una solución más rápida pero menos elegante, que tardó menos de 7 minutos en completarse en una tabla de 12 millones de filas frente a más de 6 horas. Las filas de salida coincidieron con las filas de entrada, resolviendo el problema en mi pregunta original.

Solución más rápida pero menos elegante

Desde la construcción de la tabla original a partir de archivos de texto / archivos csv y el uso de scripts SQL para cargar los datos, combiné ese enfoque con las capacidades de Panda. Los pasos básicos esenciales son los siguientes:

  1. Conéctate a la base de datos
  2. Use un script SQL para crear una nueva tabla (las columnas y el orden deben coincidir con lo que haga con los pandas df)
  3. Lee la tabla masiva en trozos
  4. Para cada fragmento, modifique el df como desee, escriba en csv, cargue csv usando sql y confirme el cambio.

Código principal de solución:

import pandas as pd
import sqlite3

#Note I Used Functions I Wrote in build_db.py
#(shown below after example solution)
from build_db import *


#Helper Functions Used in Example
def lower_var(var, df):
    s = df[var].str.lower()
    df = df.drop(var, axis=1)
    df = pd.concat([df, s], axis=1)
    return(df)


#Connect to Data
db = sqlite3.connect("test.db")
c = db.cursor()

#create statement
create_table(c, "create_test.sql", path='sql_clean/')

#Load Data in Chunks
df_generator = pd.read_sql_query("select * from example_table;", con=db, chunksize = 100000)

for df in df_generator:
    #functions to modify data, example
    df = lower_var("name", df) #changes column order

    #restore df to column order in sql table
    db_order = ["cmte_id", "amndt_ind", "rpt_tp", "transaction_pgi", "image_num", "transaction_tp", \
        "entity_tp", "name", "city", "state", "zip_code", "employer", "occupation", "transaction_dt", \
        "transaction_amt", "other_id", "tran_id", "file_num", "memo_cd", "memo_text", "sub_id"]
    df = df[db_order]

    #write chunk to csv
    file = "df_chunk.csv"
    df.to_csv(file, sep='|', header=None, index=False)

    #insert chunk csv to db
    insert_file_into_table(c, "insert_test.sql", file, '|', path='sql_clean/')
    db.commit()


#Count results
count_result(c, "test_indiv")

Funciones de usuario importadas para el código anterior

#Relavant Functions in build_db.py

def count_result(c, table):
    ([print("[*] total: {:,} rows in {} table"
        .format(r[0], table)) 
        for r in c.execute("SELECT COUNT(*) FROM {};".format(table))])

def create_table(cursor, sql_script, path='sql/'):
    print("[*] create table with {}{}".format(path, sql_script))
    qry = open("{}{}".format(path, sql_script), 'rU').read()
    cursor.executescript(qry)


def insert_file_into_table(cursor, sql_script, file, sep=',', path='sql/'):
    print("[*] inserting {} into table with {}{}".format(file, path, sql_script))
    qry = open("{}{}".format(path, sql_script), 'rU').read()
    fileObj = open(file, 'rU', encoding='latin-1')
    csvReader = csv.reader(fileObj, delimiter=sep, quotechar='"')

    try:
        for row in csvReader:
            try:
                cursor.execute(qry, row)
            except sqlite3.IntegrityError as e:
                pass

    except Exception as e:
        print("[*] error while processing file: {}, error code: {}".format(file, e))
        print("[*] sed replacing null bytes in file: {}".format(file))
        sed_replace_null(file, "clean_null.sh")
        subprocess.call("bash clean_null.sh", shell=True)

        try:
            print("[*] inserting {} into table with {}{}".format(file, path, sql_script))
            fileObj = open(file, 'rU', encoding='latin-1')
            csvReader = csv.reader(fileObj, delimiter=sep, quotechar='"')
            for row in csvReader:
                try:
                    cursor.execute(qry, row)
                except sqlite3.IntegrityError as e:
                    pass
                    print(e)    

        except Exception as e:
            print("[*] error while processing file: {}, error code: {}".format(file, e))

Scripts de usuario SQL

--create_test.sql

DROP TABLE if exists test_indiv;

CREATE TABLE test_indiv (
    cmte_id TEXT NOT NULL,
    amndt_ind TEXT,
    rpt_tp TEXT,
    transaction_pgi TEXT,
    image_num TEXT,
    transaction_tp TEXT,
    entity_tp TEXT,
    name TEXT,
    city TEXT,
    state TEXT,
    zip_code TEXT,
    employer TEXT,
    occupation TEXT,
    transaction_dt TEXT,
    transaction_amt TEXT,
    other_id TEXT,
    tran_id TEXT,
    file_num NUMERIC,
    memo_cd TEXT,
    memo_text TEXT,
    sub_id NUMERIC NOT NULL
);

CREATE UNIQUE INDEX idx_test_indiv ON test_indiv (sub_id);
--insert_test.sql

INSERT INTO test_indiv (
    cmte_id,
    amndt_ind,
    rpt_tp,
    transaction_pgi,
    image_num,
    transaction_tp,
    entity_tp,
    name,
    city,
    state,
    zip_code,
    employer,
    occupation,
    transaction_dt,
    transaction_amt,
    other_id,
    tran_id,
    file_num,
    memo_cd,
    memo_text,
    sub_id
    ) 
VALUES (
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?
);
1
data-cruncher524 1 mar. 2018 a las 19:41

Experimentó exactamente el mismo problema (que trata con datos> 30 GB). Así es como abordé el problema: en lugar de usar la función Chunk de read_sql. Decidí crear un trozo de fragmentos manual así:

chunksize=chunk_size
offset=0
for _ in range(0, a_big_number):
    query = "SELECT * FROM the_table %s offset %s" %(chunksize, offset)
    df = pd.read_sql(query, conn)
    if len(df)!=0:
        ....
    else:
        break
0
Greg 17 ago. 2018 a las 16:04