Code examples
Ready-to-run examples in Python, Bash, PowerShell, and cURL. Copy-paste and adapt for your integration.
All examples assume these environment variables are set:
ETLWORKS_URL — your Etlworks instance URL
ETLWORKS_API_KEY — your API key
Python: direct tool calling
Call tools directly when you know exactly what you need. No LLM involved — fast, deterministic, and free for non-agentic tools.
from etlworks_agent import EtlworksAgent
agent = EtlworksAgent()

# Every direct call names a tool and passes its arguments as a dict.
# The examples read the tool output from the "result" key of the reply.

# Knowledge-base search (display truncated to the first 200 characters)
kb_args = {"query": "how to create a REST API connection"}
kb_reply = agent.execute_tool("search_knowledge_base", kb_args)
print("KB Search:", kb_reply["result"][:200])

# Available CLI commands (this tool takes no arguments)
commands_reply = agent.execute_tool("host_list_commands", {})
print("CLI Commands:", commands_reply["result"])

# Template search (display truncated to the first 200 characters)
template_args = {"query": "salesforce to database"}
templates_reply = agent.execute_tool("search_templates", template_args)
print("Templates:", templates_reply["result"][:200])

# Run a CLI command through the host_cli tool
cli_args = {"command": "list connections"}
cli_reply = agent.execute_tool("host_cli", cli_args)
print("Connections:", cli_reply["result"])
Python: one-shot chat
Ask the agent a question and let it decide which tools to use.
from etlworks_agent import EtlworksAgent
agent = EtlworksAgent()

# Plain question: the agent decides which tools to use.
question = "How do I create a REST API connection to load JSON data?"
answer = agent.chat(question)
print(answer["response"])
print("Tools used:", answer.get("tools_used", []))  # key may be absent
print("Tokens:", answer["usage"])

# Pure Q&A: tools are switched off for this call.
plain_answer = agent.chat("What is Etlworks Integrator?", include_tools=False)
print(plain_answer["response"])

# A custom system prompt replaces the default instructions for this call.
bullet_prompt = "You are a concise technical assistant. Answer in bullet points only."
bullet_answer = agent.chat("List all connection types", system_prompt=bullet_prompt)
print(bullet_answer["response"])
Python: streaming response
Display tokens as they arrive — ideal for building chat UIs or forwarding to another agent.
from etlworks_agent import EtlworksAgent
agent = EtlworksAgent()

# Echo tokens to stdout as they arrive and keep them for the final summary.
chunks = []
stream = agent.chat_stream("How do I schedule a flow to run every hour?")
for event in stream:
    kind = event["type"]
    if kind == "token":
        text = event["content"]
        print(text, end="", flush=True)
        chunks.append(text)
    elif kind == "tool_start":
        print(f"\n [Calling {event['tool']}...]", end="", flush=True)
    elif kind == "tool_end":
        print(" done", end="", flush=True)
    elif kind == "error":
        print(f"\nError: {event['message']}")
    elif kind == "end":
        print()  # final newline
    # "usage" events (token usage stats) and any other event types are ignored.

full_response = "".join(chunks)
print(f"\nFull response length: {len(full_response)} chars")
Python: multi-turn session
Persistent sessions let the agent remember context across multiple messages.
from etlworks_agent import EtlworksAgent
agent = EtlworksAgent()

# Open a session; every turn below is sent with this id.
session_id = agent.create_session()
print(f"Session: {session_id}")

# Turn 1 asks the base question; turns 2 and 3 are follow-ups that rely on
# the earlier turns being remembered in the same session.
turns = (
    "How do I create a REST API connection?",
    "What authentication options does it support?",
    "Now schedule the connection to run daily at 8am",
)
for question in turns:
    reply = agent.session_chat(session_id, question)
    print(f"Agent: {reply['response'][:200]}...")

# Review the full conversation (each message shortened to 80 characters)
for msg in agent.session_messages(session_id):
    print(f" [{msg['role'].upper()}] {msg['content'][:80]}...")
Bash: direct tool calling
#!/bin/bash
# Direct tool calls through the etlworks-agent.sh client script.
# Call shape: <script> tool <tool_name> '<JSON arguments>'
AGENT=./etlworks-agent.sh

# Search the knowledge base
"$AGENT" tool search_knowledge_base '{"query": "REST connection"}'

# List CLI commands (empty JSON object: no arguments)
"$AGENT" tool host_list_commands '{}'

# Execute a CLI command
"$AGENT" tool host_cli '{"command": "list connections"}'

# Search for templates
"$AGENT" tool search_templates '{"query": "salesforce"}'
Bash: one-shot chat
#!/bin/bash
AGENT=./etlworks-agent.sh

# Complete response (waits for the full answer)
"$AGENT" chat "How do I create a REST API connection?"

