Executando verificação de segurança...
2

Como eu conectei Oracle ao DuckDB mesmo não tendo extensão específica na documentação

Sou engenheiro de dados e desde o início do DuckDB já implementei em projetos pessoais. Recentemente, comecei a utilizar em produção já que o DW da empresa é em PostgreSQL e o DuckDB se aproveita do mesmo.
Na documentação, as core extensions para banco de dados relacionais são: MySQL, PostgreSQL e SQLite.
Já das extensões da comunidade, há duas para ficar de olho:

Então eu pensei: e se eu implementar eu mesmo algo afim de estudar?

Bibliotecas que utilizei:

  • DuckDB: focado principalmente em OLAP, armazena dados em colunas e é ideal para análise de grande volume de dados em memória.
  • OracleDB: módulo oficial de Oracle para Python desenvolvido pelo time da Oracle.
  • PyArrow: provê uma interface para trabalhar com dados colunares em memória.

O código

oracle_data_loader.py

import oracledb
import getpass
import os
import pyarrow as pa
import duckdb

def sync_connect_to_oracle(): 
    """Conecta ao banco de dados Oracle e retorna o objeto de conexão."""
    un = os.getenv("UN")
    cs = os.getenv("CS") + ":" + os.getenv("PORT") + "/" + os.getenv("SERVICE")
    pw = getpass.getpass(f"Enter password for {un}@{cs}: ")
    return oracledb.connect(user=un, password=pw, dsn=cs)
    
def load_sync_data_to_duckdb(connection, table_name, sql_query):
    """
    Carrega dados do Oracle para o DuckDB em batches usando PyArrow.
    Cria uma tabela no DuckDB com o nome especificado.
    """
    first_batch = True
    with connection.cursor() as cursor:
        for odf in cursor.fetch_df_batches(statement=sql_query, size=100_000):
            arrow_table = pa.table(odf) 

            if first_batch: 
                duckdb.sql(f"CREATE OR REPLACE TABLE {table_name} AS SELECT * FROM arrow_table")
                first_batch = False
            else: 
                duckdb.sql(f"INSERT INTO {table_name} SELECT * FROM arrow_table")

main.py

import duckdb
import oracledb
from oracle_data_loader import connect_to_oracle, load_data_to_duckdb

def main():
    """Função principal para executar o fluxo de trabalho."""
    sql = "SELECT * FROM PRODUTOS"
    try:
        with connect_to_oracle() as connection:
            load_data_to_duckdb(connection, "products", sql)
            
        print(duckdb.sql("SELECT * FROM products"))
    except oracledb.Error as e:
        print(f"Database error: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")

if __name__ == "__main__":
    main()

Processo é simples:

  1. Crio a conexão e a retorno na função sync_connect_to_oracle
  2. Passo a conexão como argumento da função load_sync_data_to_duckdb
  3. Transformo o dataframe retornado pelo método fetch_df_batches em um Arrow Table
  4. Crio uma tabela a partir dessa tabela anterior e insiro as informações
  5. Consulto/processo posteriormente os dados em memória

O que pretendo fazer agora?

  • Vou implementar os métodos assíncronos de DML
  • Vou fazer benchmark com as extensões da comunidade para comparar com minha implementação
Carregando publicação patrocinada...