REST API

Code examples

Eight end-to-end recipes plus three patterns. Each recipe shows the same task in cURL, Python, Bash, and PowerShell — pick the tab that matches your stack.

Setup

Every example assumes you've set ETLWORKS_URL (e.g. https://app.etlworks.com/rest) and ETLWORKS_API_KEY in your environment. See the Quickstart for getting these.

Run a flow and wait for completion

Trigger a flow, then poll the executions endpoint until it reaches a terminal status. The most common pattern in production scripts.

Real shapes from FlowExecutionRequest / FlowAuditRecord

Run body is a flat map of string→string parameters — not wrapped in {"params": …}. Send {} if your flow takes no parameters. The response is a FlowExecutionResponse with flowId, auditId, and an int status code. To track progress, use auditId (not the int code) against GET /v1/executions/{flowId}?auditId={auditId}, which returns a FlowAuditRecord with a string status: queued, running, success, warning, error, or canceled.

Shell
FLOW_ID=12345

# Run the flow. Body is a flat map of parameters (or {} for none).
AUDIT_ID=$(curl -s -X POST "$ETLWORKS_URL/v1/flows/$FLOW_ID/run" \
  -H "Authorization: Bearer $ETLWORKS_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"runDate":"2026-05-09"}' | jq -r '.auditId')

# Poll the audit record until status is terminal.
while :; do
  STATUS=$(curl -s "$ETLWORKS_URL/v1/executions/$FLOW_ID?auditId=$AUDIT_ID" \
    -H "Authorization: Bearer $ETLWORKS_API_KEY" | jq -r '.status')
  echo "  $STATUS"
  case "$STATUS" in success|warning|error|canceled) break ;; esac
  sleep 5
done
Python
import os, time, requests

BASE = os.environ["ETLWORKS_URL"]
H = {"Authorization": f"Bearer {os.environ['ETLWORKS_API_KEY']}"}
TERMINAL = {"success", "warning", "error", "canceled"}

def run_and_wait(flow_id, parameters=None, timeout=900):
    # Body is a flat map of {string: string} parameters (or {} for none).
    r = requests.post(f"{BASE}/v1/flows/{flow_id}/run",
                      headers={**H, "Content-Type": "application/json"},
                      json=parameters or {})
    r.raise_for_status()
    audit_id = r.json()["auditId"]   # NOT executionId

    deadline = time.time() + timeout
    while time.time() < deadline:
        rec = requests.get(f"{BASE}/v1/executions/{flow_id}",
                           headers=H, params={"auditId": audit_id}).json()
        if rec["status"] in TERMINAL:
            return rec
        time.sleep(5)
    raise TimeoutError(f"Flow {flow_id} (audit {audit_id}) still running after {timeout}s")

print(run_and_wait(12345, parameters={"runDate": "2026-05-09"}))
Bash
run_and_wait () {
  local flow_id="$1" body="${2:-{}}"
  local audit_id status
  audit_id=$(curl -s -X POST "$ETLWORKS_URL/v1/flows/$flow_id/run" \
    -H "Authorization: Bearer $ETLWORKS_API_KEY" \
    -H "Content-Type: application/json" -d "$body" | jq -r '.auditId')
  while :; do
    status=$(curl -s "$ETLWORKS_URL/v1/executions/$flow_id?auditId=$audit_id" \
      -H "Authorization: Bearer $ETLWORKS_API_KEY" | jq -r '.status')
    case "$status" in success|warning|error|canceled) echo "$status"; return ;; esac
    sleep 5
  done
}

run_and_wait 12345 '{"runDate":"2026-05-09"}'
PowerShell
function Invoke-Flow ($FlowId, $Parameters = @{}) {
  $headers = @{ "Authorization" = "Bearer $env:ETLWORKS_API_KEY" }
  # Body is the parameters map directly — not wrapped in another object.
  $body    = $Parameters | ConvertTo-Json -Compress
  $started = Invoke-RestMethod -Method Post -Uri "$env:ETLWORKS_URL/v1/flows/$FlowId/run" `
              -Headers $headers -ContentType "application/json" -Body $body
  $auditId = $started.auditId
  $terminal = "success","warning","error","canceled"
  do {
    Start-Sleep 5
    $rec = Invoke-RestMethod -Uri "$env:ETLWORKS_URL/v1/executions/$FlowId`?auditId=$auditId" -Headers $headers
    Write-Host "  $($rec.status)"
  } while ($rec.status -notin $terminal)
  $rec
}
Invoke-Flow -FlowId 12345 -Parameters @{ runDate = "2026-05-09" }