# Keep only the "response" field of the JSON reply, printed as raw text
"$AGENT" chat "How do I create a connection?" | jq --raw-output '.response'
Bash: streaming response
#!/bin/bash
AGENT=./etlworks-agent.sh

# Streaming: tokens print as they arrive
"$AGENT" chat-stream "How do I schedule a flow to run every hour?"
Bash: multi-turn session
#!/bin/bash
AGENT=./etlworks-agent.sh

# Create a session; the command's output is captured as the session ID
SESSION="$("$AGENT" session-create)"
printf 'Session: %s\n' "$SESSION"

# First question
"$AGENT" session-chat "$SESSION" "How do I create a REST connection?"

# Follow-up in the same session (agent remembers context)
"$AGENT" session-chat "$SESSION" "What auth options does it support?"

# Review conversation history
"$AGENT" session-messages "$SESSION"
PowerShell: direct tool calling
# Direct tool calls through the etlworks-agent.ps1 client script.
# Call shape: <script> tool <tool_name> '<JSON arguments>'
$agent = '.\etlworks-agent.ps1'

# Search the knowledge base
& $agent tool search_knowledge_base '{"query": "REST connection"}'

# List CLI commands (empty JSON object: no arguments)
& $agent tool host_list_commands '{}'

# Execute a CLI command
& $agent tool host_cli '{"command": "list connections"}'

# Search for templates
& $agent tool search_templates '{"query": "salesforce"}'
PowerShell: one-shot chat
# Complete response (waits for the full answer)
.\etlworks-agent.ps1 chat "How do I create a REST API connection?"

# Same kind of request without the client script, via Invoke-RestMethod.
# The parameters are collected in a hashtable and splatted onto the cmdlet.
$request = @{
    Uri         = "$env:ETLWORKS_URL/rest/v1/ai-agent/api/chat"
    Method      = 'POST'
    Headers     = @{ Authorization = "Bearer $env:ETLWORKS_API_KEY" }
    # The JSON body is sent as explicit UTF-8 bytes.
    Body        = [System.Text.Encoding]::UTF8.GetBytes('{"message": "How do I create a connection?"}')
    ContentType = 'application/json'
}
$result = Invoke-RestMethod @request
Write-Output $result.response
PowerShell: streaming response
# Streaming: tokens print as they arrive
$agent = '.\etlworks-agent.ps1'
& $agent chat-stream "How do I schedule a flow to run every hour?"
PowerShell: multi-turn session
$agent = '.\etlworks-agent.ps1'

# Create a session; the script's output is captured as the session ID
$session = & $agent session-create
Write-Output "Session: $session"

# First question
& $agent session-chat $session "How do I create a REST connection?"

# Follow-up in the same session (agent remembers context)
& $agent session-chat $session "What auth options does it support?"

# Streaming in a session
& $agent session-chat-stream $session "Now schedule it to run daily"

# Review conversation history
& $agent session-messages $session
cURL: list & call tools
# Shared pieces of every request: API base path and bearer-token header
API="$ETLWORKS_URL/rest/v1/ai-agent/api"
AUTH="Authorization: Bearer $ETLWORKS_API_KEY"

# List available tools (prints each tool's name)
curl --silent "$API/tools" \
  --header "$AUTH" | jq '.tools[].name'

# Execute a tool: the JSON body carries the tool's arguments
curl --silent "$API/tools/search_knowledge_base/execute" \
  --header "$AUTH" \
  --header "Content-Type: application/json" \
  --data '{"query": "how to create a REST connection"}' | jq '.result'

# Execute CLI command
curl --silent "$API/tools/host_cli/execute" \
  --header "$AUTH" \
  --header "Content-Type: application/json" \
  --data '{"command": "list connections"}' | jq '.result'
cURL: chat
# Shared pieces of every request: API base path and bearer-token header
API="$ETLWORKS_URL/rest/v1/ai-agent/api"
AUTH="Authorization: Bearer $ETLWORKS_API_KEY"

# One-shot chat
curl --silent "$API/chat" \
  --header "$AUTH" \
  --header "Content-Type: application/json" \
  --data '{"message": "How do I create a REST API connection?"}' | jq '.response'

# Chat with tools disabled
curl --silent "$API/chat" \
  --header "$AUTH" \
  --header "Content-Type: application/json" \
  --data '{"message": "What is Etlworks?", "include_tools": false}' | jq '.response'

# Multi-turn session: create the session first and keep its ID
SESSION_ID=$(curl --silent "$API/sessions" \
  --header "$AUTH" --request POST | jq --raw-output '.session_id')

# Both messages go to the same session, so the second can refer to the first
curl --silent "$API/sessions/$SESSION_ID/chat" \
  --header "$AUTH" \
  --header "Content-Type: application/json" \
  --data '{"message": "How do I create a REST connection?"}' | jq '.response'

