- Activity Choice
- Parallel Activity
- Cancellation
- Continue-as-New
- Long-running Entity
- Concurrent Timers
- Child Workflow
- Exceptions
- Local Activity
- Authenticate using mTLS
- Custom Metrics
- Encryption
- Polling (frequent)
- Polling (infrequent)
- Schedule
- Pydantic Converter
The Activity Choice pattern enables workflows to dynamically select and execute different activities based on input conditions. This pattern is useful for conditional processing where different business logic needs to be applied based on data values.
Key Implementation:
- Use conditional logic (`if/elif/else`) to select activity functions
- Enables flexible workflow execution paths while maintaining deterministic replay behavior
- Each activity handles specific business logic for different cases
from enum import IntEnum
from typing import List
from temporalio import activity, workflow
# Import Pydantic safely for workflow use
with workflow.unsafe.imports_passed_through():
from pydantic import BaseModel
class Fruit(IntEnum):
    """Fruits that may appear on a shopping list."""

    APPLE = 1
    BANANA = 2
    CHERRY = 3
class ShoppingItem(BaseModel):
    """One shopping-list entry: which fruit to order and how many."""

    fruit: Fruit
    amount: int
class ShoppingList(BaseModel):
    """Workflow input: the collection of items to order."""

    items: List[ShoppingItem]
@activity.defn
def order_apples(amount: int) -> str:
    """Activity that orders apples and returns a confirmation message."""
    confirmation = f"Ordered {amount} Apples..."
    return confirmation
@activity.defn
def order_bananas(amount: int) -> str:
    """Activity that orders bananas and returns a confirmation message."""
    confirmation = f"Ordered {amount} Bananas..."
    return confirmation
@workflow.defn
class PurchaseFruitsWorkflow:
@workflow.run
async def run(self, shopping_list: ShoppingList) -> str:
ordered = []
for item in shopping_list.items:
if item.fruit is Fruit.APPLE:
order_function = order_apples
elif item.fruit is Fruit.BANANA:
order_function = order_bananas
else:
raise ValueError(f"Unrecognized fruit: {item.fruit}")
result = await workflow.execute_activity(
order_function,
item.amount,
start_to_close_timeout=timedelta(seconds=5)
)
ordered.append(result)
return "".join(ordered)The Parallel Activity pattern enables workflows to execute multiple activities concurrently, improving performance when activities are independent and can run simultaneously. This pattern uses asyncio.gather() to coordinate parallel execution.
Key Implementation:
- Use `asyncio.gather()` to execute multiple activities concurrently
- Activities run independently and can complete in any order
- Results are collected and can be processed after all activities complete
- Significantly reduces total execution time for independent operations
import asyncio
from temporalio import activity, workflow
@activity.defn
def say_hello_activity(name: str) -> str:
    """Activity that builds a greeting for the given name."""
    greeting = f"Hello, {name}!"
    return greeting
@workflow.defn
class SayHelloWorkflow:
@workflow.run
async def run(self) -> List[str]:
# Run 5 activities concurrently
results = await asyncio.gather(
workflow.execute_activity(
say_hello_activity,
"user1",
start_to_close_timeout=timedelta(seconds=5)
),
workflow.execute_activity(
say_hello_activity,
"user2",
start_to_close_timeout=timedelta(seconds=5)
),
workflow.execute_activity(
say_hello_activity,
"user3",
start_to_close_timeout=timedelta(seconds=5)
),
)
# Sort results since completion order is non-deterministic
return list(sorted(results))The Cancellation pattern enables workflows to gracefully handle cancellation requests while performing cleanup operations. Long-running activities must heartbeat to receive cancellation signals, and workflows can use try/finally blocks to ensure cleanup activities execute.
Key Implementation:
- Activities use `activity.heartbeat()` to heartbeat long-running Activities and receive cancellation signals
- Handle `CancelledError` in activities for graceful shutdown
- Use `try/finally` blocks in workflows to guarantee cleanup execution
- Set appropriate heartbeat timeouts for long-running activities
import time
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.exceptions import CancelledError
@activity.defn
def never_complete_activity() -> None:
    """Heartbeat once per second until the activity is cancelled."""
    try:
        while True:
            print("Heartbeating activity")
            # Required for cancellation delivery
            activity.heartbeat()
            time.sleep(1)
    except CancelledError:
        # Log the cancellation, then re-raise so it is not swallowed.
        print("Activity cancelled")
        raise
@activity.defn
def cleanup_activity() -> None:
    """Activity run by the workflow's ``finally`` block; it only prints a message."""
    print("Executing cleanup activity")
@workflow.defn
class CancellationWorkflow:
@workflow.run
async def run(self) -> None:
try:
await workflow.execute_activity(
never_complete_activity,
start_to_close_timeout=timedelta(seconds=1000),
heartbeat_timeout=timedelta(seconds=2), # Critical for cancellation
)
finally:
# Cleanup always executes, even on cancellation
await workflow.execute_activity(
cleanup_activity,
start_to_close_timeout=timedelta(seconds=5)
)The Continue-as-New pattern enables workflows to reset their execution history while preserving state, preventing unbounded history growth in long-running or looping workflows. This creates a new workflow execution with the same Workflow ID but fresh Event History.
Key Implementation:
- Use `workflow.continue_as_new()` to restart workflow with new parameters
- Design workflow parameters to include current state for continuation
- Check `workflow.info().is_continue_as_new_suggested()` for Continue-as-New timing
- Avoid calling Continue-as-New from Update / Signal handlers. Use Workflow wait conditions to ensure your handler completes before a Workflow finishes.
- Essential for preventing Event History limits and performance degradation
from typing import Optional
from temporalio import workflow
# Import Pydantic safely for workflow use
with workflow.unsafe.imports_passed_through():
from pydantic import BaseModel
class WorkflowState(BaseModel):
    """Progress counters carried across Continue-as-New runs."""

    iteration: int = 0
    processed_items: int = 0
