Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 213 additions & 31 deletions databricks-skills/databricks-spark-structured-streaming/SKILL.md
Original file line number Diff line number Diff line change
@@ -1,23 +1,59 @@
---
name: databricks-spark-structured-streaming
description: Comprehensive guide to Spark Structured Streaming for production workloads. Use when building streaming pipelines, implementing real-time data processing, handling stateful operations, or optimizing streaming performance.
description: "Comprehensive guide to Spark Structured Streaming for production workloads. Use when building streaming pipelines, implementing real-time data processing, handling stateful operations, or optimizing streaming performance."
---

# Spark Structured Streaming

Production-ready streaming pipelines with Spark Structured Streaming. This skill provides navigation to detailed patterns and best practices.
Production-ready streaming pipelines with Spark Structured Streaming on Databricks.

## Quick Start
## When to Use This Skill

Use this skill when:
- Building **Kafka-to-Delta** or **Kafka-to-Kafka** streaming pipelines
- Implementing **stream-stream joins** or **stream-static joins**
- Configuring **watermarks**, **state stores**, or **RocksDB** for stateful operations
- Choosing between **processingTime**, **availableNow**, and **Real-Time Mode** triggers
- Optimizing **streaming costs** (trigger tuning, cluster sizing, scheduled streaming)
- Writing **foreachBatch MERGE** patterns for upserts
- Managing **checkpoints** (location, recovery, migration)
- Troubleshooting streaming issues (lag, state bloat, checkpoint corruption)

## Reference Files

| Topic | File | When to Read |
|-------|------|--------------|
| Kafka Streaming | [kafka-streaming.md](kafka-streaming.md) | Kafka-to-Delta ingestion, Kafka-to-Kafka, Real-Time Mode, authentication |
| Stream-Stream Joins | [stream-stream-joins.md](stream-stream-joins.md) | Joining two streams with watermarks and time-range conditions |
| Stream-Static Joins | [stream-static-joins.md](stream-static-joins.md) | Enriching streams with dimension tables, broadcast hints |
| Multi-Sink Writes | [multi-sink-writes.md](multi-sink-writes.md) | Writing one stream to multiple Delta tables in parallel |
| Merge Operations | [merge-operations.md](merge-operations.md) | foreachBatch MERGE, parallel merges, deduplication |
| Checkpoints | [checkpoint-best-practices.md](checkpoint-best-practices.md) | Checkpoint location, recovery, migration, cleanup |
| Stateful Operations | [stateful-operations.md](stateful-operations.md) | Watermarks, state stores, RocksDB, state monitoring |
| Triggers & Cost | [trigger-and-cost-optimization.md](trigger-and-cost-optimization.md) | Trigger selection, cost optimization, cluster right-sizing |
| Best Practices | [streaming-best-practices.md](streaming-best-practices.md) | Production checklist, beginner through expert tips |

---

## Quick Start: Kafka to Delta

```python
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

schema = StructType([
StructField("event_id", StringType()),
StructField("user_id", StringType()),
StructField("event_type", StringType()),
StructField("event_time", TimestampType()),
])

# Basic Kafka to Delta streaming
df = (spark
.readStream
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("subscribe", "topic")
.option("subscribe", "events")
.option("startingOffsets", "earliest")
.option("minPartitions", "6")
.load()
.select(from_json(col("value").cast("string"), schema).alias("data"))
.select("data.*")
Expand All @@ -26,40 +62,186 @@ df = (spark
df.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/Volumes/catalog/checkpoints/stream") \
.option("checkpointLocation", "/Volumes/catalog/schema/checkpoints/events") \
.trigger(processingTime="30 seconds") \
.start("/delta/target_table")
.toTable("catalog.schema.bronze_events")
```

## Quick Start: foreachBatch MERGE (Upserts)

