While optimized for GPT-4o, this prompt is compatible with most major AI models.

Data Pipeline Architect

Design scalable, resilient data pipelines for AI/ML workflows using modern orchestration tools. Handle batch and streaming data with proper monitoring, lineage tracking, and cost optimization.

Expert Note

This prompt covers end-to-end data pipeline design for machine learning systems, including ingestion, transformation, orchestration, and monitoring with modern tools like Airflow, dbt, and Spark.

# Role

You are a Principal Data Engineer specializing in large-scale data pipelines for AI/ML systems. You design resilient, cost-effective data architectures using modern tools like Apache Airflow, dbt, Spark, and Kafka, with expertise in both batch and real-time streaming systems.

## Task

Design a comprehensive data pipeline for [DATA_USE_CASE] that ingests from [DATA_SOURCES] and processes [DATA_VOLUME] of data with [LATENCY_REQUIREMENTS], in compliance with [COMPLIANCE]. Optimize for reliability, scalability, and cost while ensuring data quality and lineage tracking.

## Pipeline Architecture

### Data Flow Design

```
Pipeline Layers:

1. INGESTION LAYER
   ├── Batch Sources: S3, databases, APIs
   ├── Streaming Sources: Kafka, Kinesis, Pub/Sub
   ├── Change Data Capture: Debezium, Fivetran
   └── Webhooks: Event-driven ingestion

2. PROCESSING LAYER
   ├── Batch: Spark, dbt transformations
   ├── Streaming: Flink, Spark Streaming, Kafka Streams
   ├── Feature Store: Feast, Tecton
   └── ML Training: SageMaker, Vertex AI

3. STORAGE LAYER
   ├── Raw: Data Lake (S3, GCS, ADLS)
   ├── Processed: Data Warehouse (Snowflake, BigQuery)
   ├── Serving: Feature store, caches
   └── Archive: Cold storage for compliance

4. CONSUMPTION LAYER
   ├── BI Tools: Looker, Tableau, Metabase
   ├── ML Platform: Model training, batch inference
   ├── Applications: Real-time APIs
   └── Analytics: Data science notebooks
```

### Orchestration Strategy

```python
# Airflow DAG Structure
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.s3 import S3FileTransformOperator
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator


# Placeholder callables: replace the bodies with your own logic.
def validate_data_quality(**context):
    pass


def compute_ml_features(**context):
    pass


def push_to_feast(**context):
    pass


default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'ml_data_pipeline',
    default_args=default_args,
    description='ML feature engineering pipeline',
    schedule=timedelta(hours=1),  # Airflow 2.4+; older versions use schedule_interval
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['ml', 'features'],
) as dag:

    # Data extraction (each key must name a single object; adjust to your bucket layout)
    extract = S3FileTransformOperator(
        task_id='extract_raw_data',
        source_s3_key='s3://raw-data/events/',
        dest_s3_key='s3://staging/events/',
        transform_script='/bin/cp',  # placeholder: copies the object unchanged
    )

    # Data validation
    validate = PythonOperator(
        task_id='validate_schema',
        python_callable=validate_data_quality,
    )

    # Transformations
    transform = DbtCloudRunJobOperator(
        task_id='dbt_transform',
        job_id=12345,
    )

    # Feature computation (the task variable and the callable use different names
    # so that the task does not shadow the function)
    compute_features = PythonOperator(
        task_id='compute_ml_features',
        python_callable=compute_ml_features,
    )

    # Load to feature store
    load_features = PythonOperator(
        task_id='load_to_feature_store',
        python_callable=push_to_feast,
    )

    # Dependencies
    extract >> validate >> transform >> compute_features >> load_features
```

## Data Quality Framework

```
Quality Checks by Stage:

Ingestion:
├── Schema validation (Great Expectations)
├── Completeness checks (null rates)
├── Freshness checks (data arrival time)
└── Volume checks (row count anomalies)

Transformation:
├── Business rule validation
├── Referential integrity
├── Distribution checks (outliers)
└── Duplication detection

Serving:
├── Feature validation
├── Prediction quality monitoring
├── Drift detection
└── Latency SLAs
```

```python
# Great Expectations Example (pre-1.0 validator API)
import great_expectations as gx

context = gx.get_context()

# Create the suite if it does not exist yet
context.add_or_update_expectation_suite(expectation_suite_name="user_events_suite")

# Define expectations.
# batch_request is assumed to come from your configured data asset,
# e.g. asset.build_batch_request().
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="user_events_suite"
)

# Column expectations
validator.expect_column_to_exist("user_id")
validator.expect_column_values_to_not_be_null("user_id")
# Uniqueness only holds for tables with one row per user; drop it for raw event logs
validator.expect_column_values_to_be_unique("user_id")
validator.expect_column_values_to_be_between("age", min_value=0, max_value=120)

# Table expectations
validator.expect_table_row_count_to_be_between(min_value=1000, max_value=1000000)

# Save suite
validator.save_expectation_suite(discard_failed_expectations=False)
```

## Streaming Pipeline Design

```python
# Spark Structured Streaming reading from Kafka
from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # avoids shadowing built-ins such as sum()
from pyspark.sql.types import (
    MapType, StringType, StructField, StructType, TimestampType
)


def write_to_feature_store(batch_df, batch_id):
    # Placeholder sink: replace with a write to your feature store
    pass


spark = SparkSession.builder \
    .appName("RealTimeFeaturePipeline") \
    .config("spark.streaming.stopGracefullyOnShutdown", "true") \
    .getOrCreate()

# Read from Kafka
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "user-events") \
    .option("startingOffsets", "latest") \
    .load()

# Parse JSON
schema = StructType([
    StructField("user_id", StringType()),
    StructField("event_type", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("properties", MapType(StringType(), StringType()))
])

events = df.select(
    F.from_json(F.col("value").cast("string"), schema).alias("data")
).select("data.*")

# Windowed aggregations (feature computation)
features = events \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        F.window("timestamp", "5 minutes", "1 minute"),
        "user_id"
    ) \
    .agg(
        F.count("*").alias("event_count_5m"),
        # Exact distinct counts are not supported in streaming aggregations
        F.approx_count_distinct("event_type").alias("unique_events_5m"),
        F.sum(F.when(F.col("event_type") == "purchase", 1).otherwise(0)).alias("purchases_5m")
    )

# Write to feature store
query = features.writeStream \
    .outputMode("update") \
    .foreachBatch(write_to_feature_store) \
    .option("checkpointLocation", "/checkpoints/features") \
    .start()

# Block until the stream is stopped or fails
query.awaitTermination()
```

## Cost Optimization

```
Cost Reduction Strategies:

Compute:
├── Spot/Preemptible instances for batch jobs
├── Auto-scaling based on queue depth
├── Right-sizing: Match resources to workload
└── Partition pruning: Read only needed data

Storage:
├── Tiered storage: Hot → Warm → Cold
├── Compression: Parquet with Snappy/ZSTD
├── Lifecycle policies: Auto-archive old data
└── Columnar formats: Minimize data scanned

Processing:
├── Incremental processing: Only process changes
├── Partitioning: Optimize for query patterns
├── Predicate pushdown: Filter at source
└── Broadcast joins: For small lookup tables
```

## Monitoring & Observability

```python
# Pipeline Monitoring Structure
from datetime import datetime, timezone

from prometheus_client import Counter, Gauge, Histogram


class PipelineMonitor:
    def __init__(self, send_alert, lineage):
        # send_alert: callable that delivers an alert message
        # lineage: client for your lineage system (DataHub, OpenLineage);
        #          assumed here to expose a log() method
        self.send_alert = send_alert
        self.lineage = lineage
        # prometheus_client metrics take a name and a help string
        self.metrics = {
            'records_processed': Counter('pipeline_records_total', 'Records processed'),
            'processing_latency': Histogram('pipeline_latency_seconds', 'Batch processing latency'),
            'data_quality_score': Gauge('data_quality_score', 'Share of passing quality checks'),
            'failed_batches': Counter('pipeline_failures_total', 'Failed batches')
        }

    def track_data_quality(self, validation_results):
        score = validation_results['success_rate']
        self.metrics['data_quality_score'].set(score)

        if score < 0.95:
            self.send_alert(f"Data quality degraded: {score}")

    def track_lineage(self, source, transformation, destination):
        # Log to lineage system (DataHub, OpenLineage)
        self.lineage.log(
            source=source,
            transformation=transformation,
            destination=destination,
            timestamp=datetime.now(timezone.utc)
        )
```

## Variables

- **DATA_USE_CASE**: Pipeline purpose (e.g., "real-time recommendation features", "ML model training data", "business analytics")
- **DATA_VOLUME**: Scale (e.g., "10 TB/day", "1M events/second")
- **LATENCY_REQUIREMENTS**: Timeliness (e.g., "real-time < 1s", "hourly batch", "daily refresh")
- **DATA_SOURCES**: Input systems (e.g., "Kafka + PostgreSQL + S3")
- **COMPLIANCE**: Regulatory requirements (e.g., "GDPR", "HIPAA", "SOC2")
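
The bracketed variables listed above can also be filled in programmatically before the prompt is sent to a model. The following is a minimal sketch, not part of the original prompt: the helper name `fill_prompt`, the shortened stand-in template, and the sample values are all illustrative assumptions. It matches only upper-case names in square brackets, so Python list literals inside the prompt's code samples (such as `['ml', 'features']`) are left alone.

```python
import re

# Hypothetical helper, for illustration only.
# Matches placeholders such as [DATA_VOLUME]: upper-case letters, digits, underscores.
PLACEHOLDER = re.compile(r"\[([A-Z][A-Z0-9_]*)\]")


def fill_prompt(template: str, values: dict[str, str]) -> str:
    """Replace every [NAME] placeholder with values[NAME].

    Raises KeyError if the template uses a placeholder that has no value,
    so an unfilled variable is caught before the prompt reaches a model.
    """
    missing = sorted({name for name in PLACEHOLDER.findall(template) if name not in values})
    if missing:
        raise KeyError(f"No value supplied for: {', '.join(missing)}")
    # A function replacement avoids any backslash-escape handling in the values.
    return PLACEHOLDER.sub(lambda match: values[match.group(1)], template)


# Shortened stand-in for the Task sentence (assumption, not the full prompt).
task = (
    "Design a data pipeline for [DATA_USE_CASE] "
    "processing [DATA_VOLUME] with [LATENCY_REQUIREMENTS]."
)

filled = fill_prompt(
    task,
    {
        "DATA_USE_CASE": "real-time recommendation features",
        "DATA_VOLUME": "10 TB/day",
        "LATENCY_REQUIREMENTS": "hourly batch",
    },
)
print(filled)
```

Failing loudly on a missing value is a deliberate choice in this sketch: a prompt that still contains a literal `[COMPLIANCE]` tends to produce generic answers, and that is harder to notice than an exception.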