class WorkflowInput(BaseModel):
    """Workflow input: optional state to resume from plus the iteration limit."""

    state: Optional[WorkflowState] = None
    max_iterations: int = 1000
@workflow.defn
class LongRunningWorkflow:
@workflow.run
async def run(self, input: WorkflowInput) -> None:
# Initialize or restore state
self.state = input.state or WorkflowState()
while self.state.iteration < input.max_iterations:
# Perform work
await self.process_batch()
self.state.iteration += 1
# Check if Continue-as-New is suggested
if workflow.info().is_continue_as_new_suggested():
await workflow.wait_condition(workflow.all_handlers_finished)
workflow.continue_as_new(
WorkflowInput(
state=self.state,
max_iterations=input.max_iterations
)
)
return
workflow.logger.info("Completed all %d iterations", input.max_iterations)
async def process_batch(self):
# Simulate work
await asyncio.sleep(0.1)
self.state.processed_items += 10The Long-running Entity pattern enables workflows to model stateful entities that persist over long periods and respond to external events. This pattern uses signals, queries, and updates to manage entity state while leveraging Continue-as-New to prevent unbounded history growth.
Key Implementation:
- Use `@workflow.query` to expose entity state for external inspection
- Use `@workflow.signal` to modify entity state asynchronously
- Use `@workflow.update` to modify entity state with validation and return values
- Implement `while True` loop with `workflow.wait_condition()` for entity lifecycle management
- Use Continue-as-New when `workflow.info().is_continue_as_new_suggested()` returns `True`
- Use `await workflow.wait_condition(workflow.all_handlers_finished)` to wait for all handlers to finish before Workflow termination and Continue-as-New
- Handle entity termination through state flags and conditional logic
- Essential for modeling business entities like user accounts, orders, digital twins, or long-running processes
from datetime import datetime, timedelta
from temporalio import workflow
from temporalio.common import RetryPolicy
@workflow.defn
class CustomerRewardAccount:
"""Long-running entity workflow for managing customer reward accounts."""
def __init__(self):
# Entity state maintained throughout workflow lifetime
self._level: CustomerRewardLevel = CustomerRewardLevel.BASIC
self._points: int = 0
self._is_active: bool = True
self._user_id: str | None = None
self._create_time: datetime | None = None
self._cancel_time: datetime | None = None
self._update_count: int = 0
@workflow.run
async def run(self, inp: CustomerRewardAccountInput) -> CustomerRewardAccountStatus:
"""Initialize entity and enter long-running loop."""
workflow.logger.info("Creating reward account for %s", inp.user_id)
self._create_time = workflow.now()
# Initialize entity with external data
user: UserInfo = await workflow.execute_activity(
get_user,
inp.user_id,
start_to_close_timeout=timedelta(seconds=1),
)
self._user_id = user.id
workflow.logger.info(
"Reward account for %s created at %s",
self._user_id,
self._create_time
)
# Long-running entity loop
while True:
# Wait for entity termination or update count threshold
await workflow.wait_condition(
lambda: not self._is_active or workflow.info().is_continue_as_new_suggested()
)
# Handle entity termination
if not self._is_active:
workflow.logger.info(
"Terminating reward account for %s at %s",
self._user_id,
self._cancel_time,
)
await workflow.wait_condition(workflow.all_handlers_finished)
return CustomerRewardAccountStatus(
level=self._level,
points=self._points,
is_active=self._is_active,
)
# Handle Continue-as-New for history management
if workflow.info().is_continue_as_new_suggested():
await workflow.wait_condition(workflow.all_handlers_finished)
workflow.continue_as_new(
CustomerRewardAccountInput(
user_id=self._user_id,
starting_points=self._points,
starting_level=self._level,
)
)
@workflow.query
def query_reward_status(self) -> CustomerRewardAccountStatus:
"""Query handler to expose current entity state."""
return CustomerRewardAccountStatus(
level=self._level,
points=self._points,
is_active=self._is_active,
)
@workflow.update
async def cancel(self) -> CustomerRewardAccountStatus:
"""Update handler to terminate the entity."""
self._is_active = False
self._cancel_time = workflow.now()
return CustomerRewardAccountStatus(
level=self._level,
points=self._points,
is_active=self._is_active,
)
@workflow.update
async def add_points(self, inp: AddPointInput) -> CustomerRewardAccountStatus:
"""Update handler to modify entity state with business logic."""
workflow.logger.info("Adding points for %s by %i", self._user_id, inp.points)
self._update_count += 1
self._points += inp.points
self._points = max(self._points, 0) # Prevent negative points
# Update derived state based on business rules
if 500 <= self._points < 1000:
self._level = CustomerRewardLevel.GOLD
elif self._points >= 1000:
self._level = CustomerRewardLevel.PLATINUM
else:
self._level = CustomerRewardLevel.BASIC
return CustomerRewardAccountStatus(
level=self._level,
points=self._points,
is_active=self._is_active,
)
@add_points.validator
def validate_add_point(self, inp: AddPointInput) -> None:
"""Validate update input before processing."""
if not isinstance(inp.points, int):
raise ValueError("Points must be an integer.")Benefits:
- Stateful Processing: Maintains complex entity state across long periods
- Real-time Interaction: Responds immediately to queries and updates
- History Management: Uses Continue-as-New to prevent unbounded growth
- Strong Consistency: All state changes are durably persisted
- Event Sourcing: Complete audit trail of all entity state changes
The Concurrent Timers pattern enables workflows to manage multiple independent timers that can fire at different intervals. This pattern uses asyncio.create_task() with workflow.wait() to efficiently handle multiple concurrent timers, making it ideal for scheduling recurring events with different frequencies.
Key Implementation:
- Use `asyncio.create_task(asyncio.sleep())` to create independent timer tasks
- Use `workflow.wait()` with `asyncio.FIRST_COMPLETED` to wait for any timer to complete
- Reset completed timers by creating new tasks with the same intervals
- Handle multiple timer events in a single workflow execution loop
- Essential for managing recurring events with different schedules
import asyncio
from datetime import timedelta
from temporalio import activity, workflow
@activity.defn
async def send_notification(user_email: str, message: str) -> None:
    """Stub notification activity: logs the message instead of delivering it."""
    # Replace this stub with your actual notification logic:
    # e.g., email, Slack, push, etc.
    log_line = f"[Notify] {user_email}: {message}"
    activity.logger.info(log_line)