curl --silent "$API/sessions/$SESSION_ID/chat" \
  --header "$AUTH" \
  --header "Content-Type: application/json" \
  --data '{"message": "Now schedule it to run hourly"}' | jq '.response'
cURL: streaming
# Stream response (--no-buffer, the long form of -N, disables output buffering)
API="$ETLWORKS_URL/rest/v1/ai-agent/api"
AUTH="Authorization: Bearer $ETLWORKS_API_KEY"

curl --silent --no-buffer "$API/chat/stream" \
  --header "$AUTH" \
  --header "Content-Type: application/json" \
  --data '{"message": "What are the main features of Etlworks?"}'
Integration pattern: subagent
Use the Etlworks agent as a specialist subagent within a larger orchestration system. Your main agent delegates data-integration tasks to Etlworks while handling other domains itself.
from etlworks_agent import EtlworksAgent
# Initialize the Etlworks subagent
etlworks = EtlworksAgent()
def orchestrator_agent(task):
    """
    Main orchestrator that routes tasks to specialized subagents.

    Routing is a case-insensitive substring match, checked in this order:

    1. "data integration" or "etl"  -> full agent chat
    2. "search docs"                -> knowledge-base search tool (no LLM)
    3. "run command"                -> host CLI tool
    4. anything else                -> returned as "Task not handled: ..."

    Args:
        task: Free-text task description.

    Returns:
        The agent's response text, the tool's result, or the
        "Task not handled" message.
    """
    import re  # local import keeps this snippet self-contained

    lowered = task.lower()

    # NOTE: "etl" is a plain substring test, so it also matches longer words
    # that contain it (for example "etlworks"), and it is checked first.
    if "data integration" in lowered or "etl" in lowered:
        # Delegate to Etlworks agent
        result = etlworks.chat(task)
        return result["response"]

    if "search docs" in lowered:
        # Use Etlworks KB search directly (fast, no LLM)
        kb = etlworks.execute_tool("search_knowledge_base", {"query": task})
        return kb["result"]

    if "run command" in lowered:
        # Strip the "run command:" marker with the same case-insensitivity as
        # the check above, so "Run Command: list flows" yields "list flows".
        # Only the first occurrence is removed, leaving the command text
        # itself untouched.
        command = re.sub(
            r"run command:", "", task, count=1, flags=re.IGNORECASE
        ).strip()
        result = etlworks.execute_tool("host_cli", {"command": command})
        return result["result"]

    # Handle with a different subagent or LLM
    return "Task not handled: " + task
# Example usage: one task for each route of the orchestrator
example_tasks = (
    "data integration: set up daily REST to PostgreSQL sync",
    "search docs: how to configure authentication",
    "run command: list flows",
)
for example_task in example_tasks:
    print(orchestrator_agent(example_task))
Integration pattern: orchestrator routing
A more sophisticated orchestrator that combines Etlworks with other services and maintains conversational state.
from etlworks_agent import EtlworksAgent
class DataOpsOrchestrator:
    """
    Orchestrator that combines Etlworks with other services
    for end-to-end data operations.

    Tool helpers call ``execute_tool`` directly. Conversational helpers
    share a single agent session that is created on first use.
    """

    def __init__(self):
        self.etlworks = EtlworksAgent()
        self.session = None  # lazy session creation

    def ensure_session(self):
        """Return the shared session id, creating the session on first call."""
        if self.session is None:
            self.session = self.etlworks.create_session()
        return self.session

    def search_documentation(self, query):
        """Fast KB search — no LLM, no billing.

        Returns the tool's result, or None when the call reports failure.
        """
        result = self.etlworks.execute_tool("search_knowledge_base", {"query": query})
        return result["result"] if result["success"] else None

    def find_template(self, description):
        """Search for a pre-built integration template.

        Returns the tool's result, or None when the call reports failure.
        """
        result = self.etlworks.execute_tool("search_templates", {"query": description})
        return result["result"] if result["success"] else None

    def ask_agent(self, question):
        """
        Ask the agent a question with session context.
        The agent remembers prior questions in this session.

        Returns the response text of the reply.
        """
        session = self.ensure_session()
        result = self.etlworks.session_chat(session, question)
        return result["response"]

    def run_cli_command(self, command):
        """Execute a CLI command on the Etlworks host.

        Returns the tool's result on success. On failure it returns the
        reply's "error" value, which is None if the reply carries no error.
        """
        result = self.etlworks.execute_tool("host_cli", {"command": command})
        return result["result"] if result["success"] else result.get("error")

    def setup_integration(self, source, target, schedule):
        """
        High-level: ask the agent to set up a complete integration.
        Uses a session so the agent builds on each step.

        The three prompts are sent in order on the shared session. Only the
        reply to the last one (scheduling) is returned; the replies to the
        first two steps are not inspected.
        """
        session = self.ensure_session()
        steps = (
            # Step 1: Find a template
            f"Find a template for integrating {source} with {target}",
            # Step 2: Import it
            "Import the best matching template",
            # Step 3: Schedule it
            f"Schedule it to run {schedule}",
        )
        reply = None
        for prompt in steps:
            reply = self.etlworks.session_chat(session, prompt)
        return reply["response"]
