A production-ready AI workflow orchestration platform. Build durable, event-driven workflows with automatic retries, step memoization, and LLM integration.
- Durable Execution: Steps are memoized and checkpointed. If a workflow fails and restarts, completed steps won't re-execute.
- Event-Driven: Trigger workflows from events, webhooks, or cron schedules.
- AI-Native: Built-in
step.ai()for LLM calls with automatic retries and model routing via LiteLLM. - Flow Control: Concurrency limiting, rate limiting, throttling, and debouncing.
- Multi-Tenant: Fair queue with tenant isolation for SaaS workloads.
- Role-Based Access: Admin, Member, and Viewer roles with granular permissions.
- Developer Experience: CLI for local development with hot reload and event simulation.
FlowForge uses a client-server architecture where your workflow code runs on workers, while the central server handles orchestration.
┌─────────────────────────────────────────────────────────────────┐
│ Your Application │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Next.js │ │ FastAPI │ │ Cron Job │ │
│ │ Frontend │ │ Backend │ │ │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └──────────────────┼──────────────────┘ │
│ │ send events │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ FlowForge Server │ │
│ │ ┌─────────┐ ┌─────────┐ ┌──────────┐ ┌───────────────┐ │ │
│ │ │ API │ │ Queue │ │ Runner │ │ Executor │ │ │
│ │ │ :8000 │ │ (Redis) │ │ │ │ │ │ │
│ │ └─────────┘ └─────────┘ └──────────┘ └───────┬───────┘ │ │
│ └──────────────────────────────────────────────────┼──────────┘ │
│ │ │
│ invoke function │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Your Worker(s) │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ @flowforge.function("process-order") │ │ │
│ │ │ async def process_order(ctx): │ │ │
│ │ │ await step.run("validate", ...) │ │ │
│ │ │ await step.ai("fraud-check", ...) │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ │ :8080 │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
The Flow:
- Your app sends an event →
client.sendEvent("order/created", {...}) - FlowForge Server receives it and matches to registered functions
- Server calls your Worker's
/invokeendpoint - Worker executes your Python code, step by step
- Each step result is saved (durable execution)
- If worker crashes, server retries from last checkpoint
pip install flowforge-sdkfrom flowforge import FlowForge, Context, step
# Initialize with server connection
flowforge = FlowForge(
app_id="my-app", # Your application identifier
api_url="http://localhost:8000", # FlowForge server URL
api_key="ff_live_xxx", # API key for authentication (optional)
signing_key="sk_xxx", # Request signing key (optional)
)| Parameter | Description | Default |
|---|---|---|
app_id |
Unique identifier for your application | Required |
api_url |
URL of the FlowForge server | FLOWFORGE_API_URL env or http://localhost:8000 |
api_key |
API key for authentication (ff_live_xxx) | FLOWFORGE_API_KEY env or None |
signing_key |
Key for HMAC request signing | FLOWFORGE_SIGNING_KEY env or None |
Environment Variables:
export FLOWFORGE_API_URL=http://localhost:8000
export FLOWFORGE_API_KEY=ff_live_xxx
export FLOWFORGE_SIGNING_KEY=sk_xxx
# For worker mode:
export FLOWFORGE_SERVER_URL=http://localhost:8000 # Alias for FLOWFORGE_API_URL
export FLOWFORGE_WORKER_URL=http://localhost:8080/api/flowforge@flowforge.function(
id="process-order",
trigger=flowforge.trigger.event("order/created"),
retries=3,
)
async def process_order(ctx: Context) -> dict:
order = ctx.event.data
# Step 1: Validate (memoized)
validation = await step.run("validate", validate_order, order)
# Step 2: AI fraud check
fraud_check = await step.ai(
"fraud-check",
model="gpt-4o",
prompt=f"Check order {order['id']} for fraud..."
)
# Step 3: Process payment
payment = await step.run("payment", process_payment, order)
# Step 4: Wait before confirmation
await step.sleep("delay", "30s")
# Step 5: Send email
await step.run("confirm", send_email, order)
return {"status": "completed", "order_id": order["id"]}For development, use the CLI to run everything locally:
# Install CLI
pip install flowforge-cli
# Start dev server (runs server + executes functions locally)
flowforge dev .
# Send a test event
flowforge send order/created -d '{"id": "123", "customer": "Alice"}'Create agent-based functions via API that run directly on the server:
# Create a serverless function via API
curl -X POST http://localhost:8000/api/v1/functions/inline \
-H "Content-Type: application/json" \
-d '{
"function_id": "support-agent",
"name": "Support Agent",
"trigger_type": "event",
"trigger_value": "ticket/created",
"system_prompt": "You are a helpful support agent...",
"tools": ["web_search", "send_email"],
"agent_config": {
"model": "gpt-4o",
"max_iterations": 10
}
}'No worker deployment needed — the server executes the agent internally using the configured tools.
In production, run the server separately and connect workers:
Start the server:
docker-compose up -dRun your worker:
# main.py
from flowforge import FlowForge, Context, step
flowforge = FlowForge(app_id="my-app")
@flowforge.function(id="process-order", ...)
async def process_order(ctx: Context):
...
# Start as a worker - connects to the server
if __name__ == "__main__":
flowforge.work(
server_url="http://flowforge-server:8000", # Central server
host="0.0.0.0",
port=8080, # Worker listens here
worker_url="http://my-worker:8080/api/flowforge", # How server reaches us
)What work() does:
- Starts a FastAPI server on your machine (port 8080)
- Registers your functions with the central server
- Exposes
/api/flowforge/invokefor the server to call - Handles function execution when invoked
Environment variables for workers:
export FLOWFORGE_SERVER_URL=http://flowforge-server:8000
export FLOWFORGE_WORKER_URL=http://my-worker:8080/api/flowforge| Step | Description |
|---|---|
step.run(id, fn, *args) |
Execute a function with memoization |
step.sleep(id, duration) |
Pause execution ("5s", "1h", "30m") |
step.ai(id, model=, prompt=) |
LLM call with retry |
step.wait_for_event(id, event=, match=) |
Pause until a matching event arrives |
step.invoke(id, function_id=, data=) |
Call another FlowForge function |
step.send_event(id, name=, data=) |
Emit an event |
Trigger workflows from your Next.js or Node.js app:
npm install flowforge-clientimport { createClient } from 'flowforge-client';
// Create client (Supabase-style API)
const ff = createClient('http://localhost:8000', {
apiKey: 'ff_live_xxx', // Optional: API key for authentication
});
// Send an event to trigger a workflow
const { data, error } = await ff.events.send('order/created', {
order_id: '123',
customer: 'Alice',
});
if (error) {
console.error('Failed:', error.message);
} else {
console.log('Triggered runs:', data.runs);
}
// Wait for a run to complete
const { data: run } = await ff.runs.waitFor(data.runs[0].id, {
timeout: 60000
});
console.log('Output:', run.output);See packages/flowforge-client-ts for full documentation.
FlowForge supports two authentication methods:
Access the dashboard with email/password. Three roles are available:
| Role | Permissions |
|---|---|
| Admin | Full access: manage users, API keys, functions, tools, everything |
| Member | Create/edit functions, tools, send events, view runs, manage approvals |
| Viewer | Read-only: view runs, events, functions, tools (no create/edit/delete) |
Create the first admin:
# Using Docker (recommended)
docker exec flowforge-server flowforge-create-admin -e admin@example.com -p 'your-secure-password' -n 'Admin User'
# Using the server CLI (local development)
flowforge-server create-admin --email admin@example.com --password secret123
# Or the standalone command
flowforge-create-admin -e admin@example.com -p secret123 -n "Admin User"Users can optionally enable two-factor authentication for added security:
- Go to Settings → Security in the dashboard
- Click Enable 2FA
- Scan the QR code with an authenticator app (Google Authenticator, Authy, 1Password, etc.)
- Enter the 6-digit verification code
- Save your backup codes in a safe place
When 2FA is enabled, users must enter a verification code from their authenticator app after their password during login. Backup codes can be used for recovery if access to the authenticator is lost.
Requirements: 2FA requires the FLOWFORGE_ENCRYPTION_KEY environment variable to be set for encrypting TOTP secrets.
For SDK and server-to-server authentication:
# Key format: ff_{type}_{random}
# Types: live (production), test (development), ro (read-only)
# Example: ff_live_a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6Create API keys via the dashboard (Settings → API Keys) or API.
Using API keys in the SDK:
# Python SDK
flowforge = FlowForge(
app_id="my-app",
api_url="http://localhost:8000",
api_key="ff_live_xxx", # For authentication
)
# Requests are sent with the key in the header:
# X-FlowForge-API-Key: ff_live_xxx// TypeScript client
const ff = createClient('http://localhost:8000', {
apiKey: 'ff_live_xxx',
});flowforge/
├── packages/
│ ├── flowforge-sdk/ # Python SDK
│ ├── flowforge-cli/ # CLI tool
│ └── flowforge-client-ts/ # TypeScript client
├── server/ # Orchestration server (FastAPI)
├── dashboard/ # Admin dashboard (Next.js)
├── examples/ # Example workflows
└── tests/ # Test suites
- Python 3.11+
- Node.js 18+ (for dashboard)
- Docker (for PostgreSQL and Redis)
# Start infrastructure
docker-compose up -d
# Install SDK in development mode
pip install -e "packages/flowforge-sdk[all]"
pip install -e packages/flowforge-cli
pip install -e server
# Run tests
pytest
# Start dashboard
cd dashboard && pnpm install && pnpm devCreate a .env file:
# Database
DATABASE_URL=postgresql://flowforge:flowforge@localhost:5432/flowforge
# Redis
REDIS_URL=redis://localhost:6379
# AI (for step.ai)
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-ant-...
# Authentication & Security
FLOWFORGE_JWT_SECRET=your-jwt-secret-change-in-production
FLOWFORGE_ENCRYPTION_KEY=your-encryption-key # Required for 2FA and storing AI provider keys
# SDK/Worker configuration
FLOWFORGE_SERVER_URL=http://localhost:8000
FLOWFORGE_EVENT_KEY=ff_live_xxx
FLOWFORGE_SIGNING_KEY=sk_xxx
FLOWFORGE_WORKER_URL=http://localhost:8080/api/flowforgeGenerate an encryption key:
python -c "import secrets; print(secrets.token_urlsafe(32))"┌─────────────────────────────────────────────────────────────┐
│ FlowForge Server │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. Event arrives at /api/v1/events │
│ │ │
│ ▼ │
│ 2. Server matches event to registered functions │
│ │ │
│ ▼ │
│ 3. Job enqueued to fair queue (Redis) │
│ │ │
│ ▼ │
│ 4. Executor dequeues and calls worker's /invoke endpoint │
│ │ │
└────────────────────┼────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Your Worker │
├─────────────────────────────────────────────────────────────┤
│ │
│ 5. Worker executes your function │
│ │ │
│ ▼ │
│ 6. On step.* call → returns control to server │
│ (raises StepCompleted exception) │
│ │ │
│ ▼ │
│ 7. Step result saved, function re-enqueued │
│ │ │
│ ▼ │
│ 8. Repeat until function returns or fails │
│ │
└─────────────────────────────────────────────────────────────┘
Steps raise control flow exceptions (StepCompleted) to yield control back to the server, enabling durable execution across restarts.
MIT