Como eu conectei Oracle ao DuckDB mesmo não tendo extensão específica na documentação
Sou engenheiro de dados e desde o início do DuckDB já implementei em projetos pessoais. Recentemente, comecei a utilizar em produção já que o DW da empresa é em PostgreSQL e o DuckDB se aproveita do mesmo.
Na documentação, as core extensions para banco de dados relacionais são: MySQL, PostgreSQL e SQLite.
Já das extensões da comunidade, há duas para ficar de olho:
Então eu pensei: e se eu implementar eu mesmo algo afim de estudar?
Bibliotecas que utilizei:
- DuckDB: focado principalmente em OLAP, armazena dados em colunas e é ideal para análise de grande volume de dados em memória.
- OracleDB: módulo oficial de Oracle para Python desenvolvido pelo time da Oracle.
- PyArrow: provê uma interface para trabalhar com dados colunares em memória.
O código
oracle_data_loader.py
import oracledb
import getpass
import os
import pyarrow as pa
import duckdb
def sync_connect_to_oracle():
"""Conecta ao banco de dados Oracle e retorna o objeto de conexão."""
un = os.getenv("UN")
cs = os.getenv("CS") + ":" + os.getenv("PORT") + "/" + os.getenv("SERVICE")
pw = getpass.getpass(f"Enter password for {un}@{cs}: ")
return oracledb.connect(user=un, password=pw, dsn=cs)
def load_sync_data_to_duckdb(connection, table_name, sql_query):
"""
Carrega dados do Oracle para o DuckDB em batches usando PyArrow.
Cria uma tabela no DuckDB com o nome especificado.
"""
first_batch = True
with connection.cursor() as cursor:
for odf in cursor.fetch_df_batches(statement=sql_query, size=100_000):
arrow_table = pa.table(odf)
if first_batch:
duckdb.sql(f"CREATE OR REPLACE TABLE {table_name} AS SELECT * FROM arrow_table")
first_batch = False
else:
duckdb.sql(f"INSERT INTO {table_name} SELECT * FROM arrow_table")
main.py
import duckdb
import oracledb
from oracle_data_loader import connect_to_oracle, load_data_to_duckdb
def main():
"""Função principal para executar o fluxo de trabalho."""
sql = "SELECT * FROM PRODUTOS"
try:
with connect_to_oracle() as connection:
load_data_to_duckdb(connection, "products", sql)
print(duckdb.sql("SELECT * FROM products"))
except oracledb.Error as e:
print(f"Database error: {e}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
if __name__ == "__main__":
main()
Processo é simples:
- Crio a conexão e a retorno na função
sync_connect_to_oracle - Passo a conexão como argumento da função
load_sync_data_to_duckdb - Transformo o dataframe retornado pelo método
fetch_df_batchesem um Arrow Table - Crio uma tabela a partir dessa tabela anterior e insiro as informações
- Consulto/processo posteriormente os dados em memória
O que pretendo fazer agora?
- Vou implementar os métodos assíncronos de DML
- Vou fazer benchmark com as extensões da comunidade para comparar com minha implementação