```python
from delta.tables import DeltaTable

def upsert_batch(batch_df, batch_id):
target = DeltaTable.forName(spark, "catalog.schema.customers")
(target.alias("t")
.merge(batch_df.alias("s"), "t.customer_id = s.customer_id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute())

(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("subscribe", "customer-updates")
.load()
.select(from_json(col("value").cast("string"), customer_schema).alias("data"))
.select("data.*")
.writeStream
.foreachBatch(upsert_batch)
.option("checkpointLocation", "/Volumes/catalog/schema/checkpoints/customers")
.trigger(processingTime="1 minute")
.start()
)
```

## Quick Start: availableNow (Scheduled Streaming)

```python
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "/Volumes/catalog/schema/schemas/events")
.load("/Volumes/catalog/schema/landing/events/")
.writeStream
.format("delta")
.option("checkpointLocation", "/Volumes/catalog/schema/checkpoints/events")
.trigger(availableNow=True)
.toTable("catalog.schema.bronze_events")
)
```

Schedule via Databricks Jobs every 15 minutes for near-real-time at a fraction of continuous cost.

---

## Trigger Selection Guide

| Latency Requirement | Trigger | Cost | Use Case |
|---------------------|---------|------|----------|
| Sub-second (ms) | `realTime="5 minutes"` | $$$ | Fraud detection, real-time personalization (DBR 16.4+) |
| 1–30 seconds | `processingTime="N seconds"` | $$ | Near real-time dashboards |
| 15–60 minutes | `availableNow=True` (scheduled) | $ | Batch-style SLA |
| > 1 hour | `availableNow=True` (scheduled) | $ | ETL pipelines |

See [trigger-and-cost-optimization.md](trigger-and-cost-optimization.md) for detailed cost calculations and cluster sizing.

---

## Watermark Essentials

Watermarks are **required** for stateful operations (joins, aggregations, deduplication) to bound state and handle late data.

```python
df.withWatermark("event_time", "10 minutes")
```

| Watermark | Effect | Use Case |
|-----------|--------|----------|
| `"5 minutes"` | Low latency, tight state | Real-time analytics |
| `"10 minutes"` | Moderate latency | General streaming |
| `"1 hour"` | High completeness | Financial transactions |
| `"24 hours"` | Batch-like completeness | Backfill scenarios |

**Rule of thumb**: Start with 2–3x your p95 event latency. Monitor late data rate and adjust.

See [stateful-operations.md](stateful-operations.md) for RocksDB configuration, state monitoring, and advanced patterns.

---

## Stream Join Patterns

### Stream-Stream Join

Both sides must have watermarks. Use time-range conditions to bound state:

```python
orders = spark.readStream.table("catalog.schema.orders") \
.withWatermark("order_time", "10 minutes")

payments = spark.readStream.table("catalog.schema.payments") \
.withWatermark("payment_time", "10 minutes")

joined = orders.join(payments,
expr("""
order_id = payment_order_id
AND payment_time >= order_time
AND payment_time <= order_time + INTERVAL 1 HOUR
"""),
"inner"
)
```

## Core Patterns
See [stream-stream-joins.md](stream-stream-joins.md) for left outer joins, multiple join keys, and monitoring.

| Pattern | Description | Reference |
|---------|-------------|-----------|
| **Kafka Streaming** | Kafka to Delta, Kafka to Kafka, Real-Time Mode | See [kafka-streaming.md](kafka-streaming.md) |
| **Stream Joins** | Stream-stream joins, stream-static joins | See [stream-stream-joins.md](stream-stream-joins.md), [stream-static-joins.md](stream-static-joins.md) |
| **Multi-Sink Writes** | Write to multiple tables, parallel merges | See [multi-sink-writes.md](multi-sink-writes.md) |
| **Merge Operations** | MERGE performance, parallel merges, optimizations | See [merge-operations.md](merge-operations.md) |
### Stream-Static Join

## Configuration
Use broadcast hints for small dimension tables:

| Topic | Description | Reference |
|-------|-------------|-----------|
| **Checkpoints** | Checkpoint management and best practices | See [checkpoint-best-practices.md](checkpoint-best-practices.md) |
| **Stateful Operations** | Watermarks, state stores, RocksDB configuration | See [stateful-operations.md](stateful-operations.md) |
| **Trigger & Cost** | Trigger selection, cost optimization, RTM | See [trigger-and-cost-optimization.md](trigger-and-cost-optimization.md) |
```python
from pyspark.sql.functions import broadcast

## Best Practices
dim_products = spark.table("catalog.schema.products")

| Topic | Description | Reference |
|-------|-------------|-----------|
| **Production Checklist** | Comprehensive best practices | See [streaming-best-practices.md](streaming-best-practices.md) |
enriched = stream_df.join(
broadcast(dim_products),
"product_id",
"left"
)
```

See [stream-static-joins.md](stream-static-joins.md) for refresh strategies and cache invalidation.

---

## Checkpoint Best Practices

- **Always use UC Volumes** for checkpoint storage: `/Volumes/catalog/schema/volume/checkpoints/stream_name`
- **One checkpoint per stream** — never share checkpoints between streams
- **Never delete checkpoints** of a running stream — this resets offsets
- **Fixed-size clusters** — autoscaling causes task redistribution issues with streaming

See [checkpoint-best-practices.md](checkpoint-best-practices.md) for migration, recovery, and cleanup patterns.

---

## Production Checklist

- [ ] Checkpoint location is persistent (UC volumes, not DBFS)
- [ ] Checkpoint location is persistent (UC Volumes, not DBFS)
- [ ] Unique checkpoint per stream
- [ ] Fixed-size cluster (no autoscaling for streaming)
- [ ] Monitoring configured (input rate, lag, batch duration)
- [ ] Exactly-once verified (txnVersion/txnAppId)
- [ ] Watermark configured for stateful operations
- [ ] Left joins for stream-static (not inner)
- [ ] Trigger interval explicitly set (default processes micro-batches as fast as possible, which is expensive)
- [ ] Monitoring configured (input rate, processing rate, batch duration)
- [ ] Watermark configured for all stateful operations
- [ ] Schema defined explicitly (not inferred) for Kafka sources
- [ ] `minPartitions` set to match Kafka partition count
- [ ] Error handling in foreachBatch (idempotent writes)
- [ ] Exactly-once verified (txnVersion/txnAppId for foreachBatch MERGE)

See [streaming-best-practices.md](streaming-best-practices.md) for the full beginner-to-expert checklist.

---

## Common Issues

| Issue | Cause | Solution |
|-------|-------|----------|
| **Increasing batch duration** | State store growing unbounded | Add or reduce watermark duration; enable RocksDB |
| **High S3/ADLS listing costs** | No trigger interval set | Always set `processingTime` or `availableNow` |
| **Duplicate records** | Missing deduplication in MERGE | Use `dropDuplicates` or add dedup logic in foreachBatch |
| **Stream-static join stale data** | Static DataFrame cached at start | Restart stream periodically or use Delta change feed |
| **Checkpoint corruption** | Cluster terminated mid-write | Delete last incomplete batch folder; restart stream |
| **OOM on state operations** | In-memory state store too large | Switch to RocksDB state store provider |
| **Late data dropped** | Watermark too aggressive | Increase watermark duration; monitor late event rate |

## Related Skills

- **[databricks-spark-declarative-pipelines](../databricks-spark-declarative-pipelines/SKILL.md)** — higher-level streaming with DLT/SDP (streaming tables, Auto Loader)
- **[databricks-jobs](../databricks-jobs/SKILL.md)** — scheduling `availableNow` streaming jobs
- **[databricks-unity-catalog](../databricks-unity-catalog/SKILL.md)** — checkpoint storage in UC Volumes, system tables for monitoring

## Resources

- [Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
- [Databricks Structured Streaming Docs](https://docs.databricks.com/en/structured-streaming/index.html)
- [Real-Time Mode](https://docs.databricks.com/en/structured-streaming/real-time.html)
Loading