API Examples
All API endpoints are authenticated using Bearer tokens which you can find under Settings
on the platform.
import argparse
import json
import os
import requests
import time
from pathlib import Path
def run_agent_async(api_key, agent_id, file_path):
"""Run an agent asynchronously with a file input."""
url = f"https://api.roe-ai.com/v1/agents/run/{agent_id}/async/"
headers = {"Authorization": f"Bearer {api_key}"}
# Upload file directly
with open(file_path, "rb") as f:
files = {"pdf_file": (os.path.basename(file_path), f, "application/pdf")}
response = requests.post(url, headers=headers, files=files)
if response.status_code == 200:
job_id = response.json()
print(f"Started job {job_id} for file: {os.path.basename(file_path)}")
return job_id
else:
print(
f"Failed to start job for {file_path}: {response.status_code} - {response.text}"
)
return None
def get_job_status(api_key, job_id):
"""Get the status of a job."""
url = f"https://api.roe-ai.com/v1/agents/jobs/{job_id}/status/"
headers = {"Authorization": f"Bearer {api_key}"}
response = requests.get(url, headers=headers)
if response.status_code == 200:
return response.json()
else:
print(f"Failed to get status for job {job_id}: {response.status_code}")
return None
def is_job_complete(status_code):
"""Check if job is in terminal state."""
terminal_states = {3, 4, 5, 6} # 3: SUCCESS, 4: FAILURE, 5: CANCELLED, 6: CACHED
return status_code in terminal_states
def poll_jobs_until_complete(api_key, job_ids, poll_interval=5):
"""Poll job statuses until all jobs are complete."""
job_statuses = {}
completed_jobs = set()
print(f"Polling {len(job_ids)} jobs every {poll_interval} seconds...")
while len(completed_jobs) < len(job_ids):
for job_id in job_ids:
if job_id in completed_jobs:
continue
status_info = get_job_status(api_key, job_id)
if status_info:
status_code = status_info.get("status")
job_statuses[job_id] = status_info
if is_job_complete(status_code):
completed_jobs.add(job_id)
print(f"Job {job_id} completed with status {status_code}")
else:
print(f"Job {job_id} still running (status: {status_code})")
if len(completed_jobs) < len(job_ids):
time.sleep(poll_interval)
print("All jobs completed!")
return job_statuses
def get_files_from_folder(folder_path):
"""Get all files from the specified folder."""
folder = Path(folder_path)
if not folder.exists():
print(f"Folder does not exist: {folder_path}")
return []
files = [f for f in folder.iterdir() if f.is_file()]
print(f"Found {len(files)} files in {folder_path}")
return files
def main():
parser = argparse.ArgumentParser(
description="Run Roe AI agent jobs on files in a folder"
)
parser.add_argument("--api-key", required=True, help="Roe AI API key")
parser.add_argument("--agent-id", required=True, help="Agent ID to run")
parser.add_argument("--folder", required=True, help="Folder containing input files")
args = parser.parse_args()
# Get files from folder
files = get_files_from_folder(args.folder)
if not files:
print("No files found to process")
return
print(f"Starting jobs for {len(files)} files...")
# Start all jobs
job_ids = []
for file_path in files:
job_id = run_agent_async(args.api_key, args.agent_id, file_path)
if job_id:
job_ids.append(job_id)
if not job_ids:
print("No jobs were started successfully")
return
# Poll until all jobs complete
final_results = poll_jobs_until_complete(args.api_key, job_ids)
# Print results
print("\nFinal Results:")
print("-" * 50)
for job_id, status_info in final_results.items():
print(f"Job ID: {job_id}")
print(f" Status: {status_info.get('status')}")
print(f" Timestamp: {status_info.get('timestamp')}")
# Print job status map
print("\nJob Status Map:")
job_status_map = {
job_id: info.get("status") for job_id, info in final_results.items()
}
print(json.dumps(job_status_map, indent=2))
if __name__ == "__main__":
main()