AI Agent API

Code examples

Ready-to-run examples in Python, Bash, PowerShell, and cURL. Copy them and adapt them for your integration.

Setup

All examples assume these environment variables are set:

ETLWORKS_URL — your Etlworks instance URL
ETLWORKS_API_KEY — your API key
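
A script can check both variables up front, so a missing value fails with a clear message rather than a confusing HTTP error later. The following is a minimal sketch: `load_config` and its return shape are hypothetical names for illustration, not part of the Etlworks client. The Bearer header format is the one used in the cURL examples on this page.

```python
import os

def load_config(env=None):
    """Read ETLWORKS_URL and ETLWORKS_API_KEY; fail early if either is missing."""
    env = os.environ if env is None else env
    missing = [name for name in ("ETLWORKS_URL", "ETLWORKS_API_KEY") if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {
        # Strip a trailing slash so endpoint paths can be appended safely
        "base_url": env["ETLWORKS_URL"].rstrip("/"),
        # Bearer scheme, as in the cURL examples
        "headers": {"Authorization": "Bearer " + env["ETLWORKS_API_KEY"]},
    }

# Example with an explicit mapping instead of the real environment
cfg = load_config({"ETLWORKS_URL": "https://example.invalid/", "ETLWORKS_API_KEY": "secret"})
print(cfg["base_url"])
```

Calling `load_config()` with no argument reads from `os.environ`.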

Python: direct tool calling

Call tools directly when you know exactly what you need. No LLM is involved, so calls are fast and deterministic, and non-agentic tools are not billed.

Python
from etlworks_agent import EtlworksAgent

agent = EtlworksAgent()

# Search the knowledge base
kb_result = agent.execute_tool("search_knowledge_base", {
    "query": "how to create a REST API connection"
})
print("KB Search:", kb_result["result"][:200])

# List available CLI commands
cmds = agent.execute_tool("host_list_commands", {})
print("CLI Commands:", cmds["result"])

# Search for templates
templates = agent.execute_tool("search_templates", {
    "query": "salesforce to database"
})
print("Templates:", templates["result"][:200])

# Execute a CLI command
cli_result = agent.execute_tool("host_cli", {
    "command": "list connections"
})
print("Connections:", cli_result["result"])
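
The calls above index `result["result"]` directly. Other examples on this page also check `result["success"]` and read `result.get("error")`, so a small guard can turn a failed tool call into an exception. This is a sketch only: `unwrap` is a hypothetical helper, and treating a missing `"success"` key as success is an assumption.

```python
def unwrap(tool_result):
    """Return the tool payload, or raise if the call reported a failure."""
    # Assumption: a result without a "success" key is treated as successful
    if not tool_result.get("success", True):
        raise RuntimeError(tool_result.get("error") or "tool call failed")
    return tool_result["result"]

# Example with a hand-written result dict
print(unwrap({"success": True, "result": "3 connections"}))
```

With the real client this would wrap a call, for example `unwrap(agent.execute_tool("host_cli", {"command": "list connections"}))`.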

Python: one-shot chat

Ask the agent a question and let it decide which tools to use.

Python
from etlworks_agent import EtlworksAgent

agent = EtlworksAgent()

# Simple question
result = agent.chat("How do I create a REST API connection to load JSON data?")
print(result["response"])
print("Tools used:", result.get("tools_used", []))
print("Tokens:", result["usage"])

# Disable tools for a pure Q&A response
result = agent.chat(
    "What is Etlworks Integrator?",
    include_tools=False
)
print(result["response"])

# Custom system prompt
result = agent.chat(
    "List all connection types",
    system_prompt="You are a concise technical assistant. Answer in bullet points only."
)
print(result["response"])

Python: streaming response

Display tokens as they arrive — ideal for building chat UIs or forwarding to another agent.

Python
from etlworks_agent import EtlworksAgent

agent = EtlworksAgent()

# Stream tokens to stdout
full_response = ""
for event in agent.chat_stream("How do I schedule a flow to run every hour?"):
    if event["type"] == "token":
        print(event["content"], end="", flush=True)
        full_response += event["content"]
    elif event["type"] == "tool_start":
        print(f"\n  [Calling {event['tool']}...]", end="", flush=True)
    elif event["type"] == "tool_end":
        print(" done", end="", flush=True)
    elif event["type"] == "usage":
        pass  # token usage stats
    elif event["type"] == "error":
        print(f"\nError: {event['message']}")
    elif event["type"] == "end":
        print()  # final newline

print(f"\nFull response length: {len(full_response)} chars")
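
When the stream is forwarded to another agent rather than printed, it can help to fold the events into one summary object. The sketch below uses only the event types and fields from the loop above (`token`/`content`, `tool_start`/`tool`, `usage`, `error`/`message`, `end`). `collect_stream` is a hypothetical helper, and the sample events are made up for illustration.

```python
def collect_stream(events):
    """Fold stream events into the final text plus tool, usage, and error details."""
    summary = {"text": "", "tools": [], "usage": None, "error": None}
    parts = []
    for event in events:
        kind = event.get("type")
        if kind == "token":
            parts.append(event["content"])
        elif kind == "tool_start":
            summary["tools"].append(event["tool"])
        elif kind == "usage":
            # Keep the whole event; its fields are not described on this page
            summary["usage"] = event
        elif kind == "error":
            summary["error"] = event["message"]
            break
        elif kind == "end":
            break
    summary["text"] = "".join(parts)
    return summary

# Example with hand-written events instead of a live stream
sample = [
    {"type": "tool_start", "tool": "search_knowledge_base"},
    {"type": "tool_end"},
    {"type": "token", "content": "Use the "},
    {"type": "token", "content": "scheduler."},
    {"type": "end"},
]
print(collect_stream(sample)["text"])
```

With the real client, the argument would be the generator returned by `agent.chat_stream(...)`.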

Python: multi-turn session

Persistent sessions let the agent remember context across multiple messages.

Python
from etlworks_agent import EtlworksAgent

agent = EtlworksAgent()

# Create a session
session_id = agent.create_session()
print(f"Session: {session_id}")

# Turn 1: Ask about creating a connection
r1 = agent.session_chat(session_id, "How do I create a REST API connection?")
print(f"Agent: {r1['response'][:200]}...")

# Turn 2: Follow up (agent remembers Turn 1)
r2 = agent.session_chat(session_id, "What authentication options does it support?")
print(f"Agent: {r2['response'][:200]}...")

# Turn 3: Reference earlier context
r3 = agent.session_chat(session_id, "Now schedule the connection to run daily at 8am")
print(f"Agent: {r3['response'][:200]}...")

# Review the full conversation
messages = agent.session_messages(session_id)
for msg in messages:
    role = msg["role"].upper()
    content = msg["content"][:80]
    print(f"  [{role}] {content}...")

Bash: direct tool calling

Bash
#!/bin/bash
# Using the etlworks-agent.sh client script

# Search the knowledge base
./etlworks-agent.sh tool search_knowledge_base '{"query": "REST connection"}'

# List CLI commands
./etlworks-agent.sh tool host_list_commands '{}'

# Execute a CLI command
./etlworks-agent.sh tool host_cli '{"command": "list connections"}'

# Search for templates
./etlworks-agent.sh tool search_templates '{"query": "salesforce"}'

Bash: one-shot chat

Bash
#!/bin/bash
# Complete response (waits for full answer)
./etlworks-agent.sh chat "How do I create a REST API connection?"

# Extract just the response text
./etlworks-agent.sh chat "How do I create a connection?" | jq -r '.response'

Bash: streaming response

Bash
#!/bin/bash
# Streaming (tokens print as they arrive)
./etlworks-agent.sh chat-stream "How do I schedule a flow to run every hour?"

Bash: multi-turn session

Bash
#!/bin/bash
# Create a session (returns session ID)
SESSION=$(./etlworks-agent.sh session-create)
echo "Session: $SESSION"

# First question
./etlworks-agent.sh session-chat "$SESSION" "How do I create a REST connection?"

# Follow-up (agent remembers context)
./etlworks-agent.sh session-chat "$SESSION" "What auth options does it support?"

# Review conversation history
./etlworks-agent.sh session-messages "$SESSION"

PowerShell: direct tool calling

PowerShell
# Using the etlworks-agent.ps1 client script

# Search the knowledge base
.\etlworks-agent.ps1 tool search_knowledge_base '{"query": "REST connection"}'

# List CLI commands
.\etlworks-agent.ps1 tool host_list_commands '{}'

# Execute a CLI command
.\etlworks-agent.ps1 tool host_cli '{"command": "list connections"}'

# Search for templates
.\etlworks-agent.ps1 tool search_templates '{"query": "salesforce"}'

PowerShell: one-shot chat

PowerShell
# Complete response (waits for full answer)
.\etlworks-agent.ps1 chat "How do I create a REST API connection?"

# Using Invoke-RestMethod directly (no script needed)
$headers = @{ "Authorization" = "Bearer $env:ETLWORKS_API_KEY" }
$body = '{"message": "How do I create a connection?"}'

$result = Invoke-RestMethod `
    -Uri "$env:ETLWORKS_URL/rest/v1/ai-agent/api/chat" `
    -Method POST `
    -Headers $headers `
    -Body ([System.Text.Encoding]::UTF8.GetBytes($body)) `
    -ContentType "application/json"

Write-Output $result.response

PowerShell: streaming response

PowerShell
# Streaming (tokens print as they arrive)
.\etlworks-agent.ps1 chat-stream "How do I schedule a flow to run every hour?"

PowerShell: multi-turn session

PowerShell
# Create a session (captures session ID into a variable)
$session = .\etlworks-agent.ps1 session-create
Write-Output "Session: $session"

# First question
.\etlworks-agent.ps1 session-chat $session "How do I create a REST connection?"

# Follow-up (agent remembers context)
.\etlworks-agent.ps1 session-chat $session "What auth options does it support?"

# Streaming in a session
.\etlworks-agent.ps1 session-chat-stream $session "Now schedule it to run daily"

# Review conversation history
.\etlworks-agent.ps1 session-messages $session

cURL: list & call tools

Shell
# List available tools
curl -s "$ETLWORKS_URL/rest/v1/ai-agent/api/tools" \
  -H "Authorization: Bearer $ETLWORKS_API_KEY" | jq '.tools[].name'

# Execute a tool
curl -s "$ETLWORKS_URL/rest/v1/ai-agent/api/tools/search_knowledge_base/execute" \
  -H "Authorization: Bearer $ETLWORKS_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"query": "how to create a REST connection"}' | jq '.result'

# Execute CLI command
curl -s "$ETLWORKS_URL/rest/v1/ai-agent/api/tools/host_cli/execute" \
  -H "Authorization: Bearer $ETLWORKS_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"command": "list connections"}' | jq '.result'

cURL: chat

Shell
# One-shot chat
curl -s "$ETLWORKS_URL/rest/v1/ai-agent/api/chat" \
  -H "Authorization: Bearer $ETLWORKS_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"message": "How do I create a REST API connection?"}' | jq '.response'

# Chat with tools disabled
curl -s "$ETLWORKS_URL/rest/v1/ai-agent/api/chat" \
  -H "Authorization: Bearer $ETLWORKS_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"message": "What is Etlworks?", "include_tools": false}' | jq '.response'

# Multi-turn session
SESSION_ID=$(curl -s "$ETLWORKS_URL/rest/v1/ai-agent/api/sessions" \
  -H "Authorization: Bearer $ETLWORKS_API_KEY" -X POST | jq -r '.session_id')

curl -s "$ETLWORKS_URL/rest/v1/ai-agent/api/sessions/$SESSION_ID/chat" \
  -H "Authorization: Bearer $ETLWORKS_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"message": "How do I create a REST connection?"}' | jq '.response'

curl -s "$ETLWORKS_URL/rest/v1/ai-agent/api/sessions/$SESSION_ID/chat" \
  -H "Authorization: Bearer $ETLWORKS_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"message": "Now schedule it to run hourly"}' | jq '.response'
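
For environments without the client library or cURL, the same requests can be built with Python's standard library. The sketch below takes the endpoint paths and headers from the cURL commands above. `build_request` and `send` are hypothetical helper names, and the response is assumed to be JSON because the cURL examples pipe it to `jq`.

```python
import json
import urllib.request

API_PREFIX = "/rest/v1/ai-agent/api"  # path prefix used by the cURL examples

def build_request(base_url, api_key, path, payload=None):
    """Build (but do not send) a POST request for an AI Agent API endpoint."""
    headers = {"Authorization": "Bearer " + api_key}
    data = None
    if payload is not None:
        data = json.dumps(payload).encode("utf-8")
        headers["Content-Type"] = "application/json"
    url = base_url.rstrip("/") + API_PREFIX + path
    return urllib.request.Request(url, data=data, headers=headers, method="POST")

def send(request):
    """Send a prepared request and decode the JSON body."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))

# Build a one-shot chat request without sending it
req = build_request("https://example.invalid", "secret", "/chat", {"message": "Hello"})
print(req.get_method(), req.full_url)
```

A session flow would post to `/sessions` with no body, read `session_id` from the reply, and then post each message to `/sessions/<session_id>/chat`.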

cURL: streaming

Shell
# Stream response (use -N to disable buffering)
curl -sN "$ETLWORKS_URL/rest/v1/ai-agent/api/chat/stream" \
  -H "Authorization: Bearer $ETLWORKS_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"message": "What are the main features of Etlworks?"}'

Integration pattern: subagent

Use the Etlworks agent as a specialist subagent within a larger orchestration system. Your main agent delegates data-integration tasks to Etlworks while handling other domains itself.

Python
from etlworks_agent import EtlworksAgent

# Initialize the Etlworks subagent
etlworks = EtlworksAgent()

def orchestrator_agent(task):
    """
    Main orchestrator that routes tasks to specialized subagents.
    """
    if "data integration" in task.lower() or "etl" in task.lower():
        # Delegate to Etlworks agent
        result = etlworks.chat(task)
        return result["response"]

    elif "search docs" in task.lower():
        # Use Etlworks KB search directly (fast, no LLM)
        kb = etlworks.execute_tool("search_knowledge_base", {"query": task})
        return kb["result"]

    elif "run command" in task.lower():
        # Take everything after the first colon, so the prefix can be in any case
        command = task.split(":", 1)[1].strip() if ":" in task else task.strip()
        result = etlworks.execute_tool("host_cli", {"command": command})
        return result["result"]

    else:
        # Handle with a different subagent or LLM
        return "Task not handled: " + task

# Example usage
print(orchestrator_agent("data integration: set up daily REST to PostgreSQL sync"))
print(orchestrator_agent("search docs: how to configure authentication"))
print(orchestrator_agent("run command: list flows"))
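
Substring checks can misroute a task that merely mentions a keyword. One alternative is to route on an explicit `prefix: payload` convention, sketched below. `parse_task`, `route`, and `StubAgent` are hypothetical names; `StubAgent` only imitates the `chat` and `execute_tool` return shapes used on this page so the routing can be tried offline. Unlike the example above, this variant passes only the payload, not the full task, to the agent.

```python
def parse_task(task):
    """Split 'prefix: payload' into a lowercase prefix and the payload."""
    prefix, sep, payload = task.partition(":")
    if not sep:
        return "", task.strip()
    return prefix.strip().lower(), payload.strip()

class StubAgent:
    """Stand-in for EtlworksAgent that records calls instead of sending them."""
    def __init__(self):
        self.calls = []

    def chat(self, message):
        self.calls.append(("chat", message))
        return {"response": "stub reply"}

    def execute_tool(self, name, args):
        self.calls.append((name, args))
        return {"success": True, "result": "stub result"}

def route(agent, task):
    """Dispatch a task by its prefix; unknown prefixes are reported back."""
    prefix, payload = parse_task(task)
    if prefix in ("data integration", "etl"):
        return agent.chat(payload)["response"]
    if prefix == "search docs":
        return agent.execute_tool("search_knowledge_base", {"query": payload})["result"]
    if prefix == "run command":
        return agent.execute_tool("host_cli", {"command": payload})["result"]
    return "Task not handled: " + task

stub = StubAgent()
print(route(stub, "Run Command: list flows"))
print(stub.calls)
```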

Integration pattern: orchestrator routing

A more sophisticated orchestrator that combines Etlworks with other services and maintains conversational state.

Python
from etlworks_agent import EtlworksAgent

class DataOpsOrchestrator:
    """
    Orchestrator that combines Etlworks with other services
    for end-to-end data operations.
    """

    def __init__(self):
        self.etlworks = EtlworksAgent()
        self.session = None  # lazy session creation

    def ensure_session(self):
        if self.session is None:
            self.session = self.etlworks.create_session()
        return self.session

    def search_documentation(self, query):
        """Fast KB search — no LLM, no billing."""
        result = self.etlworks.execute_tool("search_knowledge_base", {"query": query})
        return result["result"] if result["success"] else None

    def find_template(self, description):
        """Search for a pre-built integration template."""
        result = self.etlworks.execute_tool("search_templates", {"query": description})
        return result["result"] if result["success"] else None

    def ask_agent(self, question):
        """
        Ask the agent a question with session context.
        The agent remembers prior questions in this session.
        """
        session = self.ensure_session()
        result = self.etlworks.session_chat(session, question)
        return result["response"]

    def run_cli_command(self, command):
        """Execute a CLI command on the Etlworks host."""
        result = self.etlworks.execute_tool("host_cli", {"command": command})
        return result["result"] if result["success"] else result.get("error")

    def setup_integration(self, source, target, schedule):
        """
        High-level: ask the agent to set up a complete integration.
        Uses a session so the agent builds on each step.
        """
        session = self.ensure_session()

        # Step 1: Find a template
        r1 = self.etlworks.session_chat(session,
            f"Find a template for integrating {source} with {target}")

        # Step 2: Import it
        r2 = self.etlworks.session_chat(session,
            "Import the best matching template")

        # Step 3: Schedule it
        r3 = self.etlworks.session_chat(session,
            f"Schedule it to run {schedule}")

        return r3["response"]

# Usage
ops = DataOpsOrchestrator()

# Quick documentation lookup
docs = ops.search_documentation("connection pooling settings")
print(docs)

# Complex multi-step setup
result = ops.setup_integration("Salesforce", "PostgreSQL", "every 6 hours")
print(result)
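
`setup_integration` above returns only the last reply and discards the intermediate ones. The sketch below keeps every step's reply for logging or review. It assumes only that the chat call returns a dict with a `"response"` key, as in the examples on this page; `run_steps` and `fake_send` are hypothetical names.

```python
def run_steps(send, prompts):
    """Send prompts in order through `send` and keep every reply."""
    transcript = []
    for prompt in prompts:
        reply = send(prompt)
        transcript.append({"prompt": prompt, "response": reply["response"]})
    return transcript

def fake_send(prompt):
    """Offline stand-in for a session chat call."""
    return {"response": "ack: " + prompt}

steps = run_steps(fake_send, [
    "Find a template for integrating Salesforce with PostgreSQL",
    "Import the best matching template",
    "Schedule it to run every 6 hours",
])
for step in steps:
    print(step["response"])
```

With the real client, `send` could be `lambda prompt: etlworks.session_chat(session, prompt)`.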

Integration pattern: pipeline automation

Use the API in CI/CD pipelines or automation scripts to manage Etlworks programmatically.

Python
from etlworks_agent import EtlworksAgent
import sys

agent = EtlworksAgent()

def validate_connections():
    """Check that all connections are healthy."""
    result = agent.execute_tool("host_cli", {"command": "test connections"})
    if "failed" in result["result"].lower():
        print("FAIL: Some connections are unhealthy")
        print(result["result"])
        return False
    print("PASS: All connections healthy")
    return True

def list_failed_flows():
    """Find recently failed flows."""
    result = agent.execute_tool("host_cli", {"command": "list flows -status failed"})
    return result["result"]

def get_agent_diagnosis(issue):
    """Ask the agent to diagnose an issue."""
    result = agent.chat(f"Diagnose this issue and suggest a fix: {issue}")
    return result["response"]

# CI/CD health check
if not validate_connections():
    failures = list_failed_flows()
    diagnosis = get_agent_diagnosis(failures)
    print("\nAgent diagnosis:")
    print(diagnosis)
    sys.exit(1)
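
In a CI job, a transient network failure can fail the health check even when the instance is healthy. The sketch below is a generic retry wrapper. `with_retries` is a hypothetical helper, and it assumes that client errors surface as Python exceptions, which this page does not confirm.

```python
import time

def with_retries(call, attempts=3, delay=2.0, sleep=time.sleep):
    """Run `call()` up to `attempts` times, pausing `delay` seconds between tries."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return call()
        except Exception as exc:  # assumption: failures are raised as exceptions
            last_error = exc
            if attempt < attempts:
                sleep(delay)
    raise last_error

# Offline demonstration: a call that fails twice, then succeeds
state = {"calls": 0}

def flaky():
    state["calls"] += 1
    if state["calls"] < 3:
        raise ConnectionError("temporary failure")
    return {"result": "all connections ok"}

print(with_retries(flaky, sleep=lambda seconds: None)["result"])
```

With the real client, the wrapped call could be `lambda: agent.execute_tool("host_cli", {"command": "test connections"})`.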

Integration pattern: custom chatbot

Build a custom chat interface that uses the Etlworks agent as its backend. The streaming API provides real-time token delivery for a responsive UI.

Python
from etlworks_agent import EtlworksAgent

class EtlworksChatbot:
    """
    Simple chatbot wrapper. Use in a Flask/FastAPI app
    to build a custom chat UI.
    """

    def __init__(self):
        self.agent = EtlworksAgent()
        self.sessions = {}  # user_id -> session_id

    def get_session(self, user_id):
        if user_id not in self.sessions:
            self.sessions[user_id] = self.agent.create_session()
        return self.sessions[user_id]

    def reply(self, user_id, message):
        """Get a complete reply for a user message."""
        session = self.get_session(user_id)
        result = self.agent.session_chat(session, message)
        return result["response"]

    def reply_stream(self, user_id, message):
        """
        Stream reply tokens. Yields strings.
        Use with Flask streaming response or FastAPI StreamingResponse.
        """
        session = self.get_session(user_id)
        for event in self.agent.session_chat_stream(session, message):
            if event["type"] == "token":
                yield event["content"]

    def get_history(self, user_id):
        """Get conversation history for a user."""
        if user_id not in self.sessions:
            return []
        return self.agent.session_messages(self.sessions[user_id])

# Example: Flask integration
# from flask import Flask, Response, request, jsonify
# app = Flask(__name__)
# chatbot = EtlworksChatbot()
#
# @app.route("/chat", methods=["POST"])
# def chat():
#     user_id = request.json["user_id"]
#     message = request.json["message"]
#     return jsonify({"reply": chatbot.reply(user_id, message)})
#
# @app.route("/chat/stream", methods=["POST"])
# def chat_stream():
#     user_id = request.json["user_id"]
#     message = request.json["message"]
#     return Response(chatbot.reply_stream(user_id, message), content_type="text/plain")
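
When the chatbot runs in a multi-threaded web server, two concurrent requests from the same new user could each create a session. The sketch below guards the user-to-session map with a lock. `SessionRegistry` is a hypothetical class; it takes any zero-argument function that returns a session ID, such as the client's `create_session`.

```python
import threading

class SessionRegistry:
    """Thread-safe map from user ID to session ID."""

    def __init__(self, create_session):
        self._create = create_session
        self._sessions = {}
        self._lock = threading.Lock()

    def get(self, user_id):
        # The lock is held while creating, so each user gets exactly one session
        with self._lock:
            if user_id not in self._sessions:
                self._sessions[user_id] = self._create()
            return self._sessions[user_id]

# Offline demonstration with a counter instead of a real client
created = []

def fake_create_session():
    created.append(1)
    return "session-" + str(len(created))

registry = SessionRegistry(fake_create_session)
print(registry.get("alice"), registry.get("alice"), registry.get("bob"))
```

Holding the lock during creation serializes new-session requests. That is the simplest correct choice, though it may be too coarse under heavy load.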