.internal-skills/data-engineer/SKILL.md
Engenheiro de Dados. Use para: - Pipeline de dados (ETL/ELT) - Data warehousing - Integrações de dados - Data quality - Arquitetura de dados
npx skillsauth add suportebahia/equipe-devs Equipe SBahia - Data EngineerInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
# Airflow DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'etl_orders',
default_args=default_args,
schedule_interval='0 2 * * *',
start_date=datetime(2024, 1, 1),
)
def extract_orders(**context):
from src.extractors import orders_api
data = orders_api.get_orders(
since=context['execution_date']
)
return data
def transform_orders(**context):
import pandas as pd
ti = context['ti']
data = ti.xcom_pull(task_ids='extract')
df = pd.DataFrame(data)
df['total'] = df['items'].apply(lambda x: sum(i['price'] * i['qty'] for i in x))
df['order_date'] = pd.to_datetime(df['created_at'])
df['month'] = df['order_date'].dt.to_period('M')
return df.to_json()
def load_to_warehouse(**context):
import pandas as pd
from sqlalchemy import create_engine
ti = context['ti']
df = pd.read_json(ti.xcom_pull(task_ids='transform'))
engine = create_engine(os.environ['WAREHOUSE_URL'])
df.to_sql('fact_orders', engine, if_exists='append', index=False)
extract = PythonOperator(task_id='extract', python_callable=extract_orders, dag=dag)
transform = PythonOperator(task_id='transform', python_callable=transform_orders, dag=dag)
load = PythonOperator(task_id='load', python_callable=load_to_warehouse, dag=dag)
extract >> transform >> load
# dbt/models/marts/fact_orders.sql
{{ config(materialized='incremental', unique_key='order_id') }}
SELECT
o.order_id,
o.customer_id,
o.status,
o.created_at,
o.total_amount,
o.items_count,
c.customer_name,
c.segment,
c.region,
p.payment_method,
ROW_NUMBER() OVER (PARTITION BY o.customer_id ORDER BY o.created_at DESC) as customer_order_seq
FROM {{ source('staging', 'raw_orders') }} o
JOIN {{ ref('dim_customer') }} c ON o.customer_id = c.customer_id
JOIN {{ ref('dim_payment') }} p ON o.order_id = p.order_id
{% if is_incremental() %}
WHERE o.created_at > (SELECT MAX(created_at) FROM {{ this }})
{% endif %}
# Sensors e Operators
from airflow.sensors.sql_sensor import SqlSensor
from airflow.operators.dummy import DummyOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
# Wait for source data
wait_orders = SqlSensor(
task_id='wait_orders',
conn_id='source_db',
sql="SELECT COUNT(*) FROM orders WHERE processed = 0",
poke_interval=60,
)
# Load to staging
load_staging = PostgresOperator(
task_id='load_staging',
postgres_conn_id='warehouse',
sql='sql/staging/load_orders.sql',
)
# Data quality check
check_quality = PostgresOperator(
task_id='check_quality',
postgres_conn_id='warehouse',
sql="""
SELECT COUNT(*) FROM stg_orders
WHERE order_total < 0
""",
)
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
spark = SparkSession.builder \
.appName('orders_etl') \
.config('spark.sql.shuffle.partitions', 8) \
.getOrCreate()
# Read from Kafka
orders = spark.readStream \
.format('kafka') \
.option('kafka.bootstrap.servers', 'broker:9092') \
.option('subscribe', 'orders') \
.load()
# Transform
orders_parsed = orders.select(
from_json(col('value').cast('string'), schema).alias('data')
).select('data.*')
# Aggregations
daily_orders = orders_parsed \
.groupBy(window('created_at', '1 hour'), 'status') \
.agg(
count('order_id').alias('order_count'),
sum('total').alias('revenue')
)
# Write to Delta Lake
daily_orders.writeStream \
.format('delta') \
.outputMode('complete') \
.option('checkpointLocation', '/delta/checkpoints') \
.table('analytics.daily_orders')
┌─────────────┐
│ dim_date │
└──────┬──────┘
│
┌──────┴──────┐ ┌─────────────┐
│ fact_orders │────<│ dim_product │
└──────┬──────┘ └─────────────┘
│
┌──────┴──────┐ ┌─────────────┐
│ dim_customer│────<│ dim_seller │
└─────────────┘ └─────────────┘
| Tipo | Descrição | Uso | |------|-----------|-----| | Fact | Eventos transacionais | Vendas, pedidos, cliques | | Dim | Dados descritivos | Clientes, produtos | | Bridge | Many-to-many | Categorias, tags | | Accumulating | Status changing | Pedidos com timeline |
# Great Expectations
import great_expectations as ge
df = ge.read_csv('orders.csv')
# Expectations
df.expect_column_values_to_be_between('total', min_value=0)
df.expect_column_values_to_be_unique('order_id')
df.expect_column_values_to_not_be_null('customer_id')
df.expect_column_values_to_be_in_set('status', ['pending', 'completed', 'cancelled'])
# Run and save
results = df.validate()
results.save_expectation_suite('orders_expectations.json')
| Tipo | Ferramenta | |------|------------| | Orquestração | Airflow, Dagster, Prefect | | Transformação | dbt, Spark, Databricks | | Warehouse | BigQuery, Snowflake, Redshift, PostgreSQL | | Streaming | Kafka, Flink, Spark Streaming | | Data Lake | S3, GCS, Delta Lake | | Quality | Great Expectations, dbt tests | | Orchestration | Airbyte, Fivetran, Singer |
testing
Sistema de agentes IA para coordenação de projetos de desenvolvimento. Use este skill para iniciar qualquer projeto. Este skill orquestra automaticamente os agentes especializados conforme a necessidade: - Análise e planejamento de projetos - Coordenação de múltiplos agentes - Gestão de tasks e dependências
development
Orquestrador principal do ecossistema de agentes IA Equipe SBahia. Use para: - Coordenar projetos de desenvolvimento web - Alocar agentes especializados - Gerenciar workflow completo - Garantir padrões MVC e de mercado Agents disponíveis: leadership-tech, uxui-designer, frontend-developer, backend-controller, backend-model, dba-specialist, security-specialist, api-gateway-specialist, mobile-developer, data-engineer, elastic-engineer, machine-learning-engineer, testing-specialist, error-handling-specialist, product-owner, devops-engineer, solutions-engineer
testing
Skill para Designer UX/UI. Use para: - Criar experiência do usuário - Desenvolver interfaces visuais - Definir design system - Validar usabilidade
testing
Especialista em QA/Testes automatizados. Use para: - Criar estratégia de testes completa - Implementar testes unitários, integração e E2E - TDD/BDD quando aplicável - Coverage analysis - Testes de performance e carga