# Usage
ops = DataOpsOrchestrator()

# Quick documentation lookup (direct tool call)
print(ops.search_documentation("connection pooling settings"))

# Complex multi-step setup (three chat turns on one session)
print(ops.setup_integration("Salesforce", "PostgreSQL", "every 6 hours"))
Integration pattern: pipeline automation
Use the API in CI/CD pipelines or automation scripts to manage Etlworks programmatically.
from etlworks_agent import EtlworksAgent
import sys
agent = EtlworksAgent()
def validate_connections():
    """Check that all connections are healthy.

    Runs the "test connections" CLI command through the ``host_cli`` tool
    and prints a PASS or FAIL line.

    Returns:
        False when the tool call reports ``success`` as false, or when its
        output contains "failed" (case-insensitive); True otherwise.
    """
    result = agent.execute_tool("host_cli", {"command": "test connections"})

    # A tool call that did not succeed must not be reported as healthy just
    # because its output lacks the word "failed". A missing "success" key is
    # treated as success so replies without that flag behave as before.
    if not result.get("success", True):
        print("FAIL: Connection test command did not run successfully")
        print(result.get("error"))
        return False

    output = result["result"]
    if "failed" in output.lower():
        print("FAIL: Some connections are unhealthy")
        print(output)
        return False

    print("PASS: All connections healthy")
    return True
def list_failed_flows():
    """Return the host_cli output of the "list flows -status failed" command."""
    cli_args = {"command": "list flows -status failed"}
    return agent.execute_tool("host_cli", cli_args)["result"]
def get_agent_diagnosis(issue):
    """Send *issue* to the agent in a diagnose-and-fix prompt; return the reply text."""
    prompt = f"Diagnose this issue and suggest a fix: {issue}"
    return agent.chat(prompt)["response"]
# CI/CD health check: on failure, print a diagnosis and exit with status 1
connections_ok = validate_connections()
if not connections_ok:
    failed_flows = list_failed_flows()
    agent_diagnosis = get_agent_diagnosis(failed_flows)
    print("\nAgent diagnosis:")
    print(agent_diagnosis)
    sys.exit(1)
Integration pattern: custom chatbot
Build a custom chat interface that uses the Etlworks agent as its backend. The streaming API provides real-time token delivery for a responsive UI.
from etlworks_agent import EtlworksAgent
class EtlworksChatbot:
    """
    Simple chatbot wrapper. Use in a Flask/FastAPI app
    to build a custom chat UI.

    Each user id is mapped to its own agent session, created on first use.
    """

    def __init__(self):
        self.agent = EtlworksAgent()
        self.sessions = {}  # user_id -> session_id

    def get_session(self, user_id):
        """Return the session id for *user_id*, creating one if none exists."""
        try:
            return self.sessions[user_id]
        except KeyError:
            session_id = self.agent.create_session()
            self.sessions[user_id] = session_id
            return session_id

    def reply(self, user_id, message):
        """Get a complete reply for a user message."""
        session_id = self.get_session(user_id)
        return self.agent.session_chat(session_id, message)["response"]

    def reply_stream(self, user_id, message):
        """
        Stream reply tokens. Yields strings.
        Use with Flask streaming response or FastAPI StreamingResponse.

        Only "token" events are forwarded; every other event type is dropped.
        """
        session_id = self.get_session(user_id)
        events = self.agent.session_chat_stream(session_id, message)
        yield from (
            event["content"] for event in events if event["type"] == "token"
        )

    def get_history(self, user_id):
        """Get conversation history for a user (empty list if no session yet)."""
        if user_id in self.sessions:
            return self.agent.session_messages(self.sessions[user_id])
        return []
# Example: Flask integration
# from flask import Flask, Response, request, jsonify
# app = Flask(__name__)
# chatbot = EtlworksChatbot()
#
# @app.route("/chat", methods=["POST"])
# def chat():
#     user_id = request.json["user_id"]
#     message = request.json["message"]
#     return jsonify({"reply": chatbot.reply(user_id, message)})
#
# @app.route("/chat/stream", methods=["POST"])
# def chat_stream():
#     user_id = request.json["user_id"]
#     message = request.json["message"]
#     return Response(chatbot.reply_stream(user_id, message), content_type="text/plain")