ROE_API_KEY and ROE_ORGANIZATION_ID are set.
The first example provisions a policy and an agent from scratch; the later two
reuse an existing agent id so they stay focused on the run-and-fetch calls.
Create a policy and run a policy-aware agent
from roe import RoeClient
# No arguments are passed: the examples assume ROE_API_KEY and
# ROE_ORGANIZATION_ID are set in the environment.
client = RoeClient()

# --- Policy -----------------------------------------------------------------
# A single red-flag rule, filed under one guideline category.
structuring_rule = {
    "title": "Structuring below reporting thresholds",
    "flag": "RED_FLAG",
    "description": "Deposits just under CTR thresholds in a short window.",
}
policy_content = {
    "guidelines": {
        "categories": [
            {"title": "Transaction Patterns", "rules": [structuring_rule]},
        ]
    },
    # The two disposition classifications defined by this policy.
    "dispositions": {
        "classifications": [
            {"name": "SAR", "description": "File a Suspicious Activity Report."},
            {"name": "DISMISS", "description": "Close as non-suspicious."},
        ]
    },
}
policy = client.policies.create(
    name="AML Investigation Policy",
    content=policy_content,
)

# --- Agent ------------------------------------------------------------------
# The agent declares one plain-text input and is tied to the policy version
# that was just created.
alert_input = {
    "key": "alert_data",
    "data_type": "text/plain",
    "description": "Alert to investigate.",
}
agent = client.agents.create(
    name="AML Investigation Agent",
    engine_class_id="AMLInvestigationEngine",
    input_definitions=[alert_input],
    engine_config={
        "policy_version_id": str(policy.current_version_id),
        # NOTE(review): presumably a placeholder the service fills with the
        # "alert_data" input at run time -- confirm against the SDK docs.
        "alert_data": "${alert_data}",
    },
)

# --- Run --------------------------------------------------------------------
# The keyword name matches the input key declared on the agent above.
job = client.agents.run(
    agent_id=str(agent.id),
    timeout_seconds=300,
    alert_data="Customer made 9 cash deposits of $9,500 over three days.",
)
result = job.wait(interval=5.0, timeout=300)

# Print every output of the finished job as "key: value".
for item in result.outputs:
    print(f"{item.key}: {item.value}")
Run an agent and download a saved reference
import json
import os
from pathlib import Path
from roe import RoeClient
# No arguments are passed: the examples assume ROE_API_KEY and
# ROE_ORGANIZATION_ID are set in the environment.
client = RoeClient()

# This example reuses an existing agent; its id is read from the environment
# (raises KeyError if ROE_URL_AGENT_ID is not set).
agent_id = os.environ["ROE_URL_AGENT_ID"]

job = client.agents.run(
    agent_id=agent_id,
    timeout_seconds=300,
    url="https://www.roe-ai.com/",
    metadata={"use_case": "website-scan"},
)
result = job.wait(interval=5.0, timeout=300)

for output in result.outputs:
    # Only outputs whose value is a JSON object can carry a "references" list.
    try:
        payload = json.loads(output.value)
    except (json.JSONDecodeError, TypeError):
        # Not JSON text, or not text at all (json.loads raises TypeError for
        # values such as None): nothing to download from this output.
        continue
    if not isinstance(payload, dict):
        # Valid JSON that is a number, string, or list has no .get(); skip it
        # instead of crashing with AttributeError.
        continue

    # `or []` also covers an explicit `"references": null`.
    for ref in payload.get("references") or []:
        if not isinstance(ref, dict):
            continue
        resource_id = ref.get("resource_id")
        if resource_id:
            content = client.agents.jobs.download_reference(job.id, resource_id)
            # Saved relative to the current working directory.
            Path(f"{resource_id}.bin").write_bytes(content)
Run a batch of inputs
import os
from roe import RoeClient
# No arguments are passed: the examples assume ROE_API_KEY and
# ROE_ORGANIZATION_ID are set in the environment.
client = RoeClient()

# This example reuses an existing agent; its id is read from the environment
# (raises KeyError if ROE_TEXT_AGENT_ID is not set).
agent_id = os.environ["ROE_TEXT_AGENT_ID"]

# Each dict is one set of inputs for the agent.
inputs = [
    {"text": "Summarize the customer complaint."},
    {"text": "Extract the requested follow-up action."},
]
batch = client.agents.run_many(
    agent_id=agent_id,
    batch_inputs=inputs,
    timeout_seconds=300,
)
results = batch.wait(interval=5.0, timeout=300)

for job_result in results:
    # Entries may be None; those are skipped.
    if job_result is not None:
        # A missing/empty `result` is treated as "no outputs".
        for item in job_result.result or []:
            print(f"{item.key}: {item.value}")