Run a batch of agent jobs over a folder of files, then poll each job's status until all jobs reach a terminal state.

All API endpoints are authenticated using Bearer tokens which you can find under Settings on the platform.

batch_job_runner.py
import argparse
import json
import os
import requests
import time
from pathlib import Path


def run_agent_async(api_key, agent_id, file_path):
    """Start an asynchronous agent run for a single file.

    Args:
        api_key: Roe AI API bearer token.
        agent_id: Identifier of the agent to run.
        file_path: Path to the file uploaded as the "pdf_file" form field.

    Returns:
        The job id parsed from the JSON response body on HTTP 200, or
        None when the request fails (a diagnostic is printed).
    """
    url = f"https://api.roe-ai.com/v1/agents/run/{agent_id}/async/"
    headers = {"Authorization": f"Bearer {api_key}"}

    # Upload the file as multipart/form-data under the "pdf_file" key.
    # NOTE(review): the content type is hard-coded to application/pdf even
    # for non-PDF inputs — confirm the API tolerates that.
    with open(file_path, "rb") as f:
        files = {"pdf_file": (os.path.basename(file_path), f, "application/pdf")}
        # requests has no default timeout; without one a stalled connection
        # would hang the whole batch indefinitely.
        response = requests.post(url, headers=headers, files=files, timeout=120)

    if response.status_code == 200:
        job_id = response.json()
        print(f"Started job {job_id} for file: {os.path.basename(file_path)}")
        return job_id
    else:
        print(
            f"Failed to start job for {file_path}: {response.status_code} - {response.text}"
        )
        return None


def get_job_status(api_key, job_id):
    """Fetch the current status payload for one job.

    Args:
        api_key: Roe AI API bearer token.
        job_id: Id of the job to query.

    Returns:
        The decoded JSON status payload on HTTP 200, or None on any other
        status code (a diagnostic is printed).
    """
    url = f"https://api.roe-ai.com/v1/agents/jobs/{job_id}/status/"
    headers = {"Authorization": f"Bearer {api_key}"}

    # Bounded timeout so one stalled poll cannot freeze the polling loop
    # (requests has no default timeout).
    response = requests.get(url, headers=headers, timeout=30)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Failed to get status for job {job_id}: {response.status_code}")
        return None


def is_job_complete(status_code):
    """Return True when *status_code* denotes a terminal job state.

    Terminal codes: 3 (SUCCESS), 4 (FAILURE), 5 (CANCELLED), 6 (CACHED).
    """
    return status_code in (3, 4, 5, 6)


def poll_jobs_until_complete(api_key, job_ids, poll_interval=5, max_wait=None):
    """Poll job statuses until every job reaches a terminal state.

    Args:
        api_key: Roe AI API bearer token.
        job_ids: List of job ids to track.
        poll_interval: Seconds to sleep between polling rounds.
        max_wait: Optional overall deadline in seconds. None (the default)
            preserves the original wait-forever behavior; without a deadline
            the loop never exits if a job never terminates or its status
            fetch keeps failing.

    Returns:
        Dict mapping job id -> the last status payload seen for that job.
    """
    job_statuses = {}
    completed_jobs = set()
    # time.monotonic() is immune to wall-clock adjustments, so the deadline
    # cannot jump when the system clock changes.
    deadline = None if max_wait is None else time.monotonic() + max_wait

    print(f"Polling {len(job_ids)} jobs every {poll_interval} seconds...")

    while len(completed_jobs) < len(job_ids):
        for job_id in job_ids:
            if job_id in completed_jobs:
                continue

            status_info = get_job_status(api_key, job_id)
            if status_info:
                status_code = status_info.get("status")
                job_statuses[job_id] = status_info

                if is_job_complete(status_code):
                    completed_jobs.add(job_id)
                    print(f"Job {job_id} completed with status {status_code}")
                else:
                    print(f"Job {job_id} still running (status: {status_code})")

        if len(completed_jobs) < len(job_ids):
            if deadline is not None and time.monotonic() >= deadline:
                print(
                    f"Timed out after {max_wait} seconds; "
                    f"{len(job_ids) - len(completed_jobs)} jobs still pending"
                )
                break
            time.sleep(poll_interval)

    if len(completed_jobs) == len(job_ids):
        print("All jobs completed!")
    return job_statuses


def get_files_from_folder(folder_path):
    """Return a list of Path objects for every regular file in *folder_path*.

    Prints a diagnostic and returns an empty list when the folder is missing;
    subdirectories are skipped.
    """
    root = Path(folder_path)
    if not root.exists():
        print(f"Folder does not exist: {folder_path}")
        return []

    collected = []
    for entry in root.iterdir():
        if entry.is_file():
            collected.append(entry)
    print(f"Found {len(collected)} files in {folder_path}")
    return collected


def main():
    """CLI entry point: start one agent job per input file, then wait for all
    jobs to finish and print a summary of their final statuses."""
    parser = argparse.ArgumentParser(
        description="Run Roe AI agent jobs on files in a folder"
    )
    parser.add_argument("--api-key", required=True, help="Roe AI API key")
    parser.add_argument("--agent-id", required=True, help="Agent ID to run")
    parser.add_argument("--folder", required=True, help="Folder containing input files")
    args = parser.parse_args()

    files = get_files_from_folder(args.folder)
    if not files:
        print("No files found to process")
        return

    print(f"Starting jobs for {len(files)} files...")

    # Kick off one async job per file (in order) and keep only the ids of
    # jobs that actually started.
    job_ids = [
        job_id
        for job_id in (
            run_agent_async(args.api_key, args.agent_id, path) for path in files
        )
        if job_id
    ]

    if not job_ids:
        print("No jobs were started successfully")
        return

    # Block until every started job reaches a terminal state.
    final_results = poll_jobs_until_complete(args.api_key, job_ids)

    print("\nFinal Results:")
    print("-" * 50)
    for job_id, status_info in final_results.items():
        print(f"Job ID: {job_id}")
        print(f"  Status: {status_info.get('status')}")
        print(f"  Timestamp: {status_info.get('timestamp')}")

    # Compact id -> status-code view of the same results.
    print("\nJob Status Map:")
    job_status_map = {
        job_id: info.get("status") for job_id, info in final_results.items()
    }
    print(json.dumps(job_status_map, indent=2))


# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()