@workflow.defn
class MaintenanceWorkflow:
@workflow.run
async def run(self, user_email: str) -> None:
# Create concurrent timers for different maintenance events
oil_change_timer = asyncio.create_task(
asyncio.sleep(timedelta(months=3).total_seconds()) # Every 3 months
)
inspection_timer = asyncio.create_task(
asyncio.sleep(timedelta(days=365).total_seconds()) # Yearly
)
coolant_timer = asyncio.create_task(
asyncio.sleep(timedelta(months=24).total_seconds()) # Every 2 years
)
while True:
# Wait for whichever timer fires first
done, _ = await workflow.wait(
{oil_change_timer, inspection_timer, coolant_timer},
return_when=asyncio.FIRST_COMPLETED,
)
# Handle oil change timer
if oil_change_timer in done:
workflow.logger.info("Time for oil change")
await workflow.execute_activity(
send_notification,
user_email,
"Your car needs an oil change",
start_to_close_timeout=timedelta(seconds=30),
)
# Reset timer for next oil change
oil_change_timer = asyncio.create_task(
asyncio.sleep(timedelta(months=3).total_seconds())
)
# Handle inspection timer
if inspection_timer in done:
workflow.logger.info("Time for car inspection")
await workflow.execute_activity(
send_notification,
user_email,
"Your car needs an annual inspection",
start_to_close_timeout=timedelta(seconds=30),
)
# Reset timer for next inspection
inspection_timer = asyncio.create_task(
asyncio.sleep(timedelta(days=365).total_seconds())
)
# Handle coolant timer
if coolant_timer in done:
workflow.logger.info("Time for coolant change")
await workflow.execute_activity(
send_notification,
user_email,
"Your car needs a coolant system service",
start_to_close_timeout=timedelta(seconds=30),
)
# Reset timer for next coolant change
coolant_timer = asyncio.create_task(
asyncio.sleep(timedelta(months=24).total_seconds())
)The Child Workflow pattern enables workflows to spawn and manage other workflow executions as children, providing composition and modularity. Child workflows run independently but are tracked in the parent's Event History, enabling complex orchestration patterns.
Key Implementation:
- Use `workflow.execute_child_workflow()` to start and wait for completion
- Use `workflow.start_child_workflow()` to start and get handle for advanced control
- Set `parent_close_policy` to control child behavior when parent closes
- Child workflow events are logged in parent's Event History
- In general, an Activity or a chain of Activities can be used in place of Child Workflows. When possible, prefer Activities over Child Workflows.
from temporalio import workflow
from temporalio.workflow import ParentClosePolicy
# Import Pydantic safely for workflow use
with workflow.unsafe.imports_passed_through():
from pydantic import BaseModel
class ComposeGreetingInput(BaseModel):
    """Input for the child workflow: the greeting word and the name to greet."""

    greeting: str
    name: str
@workflow.defn
class ComposeGreetingWorkflow:
    """Child workflow that composes a greeting message."""

    @workflow.run
    async def run(self, input: ComposeGreetingInput) -> str:
        """Return the greeting and name formatted as ``"<greeting>, <name>!"``."""
        message = f"{input.greeting}, {input.name}!"
        return message
@workflow.defn
class GreetingWorkflow:
    """Parent workflow that orchestrates child workflows."""

    @workflow.run
    async def run(self, name: str) -> str:
        """Run the greeting child workflow for ``name`` and return its result."""
        # Execute child workflow and wait for completion.
        # Pydantic models accept keyword arguments only.
        return await workflow.execute_child_workflow(
            ComposeGreetingWorkflow.run,
            ComposeGreetingInput(greeting="Hello", name=name),
            id="greeting-child-workflow-id",
            parent_close_policy=ParentClosePolicy.ABANDON,
        )
# Advanced usage with handle
@workflow.defn
class AdvancedParentWorkflow:
@workflow.run
async def run(self, name: str) -> str:
# Start child workflow and get handle
handle = await workflow.start_child_workflow(
ComposeGreetingWorkflow.run,
ComposeGreetingInput("Hi", name),
id="advanced-child-workflow-id",
)
# Can signal the child or perform other operations
workflow.logger.info(f"Started child: {handle.id}")
# Wait for completion
return await handleThe Exception pattern demonstrates proper error handling in Temporal workflows, including activity failures, retry policies, and exception propagation. Temporal wraps exceptions in specific error types that preserve stack traces and failure details for debugging.
Key Implementation:
- Activities can raise exceptions that propagate through workflows
- Use `RetryPolicy` to configure automatic retry behavior for failed activities
- Handle `WorkflowFailureError` when executing workflows from clients
- Exceptions maintain causality chain: WorkflowFailureError → ActivityError → ApplicationError
from datetime import timedelta
from temporalio import activity, workflow
from temporalio.client import WorkflowFailureError
from temporalio.common import RetryPolicy
from temporalio.exceptions import FailureError
# Import Pydantic safely for workflow use
with workflow.unsafe.imports_passed_through():
from pydantic import BaseModel
class ProcessingInput(BaseModel):
    """Activity input: the data to process and a flag that forces a failure."""

    data: str
    should_fail: bool = False