Create a connection

Add a Postgres connection programmatically — useful when provisioning environments from CI/CD or Terraform-style scripts.

Connection types are metadata-driven

The connectionType discriminator and the keys inside properties are not arbitrary strings — they're defined per-connector in the platform's connector library (under src/main/resources/di/…/connections/<type>.json). For PostgreSQL, the type is connection.db.postgres and the property keys are field.connection.template.host, field.connection.template.port, field.connection.template.database, etc. The same pattern holds for every connector.

Easiest way to discover the right shape for any connector: create the connection once in the Etlworks UI, then GET /v1/connections/{id} to see the exact connectionType and properties keys it uses. Copy that shape into your script and parameterize the values.

Shell
curl -s -X POST "$ETLWORKS_URL/v1/connections" \
  -H "Authorization: Bearer $ETLWORKS_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "warehouse-prod",
    "description": "Production warehouse",
    "connectionType": "connection.db.postgres",
    "properties": {
      "field.connection.template.host": "warehouse.internal",
      "field.connection.template.port": "5432",
      "field.connection.template.database": "analytics",
      "field.connection.user": "etlworks",
      "field.connection.password": "{{secret:warehouse_pw}}"
    },
    "tags": ["production", "warehouse"]
  }'

The {{secret:…}} placeholder pulls from your tenant's secret store at runtime — the literal value is never sent over the API. Test the connection without saving via POST /v1/connections/test with the same body.

Schedule a flow

Schedule an existing flow. The body matches the FlowSchedule model: flowId, name, the schedule string (the platform's scheduling syntax — the UI's schedule picker is the source of truth for valid expressions), and enabled. Note: pause/resume use /disable and /enable, not /pause//resume.

Shell
curl -s -X POST "$ETLWORKS_URL/v1/schedules" \
  -H "Authorization: Bearer $ETLWORKS_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Daily warehouse refresh",
    "description": "Refresh the warehouse from staging every day at 4am",
    "flowId": 12345,
    "schedule": "0 0 4 * * ?",
    "enabled": true,
    "tags": ["production"]
  }'

# Pause:
curl -s -X PUT "$ETLWORKS_URL/v1/schedules/$SCHEDULE_ID/disable" \
  -H "Authorization: Bearer $ETLWORKS_API_KEY"

# Resume:
curl -s -X PUT "$ETLWORKS_URL/v1/schedules/$SCHEDULE_ID/enable" \
  -H "Authorization: Bearer $ETLWORKS_API_KEY"

Query the audit log

Pull every change made to flows in the last 7 days — useful for compliance reports or debugging "what changed?" questions.

Python
from datetime import datetime, timedelta, timezone

since = (datetime.now(timezone.utc) - timedelta(days=7)).isoformat()

events = requests.get(
    f"{BASE}/v1/audit",
    headers=H,
    params={"resourceType": "flow", "since": since, "limit": 200},
).json()

for e in events:
    print(f"{e['timestamp']}  {e['user']}  {e['action']:<8}  flow {e['resourceId']}")

Export and import (env promotion)

Export a flow plus its connections from staging, import into production. The /v1/io endpoints bundle resources into a self-contained payload you can store, diff, or apply to another tenant.

Shell
# Export flow + dependencies
curl -s -X POST "$ETLWORKS_URL/v1/io/export" \
  -H "Authorization: Bearer $ETLWORKS_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"flowIds":[12345], "includeConnections":true}' \
  > flow-bundle.json

# Import into another tenant (preview mode first)
curl -s -X POST "$ETLWORKS_PROD_URL/v1/io/import/preview" \
  -H "Authorization: Bearer $PROD_API_KEY" \
  -H "Content-Type: application/json" \
  --data-binary @flow-bundle.json

# Apply if the preview looks right
curl -s -X POST "$ETLWORKS_PROD_URL/v1/io/import" \
  -H "Authorization: Bearer $PROD_API_KEY" \
  -H "Content-Type: application/json" \
  --data-binary @flow-bundle.json

Bulk-tag resources

