Code examples
Eight end-to-end recipes plus three reusable patterns. The first recipe shows the same task in cURL, Python, Bash, and PowerShell; pick the variant that matches your stack. The remaining recipes use cURL or Python, whichever reads clearest, and the patterns are all Python.
Every example assumes you've set ETLWORKS_URL (e.g. https://app.etlworks.com/rest) and ETLWORKS_API_KEY in your environment. See the Quickstart for getting these.
Run a flow and wait for completion
Trigger a flow, then poll the executions endpoint until it reaches a terminal status. The most common pattern in production scripts.
The request body (the FlowExecutionRequest / FlowAuditRecordRun model) is a flat map of string→string parameters, not wrapped in {"params": …}. Send {} if your flow takes no parameters. The response is a FlowExecutionResponse with flowId, auditId, and an int status code. To track progress, use auditId (not the int code) against GET /v1/executions/{flowId}?auditId={auditId}, which returns a FlowAuditRecord with a string status: queued, running, success, warning, error, or canceled.
FLOW_ID=12345
# Run the flow. Body is a flat map of parameters (or {} for none).
AUDIT_ID=$(curl -s -X POST "$ETLWORKS_URL/v1/flows/$FLOW_ID/run" \
-H "Authorization: Bearer $ETLWORKS_API_KEY" \
-H "Content-Type: application/json" \
-d '{"runDate":"2026-05-09"}' | jq -r '.auditId')
# Poll the audit record until status is terminal.
while :; do
STATUS=$(curl -s "$ETLWORKS_URL/v1/executions/$FLOW_ID?auditId=$AUDIT_ID" \
-H "Authorization: Bearer $ETLWORKS_API_KEY" | jq -r '.status')
echo " $STATUS"
case "$STATUS" in success|warning|error|canceled) break ;; esac
sleep 5
done
import os, time, requests
BASE = os.environ["ETLWORKS_URL"]
H = {"Authorization": f"Bearer {os.environ['ETLWORKS_API_KEY']}"}
TERMINAL = {"success", "warning", "error", "canceled"}
def run_and_wait(flow_id, parameters=None, timeout=900):
# Body is a flat map of {string: string} parameters (or {} for none).
r = requests.post(f"{BASE}/v1/flows/{flow_id}/run",
headers={**H, "Content-Type": "application/json"},
json=parameters or {})
r.raise_for_status()
audit_id = r.json()["auditId"] # NOT executionId
deadline = time.time() + timeout
while time.time() < deadline:
rec = requests.get(f"{BASE}/v1/executions/{flow_id}",
headers=H, params={"auditId": audit_id}).json()
if rec["status"] in TERMINAL:
return rec
time.sleep(5)
raise TimeoutError(f"Flow {flow_id} (audit {audit_id}) still running after {timeout}s")
print(run_and_wait(12345, parameters={"runDate": "2026-05-09"}))
run_and_wait () {
  local flow_id="$1" body="${2:-"{}"}"   # nested quotes keep the {} default intact; ${2:-{}} would append a stray }
local audit_id status
audit_id=$(curl -s -X POST "$ETLWORKS_URL/v1/flows/$flow_id/run" \
-H "Authorization: Bearer $ETLWORKS_API_KEY" \
-H "Content-Type: application/json" -d "$body" | jq -r '.auditId')
while :; do
status=$(curl -s "$ETLWORKS_URL/v1/executions/$flow_id?auditId=$audit_id" \
-H "Authorization: Bearer $ETLWORKS_API_KEY" | jq -r '.status')
case "$status" in success|warning|error|canceled) echo "$status"; return ;; esac
sleep 5
done
}
run_and_wait 12345 '{"runDate":"2026-05-09"}'
function Invoke-Flow ($FlowId, $Parameters = @{}) {
$headers = @{ "Authorization" = "Bearer $env:ETLWORKS_API_KEY" }
# Body is the parameters map directly — not wrapped in another object.
$body = $Parameters | ConvertTo-Json -Compress
$started = Invoke-RestMethod -Method Post -Uri "$env:ETLWORKS_URL/v1/flows/$FlowId/run" `
-Headers $headers -ContentType "application/json" -Body $body
$auditId = $started.auditId
$terminal = "success","warning","error","canceled"
do {
Start-Sleep 5
$rec = Invoke-RestMethod -Uri "$env:ETLWORKS_URL/v1/executions/$FlowId`?auditId=$auditId" -Headers $headers
Write-Host " $($rec.status)"
} while ($rec.status -notin $terminal)
$rec
}
Invoke-Flow -FlowId 12345 -Parameters @{ runDate = "2026-05-09" }
Create a connection
Add a Postgres connection programmatically — useful when provisioning environments from CI/CD or Terraform-style scripts.
The connectionType discriminator and the keys inside properties are not arbitrary strings — they're defined per-connector in the platform's connector library (under src/main/resources/di/…/connections/<type>.json). For PostgreSQL, the type is connection.db.postgres and the property keys are field.connection.template.host, field.connection.template.port, field.connection.template.database, etc. The same pattern holds for every connector.
Easiest way to discover the right shape for any connector: create the connection once in the Etlworks UI, then GET /v1/connections/{id} to see the exact connectionType and properties keys it uses. Copy that shape into your script and parameterize the values.
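For example, to dump that shape from a connection you created in the UI (a minimal Python sketch reusing BASE and H from the first recipe; 67890 stands in for your connection's id):
import requests  # BASE and H as defined in the first recipe

conn = requests.get(f"{BASE}/v1/connections/67890", headers=H).json()
print(conn["connectionType"])
for key, value in sorted(conn["properties"].items()):
    print(f"  {key} = {value}")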
curl -s -X POST "$ETLWORKS_URL/v1/connections" \
-H "Authorization: Bearer $ETLWORKS_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"name": "warehouse-prod",
"description": "Production warehouse",
"connectionType": "connection.db.postgres",
"properties": {
"field.connection.template.host": "warehouse.internal",
"field.connection.template.port": "5432",
"field.connection.template.database": "analytics",
"field.connection.user": "etlworks",
"field.connection.password": "{{secret:warehouse_pw}}"
},
"tags": ["production", "warehouse"]
}'
The {{secret:…}} placeholder pulls from your tenant's secret store at runtime — the literal value is never sent over the API. Test the connection without saving via POST /v1/connections/test with the same body.
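A Python sketch of the test-first pattern, under the assumption that the test endpoint answers non-2xx when the connection fails (BASE and H as in the first recipe):
import requests  # BASE and H as in the first recipe

body = {
    "name": "warehouse-prod",
    "connectionType": "connection.db.postgres",
    "properties": {
        "field.connection.template.host": "warehouse.internal",
        "field.connection.template.port": "5432",
        "field.connection.template.database": "analytics",
        "field.connection.user": "etlworks",
        "field.connection.password": "{{secret:warehouse_pw}}",
    },
}
# Assumption: /test returns a non-2xx status when the connection is bad.
requests.post(f"{BASE}/v1/connections/test", headers=H, json=body).raise_for_status()
requests.post(f"{BASE}/v1/connections", headers=H, json=body).raise_for_status()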
Schedule a flow
Schedule an existing flow. The body matches the FlowSchedule model: flowId, name, the schedule string (the platform's scheduling syntax; the UI's schedule picker is the source of truth for valid expressions), and enabled. Note: pause and resume go through PUT /disable and /enable, not /pause and /resume.
curl -s -X POST "$ETLWORKS_URL/v1/schedules" \
-H "Authorization: Bearer $ETLWORKS_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"name": "Daily warehouse refresh",
"description": "Refresh the warehouse from staging every day at 4am",
"flowId": 12345,
"schedule": "0 0 4 * * ?",
"enabled": true,
"tags": ["production"]
}'
# Pause (SCHEDULE_ID: the id of the schedule created above):
curl -s -X PUT "$ETLWORKS_URL/v1/schedules/$SCHEDULE_ID/disable" \
-H "Authorization: Bearer $ETLWORKS_API_KEY"
# Resume:
curl -s -X PUT "$ETLWORKS_URL/v1/schedules/$SCHEDULE_ID/enable" \
-H "Authorization: Bearer $ETLWORKS_API_KEY"
Query the audit log
Pull every change made to flows in the last 7 days — useful for compliance reports or debugging "what changed?" questions.
from datetime import datetime, timedelta, timezone
since = (datetime.now(timezone.utc) - timedelta(days=7)).isoformat()
events = requests.get(
f"{BASE}/v1/audit",
headers=H,
params={"resourceType": "flow", "since": since, "limit": 200},
).json()
for e in events:
print(f"{e['timestamp']} {e['user']} {e['action']:<8} flow {e['resourceId']}")
Export and import (env promotion)
Export a flow plus its connections from staging, import into production. The /v1/io endpoints bundle resources into a self-contained payload you can store, diff, or apply to another tenant.
# Export flow + dependencies
curl -s -X POST "$ETLWORKS_URL/v1/io/export" \
-H "Authorization: Bearer $ETLWORKS_API_KEY" \
-H "Content-Type: application/json" \
-d '{"flowIds":[12345], "includeConnections":true}' \
> flow-bundle.json
# Import into another tenant (preview mode first)
curl -s -X POST "$ETLWORKS_PROD_URL/v1/io/import/preview" \
-H "Authorization: Bearer $PROD_API_KEY" \
-H "Content-Type: application/json" \
--data-binary @flow-bundle.json
# Apply if the preview looks right
curl -s -X POST "$ETLWORKS_PROD_URL/v1/io/import" \
-H "Authorization: Bearer $PROD_API_KEY" \
-H "Content-Type: application/json" \
--data-binary @flow-bundle.json
Bulk-tag resources
Tags live on the resource itself (the Flow's tags field is List<String>). To add a tag, GET the resource, modify tags, PUT it back. Refetching first matters: PUT replaces the whole flow, so you'd clobber other fields if you hand-built a partial body.
summaries = requests.get(f"{BASE}/v1/flows", headers=H,
params={"q": "warehouse", "limit": 200}).json()
for s in summaries:
flow = requests.get(f"{BASE}/v1/flows/{s['id']}", headers=H).json()
flow["tags"] = sorted(set((flow.get("tags") or []) + ["warehouse-pipeline"]))
requests.put(f"{BASE}/v1/flows/{s['id']}",
headers={**H, "Content-Type": "application/json"},
json=flow).raise_for_status()
print(f" tagged: {flow['name']}")
Build a metrics dashboard
Pull last-24h flow execution counts, error rates, and average duration — the data behind any internal "is the platform healthy?" dashboard.
curl -s "$ETLWORKS_URL/v1/metrics/executions/summary?windowHours=24" \
-H "Authorization: Bearer $ETLWORKS_API_KEY"
# → { "total": 1842, "succeeded": 1809, "failed": 33,
# "avgDurationMs": 12480, "p95DurationMs": 64210 }
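Turning the summary into dashboard numbers takes a few lines of Python (field names as in the sample response above; BASE and H from the first recipe):
import requests  # BASE and H as in the first recipe

s = requests.get(f"{BASE}/v1/metrics/executions/summary",
                 headers=H, params={"windowHours": 24}).json()
error_rate = s["failed"] / s["total"] if s["total"] else 0.0
print(f"runs: {s['total']}  error rate: {error_rate:.1%}  "
      f"avg: {s['avgDurationMs'] / 1000:.1f}s  p95: {s['p95DurationMs'] / 1000:.1f}s")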
Subscribe to flow events (outbound webhooks)
Etlworks fires webhooks — it doesn't receive them. Register a webhook to be POSTed to your URL when the listed events fire on the flows you care about. Body matches the Webhook model.
Need the opposite direction, where an external system calls Etlworks to kick off a flow? That's a listener connection, not a webhook. Create an HTTP listener (or a queue listener for Kafka/SQS/MQTT) via POST /v1/connections with the appropriate connectionType, then bind a flow to it. The listener URL Etlworks returns is what your upstream system POSTs to.
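A hypothetical sketch of the listener route; the connectionType and empty properties below are invented placeholders to illustrate the call shape, so discover the real names with the GET trick from the connection recipe before scripting this:
import requests  # BASE and H as in the first recipe

# NOTE: "connection.http.listener" and the empty properties map are
# HYPOTHETICAL placeholders; fetch a real listener with
# GET /v1/connections/{id} to see the actual connectionType and keys.
listener = requests.post(
    f"{BASE}/v1/connections",
    headers={**H, "Content-Type": "application/json"},
    json={
        "name": "orders-inbound",
        "connectionType": "connection.http.listener",  # hypothetical
        "properties": {},
    },
).json()
print(listener)  # the response includes the URL your upstream system POSTs to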
curl -s -X POST "$ETLWORKS_URL/v1/webhooks" \
-H "Authorization: Bearer $ETLWORKS_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"name": "warehouse-flow-events",
"description": "Notify ops on warehouse-pipeline failures and successes",
"events": ["FLOW_FAILED","FLOW_SUCCESS"],
"url": "https://your-app.example.com/etlworks-events",
"method": "POST",
"secret": "use-this-for-hmac-verification",
"flowIncludes": [12345],
"enabled": true
}'
Etlworks signs each delivery with the secret (HMAC) so your endpoint can verify authenticity. Filter to specific flows via flowIncludes, or send all flow events except a deny-list with flowExcludes.
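On the receiving side, verify the signature before trusting a delivery. A minimal sketch assuming a hex-encoded HMAC-SHA256 of the raw request body; the header name and digest encoding are assumptions, so check the webhook reference for the real ones:
import hmac, hashlib

WEBHOOK_SECRET = b"use-this-for-hmac-verification"  # the secret from the registration body

def verify(raw_body: bytes, signature_header: str) -> bool:
    # Assumption: hex HMAC-SHA256 of the raw body, delivered in a header
    # such as X-Etlworks-Signature (hypothetical name).
    expected = hmac.new(WEBHOOK_SECRET, raw_body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature_header)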
Pattern: paginate a large list
For paginated endpoints (/v1/audit, execution history, metrics; the per-group reference pages document which), drive offset + limit until a page comes back shorter than limit, or until you've consumed X-Total-Count items if you prefer to track the header. Some list endpoints, such as GET /v1/flows, return the full scope unpaginated. The helper below still terminates against those as long as the full result is shorter than page_size: the first call returns everything and the short page stops the loop. If an unpaginated list could exceed page_size, raise page_size, since an endpoint that ignores offset would otherwise hand back the same full page forever.
def paginate(path, params=None, page_size=200):
offset, params = 0, dict(params or {})
while True:
params.update(limit=page_size, offset=offset)
page = requests.get(f"{BASE}{path}", headers=H, params=params).json()
if not page:
return
for item in page:
yield item
if len(page) < page_size:
return
offset += page_size
for flow in paginate("/v1/flows"):
print(flow["id"], flow["name"])
Pattern: retry with exponential backoff
Wraps a request in retry-on-5xx and retry-on-429 logic, honoring the Retry-After header.
import time, random
def call_with_retry(method, url, attempts=5, **kwargs):
for i in range(attempts):
r = requests.request(method, url, headers=H, **kwargs)
if r.status_code < 500 and r.status_code != 429:
r.raise_for_status()
return r.json() if r.content else None
delay = int(r.headers.get("Retry-After", 0)) or min(60, (2 ** i) + random.random())
time.sleep(delay)
raise RuntimeError(f"Gave up after {attempts} attempts: {url}")
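Usage mirrors the other patterns; for example, fetching a flow with retries:
flow = call_with_retry("GET", f"{BASE}/v1/flows/12345")
print(flow["name"])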
Pattern: parallel fan-out
Run twenty flows at once, wait for all of them. Useful for nightly batch jobs that have no shared dependency.
from concurrent.futures import ThreadPoolExecutor
flow_ids = [12345, 12346, 12347, 12348, …]
with ThreadPoolExecutor(max_workers=8) as pool:
results = list(pool.map(lambda fid: run_and_wait(fid), flow_ids))
failed = [r for r in results if r["status"] != "success"]
print(f" {len(results)} run, {len(failed)} failed")