@activity.defn
def process_data(input: ProcessingInput) -> str:
    """Return a processed marker for the data, or raise if asked to fail."""
    if not input.should_fail:
        return f"Processed: {input.data}"
    # Activity raises exception
    raise RuntimeError(f"Processing failed for: {input.data}")
@workflow.defn
class DataProcessingWorkflow:
    """Workflow that runs a deliberately failing activity to show retry handling."""

    @workflow.run
    async def run(self, data: str) -> str:
        """Execute ``process_data`` with ``should_fail=True`` and up to 3 attempts."""
        return await workflow.execute_activity(
            process_data,
            # Pydantic models accept keyword arguments only.
            ProcessingInput(data=data, should_fail=True),
            start_to_close_timeout=timedelta(seconds=10),
            # Configure retry behavior
            retry_policy=RetryPolicy(maximum_attempts=3),
        )
# Client-side exception handling
async def execute_with_error_handling():
    """Run the workflow and return its result, logging failures before re-raising.

    Raises:
        WorkflowFailureError: re-raised after the Temporal stack trace has been
            appended to the exception chain and the failure has been logged.
    """
    try:
        # Return the result instead of discarding it.
        return await client.execute_workflow(
            DataProcessingWorkflow.run,
            "test-data",
            id="exception-workflow-id",
            task_queue="exception-task-queue",
        )
    except WorkflowFailureError as err:
        # Enhance error with stack trace
        append_temporal_stack(err)
        logger.exception("Workflow execution failed")
        raise
def append_temporal_stack(exc: BaseException) -> None:
"""Helper to append Temporal stack traces to exception messages."""
while exc:
if (isinstance(exc, FailureError) and exc.failure and
exc.failure.stack_trace and "\\nStack:\\n" not in str(exc)):
exc.args = (f"{exc}\\nStack:\\n{exc.failure.stack_trace.rstrip()}",)
exc = exc.__cause__The Local Activity pattern enables workflows to execute activities directly within the worker process without task queue scheduling. Local activities provide lower latency and reduced overhead for short-duration operations, but sacrifice some of Temporal's durability guarantees.
Key Implementation:
- Use `workflow.execute_local_activity()` instead of `workflow.execute_activity()`
- Activities run in the same worker process as the workflow
- Lower latency and reduced network overhead compared to regular activities
- Limited retry capabilities and no cross-worker execution
- Best for fast, lightweight operations that don't require full durability
- Avoid Local Activities for external API calls, long-running operations, and operations requiring durability
from datetime import timedelta
from temporalio import activity, workflow
# Import Pydantic safely for workflow use
with workflow.unsafe.imports_passed_through():
from pydantic import BaseModel
class ProcessingInput(BaseModel):
    """Local-activity input: the greeting word and the name to greet."""

    greeting: str
    name: str
@activity.defn
def compose_greeting(input: ProcessingInput) -> str:
    """Fast local activity for simple string processing."""
    message = f"{input.greeting}, {input.name}!"
    return message
@activity.defn
def validate_input(data: str) -> bool:
    """Quick validation that runs locally."""
    # Valid when something remains after stripping whitespace.
    return bool(data.strip())