Tags live on the resource itself (the Flow's tags field is List<String>). To add a tag, GET the resource, modify tags, PUT it back. Refetching first matters: PUT replaces the whole flow, so you'd clobber other fields if you hand-built a partial body.

Python
summaries = requests.get(f"{BASE}/v1/flows", headers=H,
                          params={"q": "warehouse", "limit": 200}).json()

for s in summaries:
    flow = requests.get(f"{BASE}/v1/flows/{s['id']}", headers=H).json()
    flow["tags"] = sorted(set((flow.get("tags") or []) + ["warehouse-pipeline"]))
    requests.put(f"{BASE}/v1/flows/{s['id']}",
                 headers={**H, "Content-Type": "application/json"},
                 json=flow).raise_for_status()
    print(f"  tagged: {flow['name']}")

Build a metrics dashboard

Pull last-24h flow execution counts, error rates, and average duration — the data behind any internal "is the platform healthy?" dashboard.

Shell
curl -s "$ETLWORKS_URL/v1/metrics/executions/summary?windowHours=24" \
  -H "Authorization: Bearer $ETLWORKS_API_KEY"
# → { "total": 1842, "succeeded": 1809, "failed": 33,
#       "avgDurationMs": 12480, "p95DurationMs": 64210 }

Subscribe to flow events (outbound webhooks)

Etlworks fires webhooks — it doesn't receive them. Register a webhook to be POSTed to your URL when the listed events fire on the flows you care about. Body matches the Webhook model.

Need inbound triggers (something else → run a flow)?

That's a listener connection, not a webhook. Create an HTTP listener (or queue listener for Kafka/SQS/MQTT) via POST /v1/connections with the appropriate connectionType, then bind a flow to it. The listener URL Etlworks returns is what your upstream system POSTs to.

Shell
curl -s -X POST "$ETLWORKS_URL/v1/webhooks" \
  -H "Authorization: Bearer $ETLWORKS_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "warehouse-flow-events",
    "description": "Notify ops on warehouse-pipeline failures and successes",
    "events": ["FLOW_FAILED","FLOW_SUCCESS"],
    "url": "https://your-app.example.com/etlworks-events",
    "method": "POST",
    "secret": "use-this-for-hmac-verification",
    "flowIncludes": [12345],
    "enabled": true
  }'

Etlworks signs each delivery with the secret (HMAC) so your endpoint can verify authenticity. Filter to specific flows via flowIncludes, or send all flow events except a deny-list with flowExcludes.


Pattern: paginate a large list

For paginated endpoints (/v1/audit, execution history, metrics — the per-group reference pages document which), drive offset + limit until you've consumed X-Total-Count items or a page comes back shorter than limit. Note: some list endpoints like GET /v1/flows return the full scope unpaginated — this helper is safe to use anyway, since a non-paginated endpoint will return everything on the first call and stop.

Python
def paginate(path, params=None, page_size=200):
    offset, params = 0, dict(params or {})
    while True:
        params.update(limit=page_size, offset=offset)
        page = requests.get(f"{BASE}{path}", headers=H, params=params).json()
        if not page:
            return
        for item in page:
            yield item
        if len(page) < page_size:
            return
        offset += page_size

for flow in paginate("/v1/flows"):
    print(flow["id"], flow["name"])

Pattern: retry with exponential backoff

Wraps a request in retry-on-5xx and retry-on-429 logic, honoring the Retry-After header.

Python
import time, random

def call_with_retry(method, url, attempts=5, **kwargs):
    for i in range(attempts):
        r = requests.request(method, url, headers=H, **kwargs)
        if r.status_code < 500 and r.status_code != 429:
            r.raise_for_status()
            return r.json() if r.content else None
        delay = int(r.headers.get("Retry-After", 0)) or min(60, (2 ** i) + random.random())
        time.sleep(delay)
    raise RuntimeError(f"Gave up after {attempts} attempts: {url}")

Pattern: parallel fan-out

Run twenty flows at once, wait for all of them. Useful for nightly batch jobs that have no shared dependency.

Python
from concurrent.futures import ThreadPoolExecutor

flow_ids = [12345, 12346, 12347, 12348, …]

with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(lambda fid: run_and_wait(fid), flow_ids))

failed = [r for r in results if r["status"] != "success"]
print(f"  {len(results)} run, {len(failed)} failed")