
Advanced Workflow Patterns in PyWorkflow

Master advanced patterns like parallel execution, conditional branching, dynamic task generation, and circuit breakers in PyWorkflow for building sophisticated distributed systems.

PyWorkflow Team · 16 min read

Introduction: Moving Beyond Basic Workflows

Once you've mastered the basics of PyWorkflow, it's time to explore advanced patterns that unlock the full power of distributed workflow orchestration. These patterns solve real-world problems: coordinating work across multiple systems, handling conditional logic, generating tasks dynamically, and building resilient systems that gracefully degrade under failure.

This guide covers the patterns that separate simple workflows from sophisticated, production-grade systems. Each pattern is grounded in real-world use cases and includes detailed code examples you can adapt to your needs.

What Are Workflow Patterns, and Why Do They Matter?

Understanding Pattern-Based Architecture

A workflow pattern is a reusable solution to a common problem in distributed systems. Rather than writing custom code for each scenario, patterns provide proven architectures that:

  • Reduce complexity — Patterns handle edge cases you might not anticipate
  • Improve maintainability — Team members recognize familiar patterns
  • Enable scaling — Patterns are designed to scale from small to large workloads
  • Ensure reliability — Patterns encode lessons learned from production failures

Pattern Categories in PyWorkflow

Pattern Category        | Purpose                              | Complexity | Use When
Parallel Execution      | Run independent tasks simultaneously | Medium     | Multiple operations can happen at once
Conditional Branching   | Choose different paths based on data | Low        | Workflow behavior depends on input/state
Dynamic Task Generation | Create tasks based on runtime data   | High       | Number/type of tasks unknown upfront
Error Handling          | Recover from failures gracefully     | Medium     | Failures are expected and recoverable
Circuit Breaking        | Prevent cascading failures           | High       | Calling external services that might fail
Saga Pattern            | Distributed transactions             | High       | Multi-step changes across systems
Workflow Composition    | Combine multiple workflows           | Medium     | Workflows share common sub-processes

Parallel Execution: Coordinating Independent Operations

Why Parallel Execution Matters

In distributed systems, parallelism is essential for performance. If tasks are independent, running them sequentially wastes resources and time. Parallel execution allows you to:

  • Reduce total workflow time — Independent tasks run concurrently instead of one-by-one
  • Utilize resources efficiently — Workers can process multiple tasks across the cluster
  • Scale throughput — More workers automatically increase parallelism

Understanding Parallelism in PyWorkflow

PyWorkflow can execute independent steps in parallel. The key is to launch the steps together (for example with asyncio.gather) rather than awaiting each one in turn, which would run them one at a time:

import asyncio

@workflow()
async def parallel_workflow():
    # Launch the three independent steps concurrently;
    # PyWorkflow distributes them to available workers
    result1, result2, result3 = await asyncio.gather(
        step1(),  # Worker A
        step2(),  # Worker B
        step3(),  # Worker C
    )
    
    # All three steps have completed at this point;
    # continue with the combined results
    return await aggregate_results(result1, result2, result3)

Pattern 1: Fan-Out / Fan-In

The fan-out / fan-in pattern distributes work across multiple parallel tasks (fan-out), then combines the results (fan-in).

Use Cases

  • Data processing — Process chunks of data in parallel
  • Batch operations — Process multiple items simultaneously
  • Aggregation — Combine results from multiple sources
  • Report generation — Gather data from multiple systems

Implementation

from pyworkflow import workflow, step
from typing import List, Dict

@step()
async def process_chunk(chunk_id: int, data: list) -> Dict:
    """
    Process a single chunk of data.
    
    In a real system, this might:
    - Apply ML models
    - Transform data
    - Compute statistics
    """
    print(f"Processing chunk {chunk_id} with {len(data)} items")
    
    # Simulate processing
    result = {
        "chunk_id": chunk_id,
        "count": len(data),
        "sum": sum(data),
        "avg": sum(data) / len(data) if data else 0
    }
    
    return result

@step()
async def aggregate_results(results: List[Dict]) -> Dict:
    """
    Combine results from all chunks.
    
    This step receives the results from all parallel tasks
    and combines them into a final result.
    """
    total_count = sum(r["count"] for r in results)
    total_sum = sum(r["sum"] for r in results)
    
    return {
        "chunks_processed": len(results),
        "total_items": total_count,
        "total_sum": total_sum,
        "overall_average": total_sum / total_count if total_count > 0 else 0,
        "chunk_results": results
    }

@workflow()
async def parallel_data_processing(data: list, chunk_size: int = 100):
    """
    Process large dataset in parallel chunks.
    
    This workflow:
    1. Splits data into chunks
    2. Processes each chunk in parallel
    3. Aggregates the results
    
    Performance: O(n/workers) instead of O(n)
    """
    # Fan-out: Split the data into chunks
    chunks = [
        data[i:i + chunk_size]
        for i in range(0, len(data), chunk_size)
    ]
    
    # Execute all chunks in parallel (awaiting each chunk
    # inside a loop would run them one at a time instead)
    import asyncio
    chunk_results = await asyncio.gather(*(
        process_chunk(chunk_id, chunk)
        for chunk_id, chunk in enumerate(chunks)
    ))
    
    # Fan-in: Aggregate all results
    final_result = await aggregate_results(chunk_results)
    
    return final_result

Performance Considerations

Scenario                 | Sequential Time | Parallel Time | Speedup
10 chunks, 1 second each | 10s             | 1s            | 10x
100 chunks, 100ms each   | 10s             | 1s            | 10x
1000 chunks, 10ms each   | 10s             | 1s            | 10x

The speedup depends on the number of available workers; the figures above assume 10. With N workers you can achieve up to an N-fold speedup, provided there are at least N chunks and per-task overhead stays small.
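The arithmetic behind the table is simple: with W workers, the chunks are processed in ceil(chunks / W) sequential rounds. A quick sketch of that idealized model (the helper function is illustrative, not part of PyWorkflow):

```python
import math

def parallel_time(chunks: int, seconds_per_chunk: float, workers: int) -> float:
    """Idealized parallel runtime: each worker processes
    ceil(chunks / workers) chunks back to back."""
    return math.ceil(chunks / workers) * seconds_per_chunk

# The table rows, all assuming 10 workers:
row1 = parallel_time(10, 1.0, 10)     # 1.0s
row2 = parallel_time(100, 0.1, 10)    # ~1.0s
row3 = parallel_time(1000, 0.01, 10)  # ~1.0s
```

With fewer workers the rounds multiply: the same 10 one-second chunks on 4 workers take 3 rounds, or 3 seconds.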

Pattern 2: Parallel Branches with Convergence

Sometimes you need to execute different branches in parallel, then converge on a common result. See the full example in the original blog for detailed implementation.
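As a rough, framework-agnostic sketch of the idea: two different branches run concurrently, and a converging step merges their outputs. The branch functions below are hypothetical, and plain asyncio.gather stands in for PyWorkflow's scheduling:

```python
import asyncio

async def enrich_from_crm(order_id: str) -> dict:
    # Branch 1: look up customer details (simulated)
    await asyncio.sleep(0)
    return {"customer": f"cust-{order_id}"}

async def score_fraud_risk(order_id: str) -> dict:
    # Branch 2: compute a fraud-risk score (simulated)
    await asyncio.sleep(0)
    return {"risk_score": 0.12}

async def checkout(order_id: str) -> dict:
    # Run both branches concurrently, then converge on one result
    crm, risk = await asyncio.gather(
        enrich_from_crm(order_id),
        score_fraud_risk(order_id),
    )
    return {"order_id": order_id, **crm, **risk}

result = asyncio.run(checkout("A-17"))
# result combines the output of both branches
```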

Conditional Branching: Making Decisions in Workflows

Why Conditional Logic Is Essential

Real-world workflows rarely follow a single path. They make decisions based on:

  • Input data — Different processing for different inputs
  • Intermediate results — Different paths based on step outcomes
  • External state — Different actions based on system state

Pattern 3: Simple Conditional Branching

The simplest pattern uses Python's if statements to choose between paths:

@step()
async def validate_order(order: Dict) -> Dict:
    """Validate order and return validation result."""
    errors = []
    
    if not order.get("customer_id"):
        errors.append("Missing customer_id")
    if not order.get("items"):
        errors.append("No items in order")
    if order.get("total", 0) <= 0:
        errors.append("Invalid total")
    
    return {
        "valid": len(errors) == 0,
        "errors": errors
    }

@step()
async def process_valid_order(order: Dict) -> Dict:
    """Process an order that passed validation."""
    # Charge payment, reserve inventory, etc.
    return {"status": "processed", "order_id": order["id"]}

@step()
async def handle_invalid_order(order: Dict, errors: List[str]) -> Dict:
    """Handle an order that failed validation."""
    # Log error, notify customer, etc.
    return {
        "status": "rejected",
        "order_id": order["id"],
        "reason": "; ".join(errors)
    }

@workflow()
async def order_workflow_with_branching(order: Dict) -> Dict:
    """
    Process order with conditional branching.
    
    Flow:
    1. Validate order
    2. If valid: process order
    3. If invalid: handle error
    """
    validation = await validate_order(order)
    
    if validation["valid"]:
        result = await process_valid_order(order)
    else:
        result = await handle_invalid_order(order, validation["errors"])
    
    return result

Pattern 4: Multi-Way Branching

For more complex decisions, use multiple branches based on classification. See the full blog for detailed implementation.
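One common shape for multi-way branching is a dispatch table that maps a classification to a handler step. A minimal sketch of that structure (the tiers, rules, and handlers here are invented for illustration):

```python
import asyncio

async def handle_standard(order: dict) -> dict:
    return {"id": order["id"], "route": "standard"}

async def handle_express(order: dict) -> dict:
    return {"id": order["id"], "route": "express"}

async def handle_bulk(order: dict) -> dict:
    return {"id": order["id"], "route": "bulk"}

# Map each classification to its handler step
HANDLERS = {
    "express": handle_express,
    "bulk": handle_bulk,
}

def classify(order: dict) -> str:
    # Hypothetical rules: big orders are bulk, flagged ones express
    if order.get("quantity", 0) > 100:
        return "bulk"
    if order.get("priority"):
        return "express"
    return "standard"

async def routed_workflow(order: dict) -> dict:
    # Fall back to the standard handler for unknown classes
    handler = HANDLERS.get(classify(order), handle_standard)
    return await handler(order)

result = asyncio.run(routed_workflow({"id": 1, "priority": True}))
# → {"id": 1, "route": "express"}
```

Adding a new branch then means adding one handler and one dictionary entry, rather than growing an if/elif chain.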

Dynamic Task Generation: Creating Tasks at Runtime

Why Dynamic Generation Is Powerful

Sometimes you don't know how many tasks to create until runtime. Dynamic generation allows:

  • Flexible workloads — Handle variable-sized jobs
  • Scalability — Process any number of items
  • Responsiveness — Generate tasks based on real-time data

Pattern 5: Map Pattern (Generate and Execute)

The map pattern generates a task for each item in a collection. See the full blog for detailed implementation.
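In outline, the pattern fans out one task per item, generated at runtime, and gathers the results in input order. A small stand-alone sketch, with plain asyncio in place of PyWorkflow's scheduler:

```python
import asyncio

async def process_item(item: int) -> int:
    # Per-item task; in a real workflow this would be a @step
    await asyncio.sleep(0)
    return item * item

async def map_workflow(items: list) -> list:
    # One task per item; gather preserves the input
    # order in its result list
    return await asyncio.gather(*(process_item(i) for i in items))

results = asyncio.run(map_workflow([1, 2, 3, 4]))
# → [1, 4, 9, 16]
```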

Pattern 6: Filter-Map-Reduce

Combine filtering, mapping, and reduction for complex data processing. See the full blog for detailed implementation.
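The shape of the pipeline: filter the input, map over the survivors in parallel, then reduce to a single value. A hedged sketch with made-up stages:

```python
import asyncio

async def transform(value: int) -> int:
    # "Map" stage: per-item work (simulated)
    await asyncio.sleep(0)
    return value * 10

async def filter_map_reduce(values: list) -> int:
    # Filter: keep only non-negative values
    kept = [v for v in values if v >= 0]
    # Map: transform the kept values in parallel
    mapped = await asyncio.gather(*(transform(v) for v in kept))
    # Reduce: collapse to a single number
    return sum(mapped)

total = asyncio.run(filter_map_reduce([3, -1, 4, -5, 2]))
# → 90  (3, 4, and 2 survive the filter; 30 + 40 + 20)
```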

Error Handling and Resilience Patterns

Pattern 7: Retry with Exponential Backoff

Automatic retries are essential for handling transient failures:

@step(
    max_retries=5,           # Retry up to 5 times
    retry_delay=1,           # Start with 1 second delay
    retry_backoff=2,         # Double the delay each time
    max_retry_delay=60       # Cap at 60 seconds
)
async def call_external_api(endpoint: str, data: Dict) -> Dict:
    """
    Call external API with automatic retries.
    
    Retry delays: 1s, 2s, 4s, 8s, 16s
    
    This handles transient failures:
    - Network timeouts
    - Temporary service unavailability
    - Rate limiting (with backoff)
    """
    import aiohttp
    
    async with aiohttp.ClientSession() as session:
        async with session.post(endpoint, json=data) as response:
            if response.status >= 500:
                # Transient error, will be retried
                raise Exception(f"Server error: {response.status}")
            return await response.json()

@workflow()
async def resilient_api_workflow(endpoint: str) -> Dict:
    """Workflow that calls external APIs with resilience."""
    result = await call_external_api(endpoint, {"data": "test"})
    return result
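The retry schedule produced by those parameters is just a capped geometric series. A quick sketch of the arithmetic (the parameter names mirror the decorator above, but the helper itself is illustrative):

```python
def backoff_schedule(max_retries: int, retry_delay: float,
                     retry_backoff: float, max_retry_delay: float) -> list:
    # Delay before retry i is base * backoff**i, capped at the maximum
    return [
        min(retry_delay * retry_backoff ** i, max_retry_delay)
        for i in range(max_retries)
    ]

schedule = backoff_schedule(5, 1, 2, 60)   # [1, 2, 4, 8, 16]
longer = backoff_schedule(8, 1, 2, 60)     # cap kicks in after 32
```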

Pattern 8: Circuit Breaker

Prevent cascading failures by stopping requests when a service is failing. See the full blog for detailed implementation.
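The essentials can be captured in a few dozen lines. This is a minimal in-process sketch of the state machine, not PyWorkflow's implementation: closed while healthy, open after a run of failures, and half-open (one trial call) after a cooldown:

```python
import time

class CircuitBreaker:
    """Minimal circuit breaker.

    CLOSED:    calls pass through; consecutive failures are counted.
    OPEN:      after `threshold` failures, calls are rejected
               immediately for `cooldown` seconds.
    HALF-OPEN: after the cooldown, a trial call is allowed; success
               closes the circuit, failure re-opens it.
    """

    def __init__(self, threshold: int = 3, cooldown: float = 30.0):
        self.threshold = threshold
        self.cooldown = cooldown
        self.failures = 0
        self.opened_at = None  # None while the circuit is closed

    def allow(self) -> bool:
        if self.opened_at is None:
            return True
        # Half-open: permit a trial call once the cooldown elapses
        return time.monotonic() - self.opened_at >= self.cooldown

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = time.monotonic()

def call_with_breaker(breaker: CircuitBreaker, fn, *args):
    """Guard a callable with the breaker, rejecting fast when open."""
    if not breaker.allow():
        raise RuntimeError("circuit open: request rejected")
    try:
        result = fn(*args)
    except Exception:
        breaker.record_failure()
        raise
    breaker.record_success()
    return result
```

The fast rejection while open is the point: callers fail in microseconds instead of tying up workers on a service that is already down.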

Workflow Composition: Building Complex Systems

Pattern 9: Workflow Composition

Compose multiple workflows together for complex business logic. See the full blog for detailed implementation.
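In outline, composition just means a parent workflow awaiting other workflows as sub-steps. A toy sketch (the onboarding stages are invented for illustration):

```python
import asyncio

async def provision_account(user: str) -> dict:
    # Sub-workflow 1: create the account (simulated)
    await asyncio.sleep(0)
    return {"account_id": f"acct-{user}"}

async def send_welcome_email(user: str) -> dict:
    # Sub-workflow 2: notify the user (simulated)
    await asyncio.sleep(0)
    return {"email_sent": True}

async def onboarding_workflow(user: str) -> dict:
    # Parent workflow composes the two sub-workflows in sequence
    account = await provision_account(user)
    notify = await send_welcome_email(user)
    return {"user": user, **account, **notify}

result = asyncio.run(onboarding_workflow("ada"))
```

Each sub-workflow stays independently testable and reusable; the parent only sequences them and merges their outputs.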

Performance Optimization Patterns

Pattern 10: Caching for Expensive Operations

Cache expensive computations to avoid redundant work. See the full blog for detailed implementation.
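The core of the pattern is a cache keyed by the step's name and inputs, checked before doing the work. A minimal in-process sketch (a production deployment would typically use a shared store such as Redis; the key scheme and counter here are illustrative):

```python
import asyncio

_cache: dict = {}
_compute_calls = 0  # instrumented so the cache's effect is visible

async def expensive_report(region: str) -> dict:
    # Stand-in for a slow computation or external API call
    global _compute_calls
    _compute_calls += 1
    await asyncio.sleep(0)
    return {"region": region, "total": 42}

async def cached_report(region: str) -> dict:
    key = ("expensive_report", region)  # key on step name + inputs
    if key not in _cache:
        _cache[key] = await expensive_report(region)  # compute once
    return _cache[key]

async def main():
    first = await cached_report("eu")
    second = await cached_report("eu")  # served from the cache
    return first, second, _compute_calls

first, second, calls = asyncio.run(main())
# calls == 1: the second request never recomputed
```

Note the usual caveat: caching is only safe when the step is deterministic for a given input, so stale entries need an expiry or invalidation strategy.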

Monitoring and Debugging Advanced Workflows

Observability Best Practices

See the full blog for comprehensive observability patterns and examples.

Best Practices Summary

Practice              | Benefit                        | Example
Keep patterns simple  | Easier to understand and debug | Use basic fan-out/fan-in before complex patterns
Measure performance   | Identify bottlenecks           | Log timing for each step
Test failure paths    | Ensure resilience              | Mock service failures in tests
Use meaningful names  | Better observability           | parallel_order_processing, not workflow1
Document assumptions  | Prevent misuse                 | Comment on idempotency requirements
Monitor in production | Catch issues early             | Track success rate and duration

Conclusion

Advanced workflow patterns are the building blocks of sophisticated, production-grade distributed systems. By mastering these patterns, you can:

  • Build scalable systems — Handle growing workloads
  • Ensure reliability — Gracefully handle failures
  • Maintain code quality — Use proven solutions
  • Debug efficiently — Understand what's happening

Start with simple patterns and gradually adopt more complex ones as your needs grow. The PyWorkflow documentation and community examples provide additional patterns and real-world use cases.

Build with confidence. Your workflows will be reliable, observable, and ready for production. 🚀