@workflow.defn
class LocalActivityWorkflow:
@workflow.run
async def run(self, name: str) -> str:
# Execute local activity for fast processing
result = await workflow.execute_local_activity(
compose_greeting,
ProcessingInput("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
)
# Chain multiple local activities
is_valid = await workflow.execute_local_activity(
validate_input,
result,
start_to_close_timeout=timedelta(seconds=5),
)
return result if is_valid else "Invalid result"The mTLS (mutual Transport Layer Security) pattern enables secure authentication between Temporal clients/workers and the Temporal server using client certificates. This provides strong authentication and encryption for production deployments.
Key Implementation:
- Use `TLSConfig` to configure client certificates and server CA validation
- Load client certificate and private key from files
- Optionally specify server root CA certificate for validation
- Apply TLS configuration to both client connections and workers
- Essential for secure production Temporal deployments
import argparse
from typing import Optional
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.service import TLSConfig
from temporalio.worker import Worker
async def create_secure_client(
    target_host: str = "localhost:7233",
    namespace: str = "default",
    server_root_ca_cert_path: Optional[str] = None,
    client_cert_path: str = "client.crt",
    client_key_path: str = "client.key"
) -> Client:
    """Create a Temporal client with mTLS authentication.

    The server root CA is read only when a path is given; the client
    certificate and private key are always read from their paths.
    """

    def _read_bytes(path: str) -> bytes:
        with open(path, "rb") as handle:
            return handle.read()

    # Load server root CA certificate (optional)
    root_ca: Optional[bytes] = (
        _read_bytes(server_root_ca_cert_path) if server_root_ca_cert_path else None
    )
    # Load client certificate and private key (required)
    certificate = _read_bytes(client_cert_path)
    private_key = _read_bytes(client_key_path)

    # Create client with TLS configuration
    tls_config = TLSConfig(
        server_root_ca_cert=root_ca,
        client_cert=certificate,
        client_private_key=private_key,
    )
    return await Client.connect(target_host, namespace=namespace, tls=tls_config)
@workflow.defn
class SecureWorkflow:
    """Minimal workflow used to demonstrate the mTLS-secured connection."""

    @workflow.run
    async def run(self, data: str) -> str:
        """Return the input wrapped in a confirmation message."""
        message = f"Securely processed: {data}"
        return message
# Usage example
async def main():
# Create secure client
client = await create_secure_client(
target_host="your-temporal-server:7233",
client_cert_path="/path/to/client.crt",
client_key_path="/path/to/client.key",
server_root_ca_cert_path="/path/to/server-ca.crt"
)
# Worker also uses the same secure client
async with Worker(
client,
task_queue="secure-task-queue",
workflows=[SecureWorkflow],
):
result = await client.execute_workflow(
SecureWorkflow.run,
"sensitive-data",
id="secure-workflow-id",
task_queue="secure-task-queue",
)
print(f"Result: {result}")The Custom Metrics pattern enables workflows and activities to emit custom telemetry data using Temporal's built-in metrics system. This pattern uses interceptors to capture timing data and Prometheus for metrics collection and monitoring.
Key Implementation:
- Use `Runtime` with `TelemetryConfig` to configure Prometheus metrics
- Create interceptors to capture custom metrics during activity execution
- Use `activity.metric_meter()` to create and record histogram metrics
- Configure Prometheus endpoint for metrics collection
- Essential for monitoring workflow performance and business metrics
import asyncio
from concurrent.futures import ThreadPoolExecutor
from temporalio import activity
from temporalio.client import Client
from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig
from temporalio.worker import (
ActivityInboundInterceptor,
ExecuteActivityInput,
Interceptor,
Worker,
)
class CustomMetricsInterceptor(Interceptor):
    """Interceptor to add custom metrics collection."""

    def intercept_activity(
        self, next: ActivityInboundInterceptor
    ) -> ActivityInboundInterceptor:
        """Wrap the next inbound interceptor with the metrics-recording one."""
        wrapped = ActivityMetricsInterceptor(next)
        return wrapped
class ActivityMetricsInterceptor(ActivityInboundInterceptor):
    """Captures activity scheduling and execution metrics."""

    async def execute_activity(self, input: ExecuteActivityInput):
        """Record schedule-to-start latency and execution counts around the activity.

        Returns whatever the wrapped interceptor returns and re-raises any
        exception after counting it as failed.
        """
        # Read the activity info once instead of on every attribute access.
        info = activity.info()
        meter = activity.metric_meter()

        # Create custom histogram metric
        latency_histogram = meter.create_histogram_timedelta(
            "activity_schedule_to_start_latency",
            description="Time between activity scheduling and start",
            unit="duration",
        )
        # Record schedule-to-start latency with labels
        latency_histogram.record(
            info.started_time - info.current_attempt_scheduled_time,
            {
                "workflow_type": info.workflow_type,
                "activity_type": info.activity_type,
            },
        )

        # Create business metrics
        counter = meter.create_counter_int(
            "activity_executions_total",
            description="Total number of activity executions",
        )
        counter.add(1, {"status": "started"})
        try:
            result = await self.next.execute_activity(input)
            counter.add(1, {"status": "completed"})
            return result
        except Exception:
            counter.add(1, {"status": "failed"})
            raise
async def create_metrics_worker():
    """Create worker with custom metrics configuration."""
    # Configure Prometheus metrics
    prometheus = PrometheusConfig(bind_address="0.0.0.0:9090")
    runtime = Runtime(telemetry=TelemetryConfig(metrics=prometheus))

    # Create client with metrics runtime
    client = await Client.connect("localhost:7233", runtime=runtime)

    # Create worker with custom interceptor
    worker = Worker(
        client,
        task_queue="metrics-task-queue",
        interceptors=[CustomMetricsInterceptor()],
        workflows=[MyWorkflow],
        activities=[my_activity],
        activity_executor=ThreadPoolExecutor(2),
    )
    return worker
# Metrics are available at http://localhost:9090/metrics
# Common metrics: activity latency, execution counts, error rates, business KPIs

The Encryption pattern enables end-to-end encryption of workflow and activity payloads using custom payload codecs. This ensures sensitive data is encrypted at rest and in transit, with only authorized workers able to decrypt the data.
Key Implementation:
- Create custom `PayloadCodec` to handle encryption/decryption of payload data
- Configure `data_converter` with custom codec for both clients and workers
- Include key ID metadata for key rotation and management
- Essential for protecting sensitive data in workflows
import os
from typing import Iterable, List
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from temporalio.api.common.v1 import Payload
from temporalio.converter import DataConverter, PayloadCodec
import temporalio.converter
from temporalio.client import Client
from temporalio.worker import Worker
class EncryptionCodec(PayloadCodec):
    """Custom codec for encrypting/decrypting workflow payloads."""

    # 96-bit nonce for AES-GCM; it is stored as a prefix of every ciphertext.
    NONCE_SIZE = 12

    def __init__(self, key_id: str = "production-key-id", key: bytes = None) -> None:
        """Create the codec.

        Args:
            key_id: Identifier written into payload metadata and checked on decode.
            key: AES key bytes. When ``None`` the key is read from the
                ``TEMPORAL_ENCRYPTION_KEY`` environment variable.

        Raises:
            ValueError: if no key is passed and the environment variable is
                unset or empty.
        """
        super().__init__()
        self.key_id = key_id
        if key is None:
            # In production, load from secure key management system
            key = os.environ.get("TEMPORAL_ENCRYPTION_KEY", "").encode()
            if not key:
                # Fail with a clear message instead of passing an empty key on.
                raise ValueError(
                    "No encryption key: pass `key` or set TEMPORAL_ENCRYPTION_KEY"
                )
        self.encryptor = AESGCM(key)

    async def encode(self, payloads: Iterable[Payload]) -> List[Payload]:
        """Encrypt all payloads with AES-GCM encryption."""
        return [
            Payload(
                metadata={
                    "encoding": b"binary/encrypted",
                    "encryption-key-id": self.key_id.encode(),
                },
                data=self.encrypt(p.SerializeToString()),
            )
            for p in payloads
        ]

    async def decode(self, payloads: Iterable[Payload]) -> List[Payload]:
        """Decrypt payloads, skipping non-encrypted ones.

        Raises:
            ValueError: if an encrypted payload names a key ID other than
                this codec's ``key_id``.
        """
        ret: List[Payload] = []
        for p in payloads:
            # Skip non-encrypted payloads
            if p.metadata.get("encoding", b"").decode() != "binary/encrypted":
                ret.append(p)
                continue
            # Verify key ID matches
            key_id = p.metadata.get("encryption-key-id", b"").decode()
            if key_id != self.key_id:
                raise ValueError(f"Unknown key ID: {key_id}")
            # Decrypt and append
            ret.append(Payload.FromString(self.decrypt(p.data)))
        return ret

    def encrypt(self, data: bytes) -> bytes:
        """Encrypt data with random nonce."""
        nonce = os.urandom(self.NONCE_SIZE)
        return nonce + self.encryptor.encrypt(nonce, data, None)

    def decrypt(self, data: bytes) -> bytes:
        """Decrypt data, extracting nonce from prefix."""
        nonce = data[: self.NONCE_SIZE]
        ciphertext = data[self.NONCE_SIZE :]
        return self.encryptor.decrypt(nonce, ciphertext, None)
# Configure client and worker with encryption
async def create_encrypted_client() -> Client:
    """Create client with encryption codec."""
    # Create data converter with custom encryption codec
    base = temporalio.converter.default()
    encrypted_converter = DataConverter(
        payload_converters=base.payload_converters,
        failure_converters=base.failure_converters,
        payload_codec=EncryptionCodec(),
    )
    client = await Client.connect(
        "localhost:7233",
        data_converter=encrypted_converter,
    )
    return client
async def run_encrypted_worker():
    """Run worker with encryption support."""
    client = await create_encrypted_client()
    worker = Worker(
        client,
        task_queue="encrypted-task-queue",
        workflows=[MyWorkflow],
        activities=[my_activity],
    )
    print("Encrypted worker running...")
    # Worker processes encrypted payloads transparently.
    # run() keeps the worker alive until it is shut down; an
    # ``async with Worker(...)`` block containing only a print would stop
    # the worker as soon as the block exits.
    await worker.run()
# Key management best practices:
# - Use environment variables or secure key management systems
# - Implement key rotation with multiple key IDs
# - Never hardcode encryption keys in source code

The Frequent Polling pattern enables activities to continuously poll external services until a condition is met or data becomes available. This pattern uses heartbeating and exception handling to maintain resilience during polling operations.
Key Implementation:
- Use infinite loop with `while True` for continuous polling
- Implement `activity.heartbeat()` to prevent activity timeouts during long polls
- Handle exceptions gracefully to continue polling when services are temporarily unavailable
- Use `asyncio.sleep()` for polling intervals to avoid overwhelming external services
- Handle `asyncio.CancelledError` for proper cleanup when activity is cancelled
import asyncio
from temporalio import activity
from typing import Optional
@activity.defn
async def poll_external_service(service_url: str, poll_interval: int = 1) -> str:
    """Poll external service until data is available."""
    try:
        while True:
            try:
                # Attempt to get result from external service
                outcome = await fetch_from_service(service_url)
                if outcome is not None:
                    activity.logger.info(f"Service returned result: {outcome}")
                    return outcome
            except Exception as error:
                # Log but swallow exception - service may be temporarily down
                activity.logger.debug(
                    f"Service call failed: {error}, retrying in {poll_interval}s",
                    exc_info=True,
                )
            # Heartbeat to prevent activity timeout
            activity.heartbeat(f"Polling service at {service_url}")
            # Wait before next poll attempt
            await asyncio.sleep(poll_interval)
    except asyncio.CancelledError:
        # Handle cancellation for cleanup
        activity.logger.info("Polling activity cancelled")
        # Perform any necessary cleanup here
        raise
async def fetch_from_service(url: str) -> Optional[str]:
    """Simulate external service call.

    A real implementation would make an HTTP request, return ``None`` when no
    data is available, and raise on error. This stub always returns ``None``.
    """
    return None
# Usage in workflow:
# result = await workflow.execute_activity(
# poll_external_service,
# "https://api.example.com/status",
# start_to_close_timeout=timedelta(minutes=30),
# heartbeat_timeout=timedelta(seconds=10),
# )

The Infrequent Polling pattern uses Temporal's retry mechanism to poll external services at longer intervals without maintaining long-running activities. This pattern leverages activity failures and retry policies to achieve polling behavior efficiently.
Key Implementation:
- Use short activity timeouts with retry policies for polling intervals
- Configure `RetryPolicy` with appropriate `initial_interval` for polling frequency
- Set `backoff_coefficient=1.0` to maintain consistent polling intervals
- Activities fail quickly and rely on Temporal's retry system for timing
- More efficient than long-running activities for infrequent polling needs
from datetime import timedelta
from temporalio import activity, workflow
from temporalio.common import RetryPolicy
# Import activities safely in workflow
with workflow.unsafe.imports_passed_through():
from my_app.activities import check_external_service
@activity.defn
async def poll_service_status(service_url: str) -> str:
    """Short-lived activity that checks service status."""
    # Attempt to get result from external service
    snapshot = await fetch_service_status(service_url)
    is_ready = snapshot is not None and snapshot.is_ready
    if not is_ready:
        # Fail the activity to trigger retry
        activity.logger.info("Service not ready, will retry")
        raise RuntimeError("Service not ready")
    activity.logger.info(f"Service is ready: {snapshot.status}")
    return snapshot.status
@activity.defn
async def check_job_completion(job_id: str) -> bool:
    """Check if a long-running job has completed."""
    status = await get_job_status(job_id)
    state = status.state
    if state in ["pending", "running"]:
        # Still in progress: fail so the retry policy schedules another check.
        activity.logger.info(f"Job {job_id} still {state}")
        raise RuntimeError(f"Job not complete: {state}")
    if state == "failed":
        raise ValueError(f"Job {job_id} failed: {status.error}")
    activity.logger.info(f"Job {job_id} completed successfully")
    return True
@workflow.defn
class InfrequentPollingWorkflow:
    """Polls a service by retrying a short activity at a fixed interval."""

    @workflow.run
    async def run(self, service_url: str) -> str:
        """Run ``poll_service_status`` until it succeeds or attempts run out.

        Args:
            service_url: Address forwarded to the polling activity.

        Returns:
            The status string returned by the activity.
        """
        poll_interval = timedelta(seconds=60)

        # Coefficient 1.0 with equal initial/maximum intervals keeps the
        # spacing between attempts constant at 60 seconds.
        polling_policy = RetryPolicy(
            initial_interval=poll_interval,
            backoff_coefficient=1.0,
            maximum_interval=poll_interval,
            maximum_attempts=100,  # upper bound on the number of polls
        )

        status = await workflow.execute_activity(
            poll_service_status,
            service_url,
            start_to_close_timeout=timedelta(seconds=5),  # each poll is short
            retry_policy=polling_policy,
        )
        return status
@workflow.defn
class JobMonitoringWorkflow:
    """Monitors a job by retrying a completion check every five minutes."""

    @workflow.run
    async def run(self, job_id: str) -> bool:
        """Run ``check_job_completion`` until it succeeds or attempts run out.

        Args:
            job_id: Identifier forwarded to the completion-check activity.

        Returns:
            The boolean returned by the activity.
        """
        check_interval = timedelta(minutes=5)

        # Constant spacing: 48 attempts five minutes apart is roughly 4 hours.
        monitoring_policy = RetryPolicy(
            initial_interval=check_interval,
            backoff_coefficient=1.0,
            maximum_interval=check_interval,
            maximum_attempts=48,
        )

        completed = await workflow.execute_activity(
            check_job_completion,
            job_id,
            start_to_close_timeout=timedelta(seconds=10),
            retry_policy=monitoring_policy,
        )
        return completed
# Helper functions (would be implemented based on your external services)
async def fetch_service_status(url: str):
    """Placeholder: fetch the status from the external service at ``url``.

    The stub does nothing and yields ``None``; supply a real implementation
    for your own service.
    """
    return None
async def get_job_status(job_id: str):
    """Placeholder: look up the status of ``job_id`` in the external system.

    The stub does nothing and yields ``None``; supply a real implementation
    for your own job system.
    """
    return None
# Benefits over frequent polling:
# - Lower resource usage (no long-running activities)
# - Leverages Temporal's built-in retry mechanism
# - Automatic failure handling and exponential backoff if needed
# - Better for polling intervals > 30 seconds

The Schedule pattern enables automatic execution of workflows at specified intervals or times using Temporal's built-in scheduling system. Schedules provide cron-like functionality with additional features like manual triggering, backfilling, and overlap policies.
Key Implementation:
- Use `client.create_schedule()` to define recurring workflow executions
- Configure `ScheduleSpec` with intervals, cron expressions, or calendars
- Support manual triggering with `handle.trigger()` for on-demand execution
- Enable backfilling with `handle.backfill()` to run missed executions
- Control execution overlap with `ScheduleOverlapPolicy` settings
import asyncio
from datetime import datetime, timedelta
from temporalio.client import (
Client,
Schedule,
ScheduleActionStartWorkflow,
ScheduleBackfill,
ScheduleIntervalSpec,
ScheduleOverlapPolicy,
ScheduleSpec,
ScheduleState,
)
from temporalio import workflow
@workflow.defn
class ScheduledWorkflow:
    """Workflow started by the schedule examples; echoes its input."""

    @workflow.run
    async def run(self, data: str) -> str:
        """Log the received payload and return it with a "Processed: " prefix."""
        workflow.logger.info(f"Scheduled execution with data: {data}")
        processed = f"Processed: {data}"
        return processed
async def create_interval_schedule():
    """Register a schedule that starts ``ScheduledWorkflow`` every 2 minutes."""
    client = await Client.connect("localhost:7233")

    # What to run: the workflow, its single argument, and where to run it.
    start_workflow = ScheduleActionStartWorkflow(
        ScheduledWorkflow.run,
        "scheduled data",
        id="scheduled-workflow-id",
        task_queue="scheduled-task-queue",
    )
    # When to run it.
    every_two_minutes = ScheduleSpec(
        intervals=[ScheduleIntervalSpec(every=timedelta(minutes=2))],
    )
    # Created unpaused, i.e. the schedule is active.
    active_state = ScheduleState(note="Runs every 2 minutes", paused=False)

    interval_schedule = Schedule(
        action=start_workflow,
        spec=every_two_minutes,
        state=active_state,
    )
    await client.create_schedule("interval-schedule-id", interval_schedule)
async def create_cron_schedule():
    """Register a cron-driven schedule that starts ``ScheduledWorkflow``."""
    client = await Client.connect("localhost:7233")

    report_action = ScheduleActionStartWorkflow(
        ScheduledWorkflow.run,
        "daily report",
        id="daily-report-workflow",
        task_queue="reports-task-queue",
    )
    # Cron fields: minute 0, hour 9, Monday through Friday.
    weekday_mornings = ScheduleSpec(cron_expressions=["0 9 * * MON-FRI"])
    report_state = ScheduleState(note="Daily report generation")

    cron_schedule = Schedule(
        action=report_action,
        spec=weekday_mornings,
        state=report_state,
    )
    await client.create_schedule("cron-schedule-id", cron_schedule)
async def trigger_schedule_manually():
    """Trigger "interval-schedule-id" right away, outside its normal timing."""
    client = await Client.connect("localhost:7233")

    # Look up the existing schedule by ID and fire it once.
    await client.get_schedule_handle("interval-schedule-id").trigger()

    print("Schedule triggered manually")
async def backfill_schedule():
    """Backfill "interval-schedule-id" for a recent window of missed runs.

    The window covers the period from two hours ago up to five minutes ago,
    and overlapping executions are allowed while the backfill runs.
    """
    # Imported locally so the snippet's import header does not need to change.
    from datetime import timezone

    client = await Client.connect("localhost:7233")
    handle = client.get_schedule_handle("interval-schedule-id")

    # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated and
    # returns a naive value that is easy to misread as local time.
    now = datetime.now(timezone.utc)

    await handle.backfill(
        ScheduleBackfill(
            start_at=now - timedelta(hours=2),  # Backfill last 2 hours
            end_at=now - timedelta(minutes=5),  # Up to 5 minutes ago
            overlap=ScheduleOverlapPolicy.ALLOW_ALL,  # Allow overlapping executions
        ),
    )
    print("Schedule backfilled successfully")
async def manage_schedule():
    """Create a schedule, then pause, resume and update it.

    The schedule combines a 30-minute interval with a weekly cron
    expression. Once created, it is paused, unpaused, and finally its
    state note is rewritten through ``handle.update``.
    """
    # Imported locally so the snippet's import header does not need to change.
    from temporalio.client import ScheduleUpdate, ScheduleUpdateInput

    client = await Client.connect("localhost:7233")

    # Create schedule with multiple triggers (interval + cron)
    await client.create_schedule(
        "complex-schedule-id",
        Schedule(
            action=ScheduleActionStartWorkflow(
                ScheduledWorkflow.run,
                "complex data",
                id="complex-workflow-id",
                task_queue="complex-task-queue",
            ),
            spec=ScheduleSpec(
                intervals=[
                    ScheduleIntervalSpec(every=timedelta(minutes=30)),  # Every 30 min
                ],
                cron_expressions=["0 0 * * SUN"],  # Also weekly on Sunday
            ),
            state=ScheduleState(
                note="Mixed interval and cron schedule",
                paused=False,
            ),
        ),
    )

    # Get handle for management operations
    handle = client.get_schedule_handle("complex-schedule-id")

    # Pause the schedule
    await handle.pause(note="Pausing for maintenance")

    # Resume the schedule
    await handle.unpause(note="Maintenance complete")

    # Update the schedule. The callback receives a ScheduleUpdateInput whose
    # current schedule lives under ``description.schedule``, and it must hand
    # back a ScheduleUpdate wrapping the modified schedule.
    async def updater(update_input: ScheduleUpdateInput) -> ScheduleUpdate:
        schedule = update_input.description.schedule
        schedule.state.note = "Updated schedule"
        return ScheduleUpdate(schedule=schedule)

    await handle.update(updater)
# Common use cases:
# - Periodic data processing (ETL jobs)
# - Regular health checks and monitoring
# - Scheduled reports and notifications
# - Batch processing at off-peak hours
# - Cleanup and maintenance tasks

The Pydantic Converter pattern enables seamless serialization and deserialization of Pydantic models in Temporal workflows and activities. This provides type safety, validation, and rich data modeling capabilities while maintaining compatibility with Temporal's payload system.
Key Implementation:
- Use `pydantic_data_converter` for automatic Pydantic model serialization
- Import Pydantic safely with `workflow.unsafe.imports_passed_through()`
- Configure both client and worker with the same data converter
- Leverage Pydantic's validation and type conversion features
- Essential for complex data structures and type safety
from datetime import datetime, timedelta
from ipaddress import IPv4Address
from typing import List
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker
# Import Pydantic safely for workflow use
with workflow.unsafe.imports_passed_through():
from pydantic import BaseModel, validator
from temporalio.contrib.pydantic import pydantic_data_converter
class UserData(BaseModel):
    """User record: a positive integer ID plus an IPv4 address."""

    user_id: int
    ip_address: IPv4Address

    # NOTE(review): `validator` is the Pydantic v1-style decorator; confirm
    # which Pydantic major version this example is meant to target.
    @validator('user_id')
    def validate_user_id(cls, v):
        """Accept only strictly positive IDs; reject anything else."""
        if v > 0:
            return v
        raise ValueError('user_id must be positive')
@activity.defn
async def process_users(users: List[UserData]) -> int:
    """Log every user in ``users`` and return how many there were.

    Args:
        users: User models to process.

    Returns:
        The number of entries in ``users``.
    """
    log = activity.logger
    log.info(f"Processing {len(users)} users")

    # Each entry is a UserData model, so its fields are read as attributes.
    for user in users:
        log.info(f"Processing user {user.user_id} from {user.ip_address}")

    processed_count = len(users)
    return processed_count
@workflow.defn
class PydanticWorkflow:
    """Workflow that hands a list of ``UserData`` models to an activity."""

    @workflow.run
    async def run(self, users: List[UserData]) -> int:
        """Run ``process_users`` on ``users`` and return the activity's count."""
        one_minute = timedelta(minutes=1)
        user_count = await workflow.execute_activity(
            process_users,
            users,
            start_to_close_timeout=one_minute,
        )
        return user_count
# Setup client and worker with Pydantic converter
async def main():
    """Start a worker and run ``PydanticWorkflow`` once with two sample users.

    The client is connected with ``pydantic_data_converter`` and the worker
    is built from that same client. The workflow result is printed.
    """
    task_queue = "pydantic-task-queue"

    client = await Client.connect(
        "localhost:7233",
        data_converter=pydantic_data_converter,
    )
    worker = Worker(
        client,
        task_queue=task_queue,
        workflows=[PydanticWorkflow],
        activities=[process_users],
    )

    async with worker:
        # Build the sample models; addresses are passed as strings.
        sample_rows = ((1, "192.168.1.1"), (2, "10.0.0.1"))
        users = [
            UserData(user_id=uid, ip_address=address)
            for uid, address in sample_rows
        ]

        result = await client.execute_workflow(
            PydanticWorkflow.run,
            users,
            id="pydantic-workflow-id",
            task_queue=task_queue,
        )
        print(f"Processed {result} users")