Getting Started with PyWorkflow: Build Your First Distributed Workflow
Learn how to build your first distributed workflow with PyWorkflow. This comprehensive guide covers installation, core concepts, practical examples, and best practices for workflow orchestration.
What is Workflow Orchestration and Why Does It Matter?
Workflow orchestration is the practice of coordinating multiple automated tasks across business applications and services to help ensure seamless execution. Unlike simple workflow automation, which focuses on automating individual tasks, workflow orchestration creates a connected framework where these automated tasks interact efficiently, follow a logical sequence, and integrate with other systems to achieve complete end-to-end business processes.
The Problem Traditional Approaches Can't Solve
Why Manual Workflow Management Fails
Building reliable, long-running distributed processes is one of the hardest problems in modern software engineering. Traditional approaches require you to manually handle:
- Server restarts mid-process — Your workflow stops when infrastructure fails, and you must rebuild state from scratch
- Retry logic without duplicating work — Implementing retries safely across distributed systems is notoriously difficult; get it wrong and you process the same request twice
- Long pauses without holding connections — Sleeping for hours or days while holding database connections and memory is wasteful and doesn't scale
- Complete auditability and tracking — Understanding what happened at each step requires building custom logging infrastructure
- Scaling across machines — Distributing work across multiple servers introduces coordination complexity that's easy to get wrong
These challenges force teams to either build custom infrastructure (expensive and error-prone) or use legacy workflow systems (rigid and hard to extend).
How Workflow Orchestration Solves These Problems
Workflow orchestration platforms like PyWorkflow abstract away these complexities. They provide:
- Automatic failure recovery — Your workflows resume automatically from the exact point of failure
- Built-in retry mechanisms — Configurable retry strategies with exponential backoff prevent duplicate processing
- Zero-resource suspension — Sleep without holding connections or memory; the orchestrator persists your state
- Complete event sourcing — Every state change is recorded as an immutable event, providing full auditability
- Horizontal scaling — Distribute work across workers automatically; the framework handles coordination
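The retry-with-backoff behavior described above can be sketched in a few lines of plain Python. This is an illustrative toy, not PyWorkflow's actual implementation; the function name and parameters are hypothetical:

```python
import asyncio

async def run_with_retries(task, max_retries=3, base_delay=5, backoff=2):
    """Run an async task, retrying failures with exponential backoff."""
    delay = base_delay
    for attempt in range(max_retries + 1):
        try:
            return await task()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries, surface the error
            await asyncio.sleep(delay)  # wait before the next attempt
            delay *= backoff            # e.g. 5s, 10s, 20s, ...
```

An orchestrator adds durability on top of this pattern: the attempt count and delays are persisted, so retries survive worker crashes instead of living only in one process's memory.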
The Evolution of Workflow Orchestration
Historical Context and Why It Matters Today
Workflow orchestration isn't new—enterprises have used it for decades in systems like IBM Workflow or SAP Process Orchestration. However, these legacy systems were:
- Expensive — Required significant licensing fees and consulting costs
- Inflexible — Built on proprietary languages and workflows, hard to extend
- Slow to develop — GUI-based design tools made iteration tedious
- Tightly coupled — Difficult to integrate with modern cloud services and microservices
The emergence of open-source orchestration frameworks (Airflow, Prefect, Temporal) and cloud-native approaches has democratized workflow orchestration. Modern tools let you define workflows in code, integrate seamlessly with cloud services, and scale from small scripts to enterprise-grade systems.
What is PyWorkflow and How Does It Work?
PyWorkflow's Core Architecture
PyWorkflow is a distributed workflow orchestration framework built specifically for Python developers. It enables you to build complex, long-running business processes as simple, readable Python code while the framework handles all the infrastructure complexity.
Key Architectural Components
The Workflow Definition Layer — You write workflows as Python async functions, using familiar language features. PyWorkflow parses your code to understand task dependencies and execution order.
The Execution Engine — The engine manages workflow instances, persisting state after each step. When a worker crashes, the engine resumes from the last successful checkpoint.
The Message Broker — Tasks are queued and distributed to available workers via Redis or RabbitMQ. Delivery is tracked so that a task is redelivered if its worker fails, and combined with idempotent steps this keeps retries from duplicating work.
The State Store — Event sourcing captures every state transition as immutable events. This provides complete auditability and enables replay and recovery.
The Monitoring Layer — Real-time dashboards and APIs let you track workflow progress, identify bottlenecks, and debug issues.
Why PyWorkflow Stands Out
Comparison with Alternative Approaches
| Feature | PyWorkflow | Apache Airflow | Celery | Temporal | Manual Implementation |
|---|---|---|---|---|---|
| Language | Python-native | Python DAGs | Python | Multi-language | Any |
| Fault Tolerance | Built-in, automatic | Manual handling | Manual | Built-in | Not included |
| State Management | Event sourcing | Database state | In-memory | Durable execution | Custom code |
| Retry Logic | Configurable, automatic | Manual | Manual | Automatic | Manual |
| Learning Curve | Gentle | Moderate | Steep | Moderate | Steep |
| Scaling | Horizontal, automatic | Horizontal, manual | Horizontal, manual | Horizontal, automatic | Not scalable |
| Zero-Resource Sleep | Yes | No | No | Yes | No |
| Observability | Built-in | Good | Limited | Excellent | None |
Why Choose PyWorkflow for Your Next Project
PyWorkflow is optimized for a specific use case: Python developers building distributed workflows that need automatic fault tolerance and event sourcing without the operational overhead of systems like Airflow.
Best for:
- Microservices that coordinate across systems
- Order processing and fulfillment pipelines
- Multi-step user onboarding flows
- Long-running background jobs with checkpoints
- Event-driven automation
Not ideal for:
- Simple scheduled cron jobs (use scheduling libraries)
- Massive ETL pipelines (use Airflow or Spark)
- Real-time streaming (use Kafka or Flink)
Installation and Initial Setup
System Requirements and Prerequisites
Before installing PyWorkflow, ensure you have:
- Python 3.8+ — PyWorkflow requires modern Python async support
- Redis 6.0+ or RabbitMQ 3.8+ — For the message broker (Redis is recommended for development)
- pip — Python package manager (included with Python 3.4+)
- Git (optional) — For cloning example repositories
Step-by-Step Installation
Installing PyWorkflow
The simplest installation method is via pip:
```shell
pip install pyworkflow
```

For development with the latest features:

```shell
git clone https://github.com/yasha-dev1/pyworkflow.git
cd pyworkflow
pip install -e .
```

Setting Up Your Message Broker
Option 1: Redis (Recommended for Development)
Install Redis:
```shell
# macOS
brew install redis

# Ubuntu/Debian
sudo apt-get install redis-server

# Docker
docker run -d -p 6379:6379 redis:latest
```

Start Redis:

```shell
redis-server
```

Option 2: RabbitMQ (Recommended for Production)
Install RabbitMQ:
```shell
# macOS
brew install rabbitmq

# Ubuntu/Debian
sudo apt-get install rabbitmq-server

# Docker
docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management
```

Start RabbitMQ:

```shell
rabbitmq-server
```

Verifying Your Installation
Create a simple test file to verify everything works:
```python
# test_setup.py
from pyworkflow import workflow, step

@step()
async def test_step():
    return "PyWorkflow is installed correctly!"

@workflow()
async def test_workflow():
    result = await test_step()
    return result

if __name__ == "__main__":
    from pyworkflow import start

    run_id = start(test_workflow)
    print(f"Test workflow started: {run_id}")
```

Run it:

```shell
python test_setup.py
```

If you see the workflow ID printed, your installation is successful!
Core Concepts: Understanding PyWorkflow's Building Blocks
Steps: The Atomic Unit of Work
What Are Steps and Why They Matter
A step is the smallest unit of work in a PyWorkflow workflow. It's a single async function that performs one logical operation and can be retried independently.
Key characteristics:
- Atomic — Either fully succeeds or fully fails; no partial completion
- Idempotent — Safe to retry multiple times with the same inputs and get the same result
- Traceable — Every execution is logged with timing and results
- Resilient — Automatically retried on transient failures
Defining Your First Step
```python
from pyworkflow import step
import logging

logger = logging.getLogger(__name__)

@step()
async def send_welcome_email(user_id: str, user_email: str):
    """
    Send a welcome email to a new user.

    This step demonstrates:
    - Type hints for clarity
    - Logging for observability
    - Idempotent design (safe to retry)
    """
    logger.info(f"Sending welcome email to {user_email}")

    # In a real system, this would call your email service
    email_sent = await send_email(
        to=user_email,
        subject="Welcome to Our Service!",
        template="welcome"
    )

    logger.info(f"Welcome email sent to {user_email}")
    return {"status": "sent", "email": user_email}
```

Configuring Step Behavior
Steps support configuration for error handling and resource management:
```python
@step(
    max_retries=3,          # Retry up to 3 times on failure
    retry_delay=5,          # Wait 5 seconds before first retry
    retry_backoff=2,        # Double the delay each retry (5s, 10s, 20s)
    timeout=30,             # Fail if step takes >30 seconds
    name="send_email_step"  # Custom name for monitoring
)
async def send_email_with_retries(email: str, subject: str):
    """Send email with automatic retries and timeout protection."""
    return await email_service.send(email, subject)
```

Understanding Step Idempotency
Idempotency is crucial for distributed systems. A step is idempotent if calling it multiple times with the same inputs produces the same result and has the same side effects as calling it once.
```python
# ✓ IDEMPOTENT - Safe to retry
@step()
async def create_user_record(user_id: str, email: str):
    """Create or return existing user record."""
    user = await db.find_user(user_id)
    if user:
        return user  # Already exists, return it
    return await db.create_user(user_id, email)

# ✗ NOT IDEMPOTENT - Unsafe to retry
@step()
async def increment_counter(counter_id: str):
    """Increment a counter - unsafe to retry!"""
    current = await db.get(counter_id)
    await db.set(counter_id, current + 1)  # Retrying doubles the increment
    return current + 1

# ✓ IDEMPOTENT - Better version
@step()
async def increment_counter_safely(counter_id: str):
    """Increment counter with a conditional (compare-and-set) update."""
    current = await db.get(counter_id)
    new_value = current + 1
    # Only write if the value hasn't changed since we read it
    success = await db.compare_and_set(counter_id, current, new_value)
    if not success:
        # Another writer got there first; return the authoritative value
        return await db.get(counter_id)
    return new_value
```

Workflows: Orchestrating Steps into Processes
What Are Workflows
A workflow is an async function decorated with @workflow() that orchestrates multiple steps into a cohesive process. Workflows define the execution order, data flow, and control logic for your business process.
Key characteristics:
- Composable — Workflows can call other workflows
- Durable — State is persisted after each step
- Observable — Complete execution history is recorded
- Recoverable — Failed workflows resume from the last successful step
Your First Workflow: User Onboarding
```python
from pyworkflow import workflow, step, sleep

@step()
async def create_user_account(email: str, name: str):
    """Create a new user account in the system."""
    user = await db.users.create(email=email, name=name)
    return {"user_id": user.id, "email": email}

@step()
async def send_welcome_email(user_id: str, email: str):
    """Send welcome email to the new user."""
    await email_service.send(
        to=email,
        template="welcome",
        context={"user_id": user_id}
    )
    return {"status": "sent"}

@step()
async def schedule_onboarding_call(user_id: str):
    """Schedule an onboarding call with the user."""
    call = await calendar.schedule_call(
        user_id=user_id,
        duration_minutes=30,
        template="onboarding"
    )
    return {"call_id": call.id}

@step()
async def send_tips_email(user_id: str, email: str):
    """Send helpful tips email after user has had time to explore."""
    await email_service.send(
        to=email,
        template="tips",
        context={"user_id": user_id}
    )
    return {"status": "sent"}

@workflow()
async def user_onboarding_workflow(email: str, name: str):
    """
    Complete user onboarding workflow.

    Steps:
    1. Create user account
    2. Send welcome email
    3. Schedule onboarding call
    4. Wait 3 days
    5. Send helpful tips email
    """
    # Step 1: Create the account
    user_result = await create_user_account(email, name)
    user_id = user_result["user_id"]

    # Step 2: Send welcome email immediately
    await send_welcome_email(user_id, email)

    # Step 3: Schedule onboarding call
    call_result = await schedule_onboarding_call(user_id)

    # Step 4: Wait 3 days - zero resources consumed!
    # The workflow is suspended and resumed automatically after 3 days
    await sleep("3d")

    # Step 5: Send tips email after user has explored
    await send_tips_email(user_id, email)

    return {
        "status": "onboarding_complete",
        "user_id": user_id,
        "call_id": call_result["call_id"]
    }

# Start a workflow instance
if __name__ == "__main__":
    from pyworkflow import start

    run_id = start(
        user_onboarding_workflow,
        email="alice@example.com",
        name="Alice Smith"
    )
    print(f"Onboarding workflow started: {run_id}")
```

Understanding Workflow State and Durability
When you call a step from within a workflow, PyWorkflow:
- Serializes the step call — Converts function arguments to JSON for storage
- Executes the step — Runs the step function, potentially on a different worker
- Persists the result — Stores the step result as an immutable event
- Resumes the workflow — Continues with the next step using the stored result
This process ensures that if any worker crashes, the workflow can resume from exactly where it left off.
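The resume-from-checkpoint mechanics can be simulated in plain Python: keep a durable log of completed step results, and consult it before executing anything. This is a deliberately tiny toy (the in-memory `event_log` dict stands in for PyWorkflow's state store):

```python
import asyncio

# Simulated durable state store: step name -> recorded result
event_log = {}

async def durable_step(name, fn, *args):
    """Run a step once; on replay, return the recorded result instead."""
    if name in event_log:
        return event_log[name]  # already completed: skip re-execution
    result = await fn(*args)
    event_log[name] = result    # persist the result before moving on
    return result

async def add_one(x):
    return x + 1

async def workflow():
    a = await durable_step("step1", add_one, 1)
    b = await durable_step("step2", add_one, a)
    return b

# First run records both steps; replaying after a crash reuses them
print(asyncio.run(workflow()))  # 3
```

Running `workflow()` a second time executes no step functions at all: both results are served from the log, which is exactly how a replacement worker picks up where a crashed one left off.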
```python
@workflow()
async def resilient_workflow():
    # Step 1 executes
    result1 = await step1()  # Stored: step1 returned result1

    # If the worker crashes here, the next worker will:
    # 1. Load the workflow state
    # 2. See that step1 already completed
    # 3. Skip step1 and use the stored result1
    # 4. Continue with step2
    result2 = await step2(result1)  # Stored: step2 returned result2

    return {"final": result2}
```

Event Sourcing: The Foundation of Durability
What is Event Sourcing and Why It's Powerful
Event sourcing is an architectural pattern where every change to application state is captured as an immutable event. Instead of storing only the current state, you store the complete history of events that led to that state.
Benefits:
- Complete auditability — You can see exactly what happened and when
- Replay capability — Replay events to recover from failures or investigate issues
- Temporal queries — Ask "what was the state at time T?"
- Debugging — Understand the exact sequence of operations that led to a failure
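The core idea fits in a few lines of plain Python: current state is never stored directly, only derived by replaying the event history, and replaying a prefix of the history answers "what was the state at time T?". The event shape below is a generic sketch, not PyWorkflow's internal schema:

```python
# Each event is an immutable record of something that happened
events = [
    {"type": "step_completed", "step": "create_user", "result": {"user_id": "u1"}},
    {"type": "step_completed", "step": "send_email", "result": {"status": "sent"}},
]

def replay(events):
    """Rebuild current workflow state by folding over every event in order."""
    state = {"completed_steps": [], "results": {}}
    for event in events:
        if event["type"] == "step_completed":
            state["completed_steps"].append(event["step"])
            state["results"][event["step"]] = event["result"]
    return state

current_state = replay(events)       # state now
state_at_t1 = replay(events[:1])     # temporal query: state after event 1
```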
How PyWorkflow Uses Event Sourcing
```python
# When you run a workflow, PyWorkflow stores events like:
[
    {
        "type": "workflow_started",
        "workflow_id": "onboarding_abc123",
        "timestamp": "2024-04-20T10:00:00Z",
        "input": {"email": "alice@example.com", "name": "Alice"}
    },
    {
        "type": "step_started",
        "step_name": "create_user_account",
        "timestamp": "2024-04-20T10:00:01Z"
    },
    {
        "type": "step_completed",
        "step_name": "create_user_account",
        "timestamp": "2024-04-20T10:00:02Z",
        "result": {"user_id": "user_123", "email": "alice@example.com"}
    },
    {
        "type": "step_started",
        "step_name": "send_welcome_email",
        "timestamp": "2024-04-20T10:00:03Z"
    },
    {
        "type": "step_completed",
        "step_name": "send_welcome_email",
        "timestamp": "2024-04-20T10:00:05Z",
        "result": {"status": "sent"}
    },
    {
        "type": "workflow_sleeping",
        "duration": "3d",
        "timestamp": "2024-04-20T10:00:06Z"
    }
    # ... more events ...
]
```

Querying Workflow History
```python
from datetime import datetime
from pyworkflow import get_workflow_history

# Get complete event history
history = get_workflow_history("onboarding_abc123")

# See all steps and their results
for event in history:
    if event["type"] == "step_completed":
        print(f"Step {event['step_name']} completed: {event['result']}")

# Calculate total time (timestamps are ISO 8601 strings, so parse them first)
start_time = datetime.fromisoformat(history[0]["timestamp"].replace("Z", "+00:00"))
end_time = datetime.fromisoformat(history[-1]["timestamp"].replace("Z", "+00:00"))
total_duration = end_time - start_time
print(f"Total workflow duration: {total_duration}")
```

Building Your First Real Workflow: Order Processing
See the full example in the original blog post - this is a comprehensive 200+ line example demonstrating all core concepts.
Best Practices for Production Workflows
Design Principles
1. Keep Steps Focused and Composable
Each step should do one thing well. This makes them:
- Easier to test
- Simpler to retry
- More reusable across workflows
```python
# ✓ GOOD - Focused steps
@step()
async def validate_email(email: str):
    # Only validates email format
    pass

@step()
async def check_email_not_registered(email: str):
    # Only checks if email is already registered
    pass

# ✗ AVOID - Too many responsibilities
@step()
async def validate_and_register_user(email: str, name: str):
    # Validates, checks registration, AND registers
    # Hard to retry, hard to test
    pass
```

2. Use Meaningful Names for Observability
Your step and workflow names appear in logs and dashboards. Make them descriptive.
3. Make Steps Idempotent
Always design steps to be safe for retry.
4. Handle Errors Gracefully
Plan for failures and handle them explicitly.
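For example, a workflow can catch a step failure and run a compensating action instead of letting the whole process die. The sketch below uses plain async Python with hypothetical helpers (`reserve_inventory`, `charge_card`, `release_inventory`) to illustrate the pattern:

```python
import asyncio

async def reserve_inventory(order_id):
    return {"reservation": f"res-{order_id}"}

async def charge_card(order_id):
    raise RuntimeError("payment declined")  # simulate a failing step

async def release_inventory(reservation):
    return {"released": reservation}

async def order_workflow(order_id):
    reservation = await reserve_inventory(order_id)
    try:
        await charge_card(order_id)
    except RuntimeError:
        # Compensate: undo the reservation, then report a clean failure
        await release_inventory(reservation["reservation"])
        return {"status": "failed", "reason": "payment_declined"}
    return {"status": "complete"}

print(asyncio.run(order_workflow("o42")))
# {'status': 'failed', 'reason': 'payment_declined'}
```

The key point is that the workflow itself decides what a failure means for the business process, rather than leaving a half-finished order behind.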
5. Log Important Events
Logging helps with debugging and monitoring.
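One lightweight convention: tag every log line with the run and step it belongs to, so records can be correlated across workers. The `log_step` helper below is hypothetical, not a PyWorkflow API:

```python
import logging

logger = logging.getLogger("workflows")

def log_step(run_id: str, step_name: str, message: str) -> str:
    """Emit a log line prefixed with run and step identifiers."""
    line = f"run={run_id} step={step_name} {message}"
    logger.info(line)
    return line

log_step("onboarding_abc123", "send_welcome_email", "email queued")
```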
Scaling and Monitoring
Horizontal Scaling
As your workload grows, simply add more workers. All workers automatically distribute tasks, and PyWorkflow handles coordination.
Monitoring Workflow Performance
Track metrics like total runs, success rate, average duration, and failure counts.
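With run history available, these metrics reduce to simple aggregation. The record shape below is a hypothetical sketch, not a PyWorkflow API:

```python
# Hypothetical run records pulled from workflow history
runs = [
    {"workflow": "onboarding", "status": "completed", "duration_s": 12.0},
    {"workflow": "onboarding", "status": "completed", "duration_s": 8.0},
    {"workflow": "onboarding", "status": "failed", "duration_s": 3.0},
]

total = len(runs)
completed = [r for r in runs if r["status"] == "completed"]
success_rate = len(completed) / total
avg_duration = sum(r["duration_s"] for r in completed) / len(completed)
failures = total - len(completed)

print(f"runs={total} success_rate={success_rate:.0%} "
      f"avg_duration={avg_duration:.1f}s failures={failures}")
# runs=3 success_rate=67% avg_duration=10.0s failures=1
```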
Common Use Cases and Patterns
- Multi-Step User Onboarding
- Order Processing and Fulfillment
- Data Pipeline Orchestration
- Event-Driven Automation
Next Steps and Further Learning
Explore the PyWorkflow Documentation, join the community, and build your first workflow today.
Conclusion
PyWorkflow makes building reliable, distributed workflows simple. Start with a simple workflow in your domain and gradually add complexity as you become comfortable with the patterns.
The workflows you build today will be the foundation of scalable, reliable systems tomorrow. Happy workflow building! 🚀