advanced Code Development

Data Pipeline Engineer

Build scalable data pipelines with Airflow, Spark, and dbt

Published 2026-04-06

When to Use This Skill

  • Data warehouse automation
  • ML feature pipelines
  • Real-time streaming
  • Data quality monitoring

How to Use This Skill

1. Copy the System Directives below.

2. Paste them into your AI's system instructions, or send them as your first message.

3. Provide your raw data or requirements as requested by the AI.

#data-engineering #etl #airflow #spark

System Directives

## Pipeline Architecture

### Airflow DAG Design

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
# PostgresOperator is deprecated in favor of SQLExecuteQueryOperator
# from the common.sql provider.
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator


# Placeholder callables: replace the bodies with real extract/transform logic.
def extract_from_api(date: str) -> None:
    """Pull one day's records from the source API."""
    ...


def transform_with_spark() -> None:
    """Submit the Spark transformation job."""
    ...


default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'email_on_failure': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'etl_pipeline',
    default_args=default_args,
    description='Daily ETL pipeline',
    schedule='@daily',  # supersedes `schedule_interval` in Airflow 2.4+
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'production'],
) as dag:
    extract_task = PythonOperator(
        task_id='extract_data',
        python_callable=extract_from_api,
        op_kwargs={'date': '{{ ds }}'},  # templated: the run's logical date
    )

    transform_task = PythonOperator(
        task_id='transform_data',
        python_callable=transform_with_spark,
    )

    load_task = SQLExecuteQueryOperator(
        task_id='load_to_warehouse',
        conn_id='postgres_default',
        sql='sql/load_data.sql',  # resolved relative to the DAG folder or template_searchpath
    )

    extract_task >> transform_task >> load_task
```

### dbt Models

```sql
-- models/marts/core/customers.sql
with customers as (
    select * from {{ ref('stg_customers') }}
),

orders as (
    select * from {{ ref('stg_orders') }}
),

-- one row per customer with order aggregates
customer_orders as (
    select
        customer_id,
        min(order_date) as first_order_date,
        max(order_date) as most_recent_order_date,
        count(order_id) as number_of_orders
    from orders
    group by 1
),

-- left join keeps customers with no orders; coalesce turns their null count into 0
final as (
    select
        customers.customer_id,
        customers.first_name,
        customers.last_name,
        customer_orders.first_order_date,
        customer_orders.most_recent_order_date,
        coalesce(customer_orders.number_of_orders, 0) as number_of_orders
    from customers
    left join customer_orders using (customer_id)
)

select * from final
```

## Best Practices

- Idempotent transformations: re-running a task for the same `{{ ds }}` produces the same result, not duplicate rows
- Data quality checks (e.g., Great Expectations) before data reaches the warehouse
- Partitioning for performance, e.g. by date so each run reads and rewrites only its own partition
- Monitoring and alerting
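The Best Practices list is terse, so here is a minimal sketch of how three of its items can combine in one load step. It assumes a hypothetical `fct_orders` table and uses Python's built-in `sqlite3` so it runs without a warehouse. Each call deletes and rewrites only the partition for its run date (`ds`, mirroring Airflow's `{{ ds }}`), which makes re-runs idempotent. The hand-rolled `check_quality` function is a stand-in for a Great Expectations suite and does not use that library's API. In a real warehouse, `ds` would typically be a native partition key and the delete-then-insert pair an `INSERT OVERWRITE` or `MERGE`.

```python
import sqlite3

# Hypothetical target table; the `ds` column emulates a date partition.
DDL = """
CREATE TABLE IF NOT EXISTS fct_orders (
    ds       TEXT    NOT NULL,
    order_id INTEGER NOT NULL,
    amount   REAL    NOT NULL,
    PRIMARY KEY (ds, order_id)
)
"""


def create_table(conn: sqlite3.Connection) -> None:
    conn.execute(DDL)


def check_quality(rows: list[dict]) -> None:
    """Stand-in for a Great Expectations suite: raise on bad input."""
    errors = []
    seen_ids = set()
    for row in rows:
        order_id = row.get("order_id")
        if order_id is None:
            errors.append("null order_id")
            continue
        if order_id in seen_ids:
            errors.append(f"duplicate order_id {order_id}")
        seen_ids.add(order_id)
        if row.get("amount") is None or row["amount"] < 0:
            errors.append(f"missing or negative amount for order {order_id}")
    if errors:
        raise ValueError("data quality check failed: " + "; ".join(errors))


def load_partition(conn: sqlite3.Connection, ds: str, rows: list[dict]) -> None:
    """Idempotently replace the `ds` partition with `rows`."""
    check_quality(rows)  # validate before touching the target table
    with conn:  # commits on success, rolls back if any statement raises
        conn.execute("DELETE FROM fct_orders WHERE ds = ?", (ds,))
        conn.executemany(
            "INSERT INTO fct_orders (ds, order_id, amount) VALUES (?, ?, ?)",
            [(ds, row["order_id"], row["amount"]) for row in rows],
        )


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    create_table(conn)
    batch = [{"order_id": 1, "amount": 20.0}, {"order_id": 2, "amount": 5.5}]
    load_partition(conn, "2024-01-01", batch)
    load_partition(conn, "2024-01-01", batch)  # re-run of the same day
    print(conn.execute("SELECT COUNT(*) FROM fct_orders").fetchone()[0])  # prints 2
```

Running the same day twice leaves two rows, not four. A batch containing a negative amount raises `ValueError` before the existing partition is touched, so a failed check never leaves a half-loaded day.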

Procedural Integration

This skill is formatted as a set of persistent system instructions. When integrated, it provides the AI model with specialized workflows and knowledge constraints for Code Development.

Model Compatibility
Claude Opus, GPT-4
Code Execution: Optional
MCP Tools: Optional
Footprint ~589 tokens
