aurask/api/aurask/orchestrator.py

191 lines
7.7 KiB
Python

"""Template-first workflow orchestration and TBU settlement."""
from __future__ import annotations
from aurask.audit import AuditService
from aurask.errors import ForbiddenError, NotFoundError, ValidationError
from aurask.ids import new_id, now_iso
from aurask.plans import PLAN_CATALOG
from aurask.quota import QuotaService
from aurask.repository import JsonStore
def _approved_template(
    *,
    template_id: str,
    name: str,
    category: str,
    description: str,
    estimated_tbu: int,
    requires_workspace: bool,
) -> dict:
    """Build one built-in template record.

    All built-in templates share the same locked-down capability fields:
    active status, no Python, no custom components, and an empty
    ``allowed_http_domains`` list (a fresh list per record). Only the
    descriptive fields, the default TBU estimate and the workspace
    requirement vary.
    """
    return {
        "id": template_id,
        "name": name,
        "category": category,
        "description": description,
        "default_estimated_tbu": estimated_tbu,
        "requires_workspace": requires_workspace,
        "allow_python": False,
        "allow_custom_components": False,
        "allowed_http_domains": [],
        "status": "active",
    }


# Built-in templates written to the "workflow_templates" collection by
# WorkflowOrchestrator.seed_templates().
DEFAULT_TEMPLATES: list[dict] = [
    _approved_template(
        template_id="tpl_customer_support",
        name="Customer Support Digital Employee",
        category="support",
        description="Answers customer questions from an approved knowledge base.",
        estimated_tbu=80,
        requires_workspace=True,
    ),
    _approved_template(
        template_id="tpl_knowledge_qa",
        name="Knowledge Base Q&A",
        category="rag",
        description="Runs a RAG answer against a tenant-bound AnythingLLM workspace.",
        estimated_tbu=60,
        requires_workspace=True,
    ),
    _approved_template(
        template_id="tpl_email_assistant",
        name="Email Draft Assistant",
        category="productivity",
        description="Drafts English email responses from structured inputs.",
        estimated_tbu=50,
        requires_workspace=False,
    ),
    _approved_template(
        template_id="tpl_spreadsheet_processor",
        name="Spreadsheet Processing Assistant",
        category="data",
        description="Summarizes CSV-like tabular data without arbitrary code execution.",
        estimated_tbu=70,
        requires_workspace=False,
    ),
]
class LangflowRuntimeAdapter:
    """Deterministic stand-in for the Langflow runtime used by the MVP.

    It does not run any flow: it echoes template metadata and reports a usage
    figure derived from the estimate. Template approval is checked by the
    caller before this adapter is invoked. A real Langflow Runtime Pool can
    replace this class behind the same ``execute`` signature without changing
    gateway, quota, or audit boundaries.
    """

    def execute(self, template: dict, inputs: dict, *, estimated_tbu: int) -> dict:
        """Simulate running *template* and report its cost.

        Args:
            template: Template record; its ``"id"`` and ``"name"`` are echoed.
            inputs: Caller inputs. Only the sorted key names are echoed back
                (values are never returned). A truthy ``"simulate_failure"``
                entry forces an error.
            estimated_tbu: Pre-computed estimate; the reported ``actual_tbu``
                is 90% of it, truncated to an int and never below 1.

        Returns:
            A dict with ``status`` (``"completed"``), ``actual_tbu`` and
            ``output``.

        Raises:
            RuntimeError: if ``inputs["simulate_failure"]`` is truthy.
        """
        if inputs.get("simulate_failure"):
            raise RuntimeError("simulated runtime failure")

        charged_tbu = int(estimated_tbu * 0.9)
        if charged_tbu < 1:
            charged_tbu = 1

        payload = {
            "template_id": template["id"],
            "summary": f"Executed approved template: {template['name']}",
            "redacted_input_keys": sorted(inputs),
        }
        return {"status": "completed", "actual_tbu": charged_tbu, "output": payload}
class WorkflowOrchestrator:
    """Run approved workflow templates and settle their TBU cost.

    A run reserves an estimated TBU amount through the ``QuotaService``,
    executes the template on the runtime adapter, then settles the reservation
    with the actual usage or refunds it on failure. Run records, quota
    accounts and usage records are persisted in the ``JsonStore``, and every
    finished run is written to the audit log.
    """

    def __init__(
        self,
        store: JsonStore,
        quota: QuotaService,
        audit: AuditService,
        runtime: LangflowRuntimeAdapter | None = None,
    ) -> None:
        self.store = store
        self.quota = quota
        self.audit = audit
        # Fall back to the deterministic MVP adapter when none is injected.
        self.runtime = runtime or LangflowRuntimeAdapter()

    def seed_templates(self) -> None:
        """Write every entry of ``DEFAULT_TEMPLATES`` into the template store."""
        for template in DEFAULT_TEMPLATES:
            self.store.put("workflow_templates", template["id"], template)

    def list_templates(self) -> list[dict]:
        """Return the stored templates whose ``status`` is ``"active"``."""
        return [
            template
            for template in self.store.list("workflow_templates")
            if template["status"] == "active"
        ]

    def run_template(
        self,
        tenant_id: str,
        user_id: str,
        template_id: str,
        *,
        workspace_id: str | None = None,
        inputs: dict | None = None,
    ) -> dict:
        """Execute an approved template for a tenant and settle its TBU cost.

        Args:
            tenant_id: Tenant that owns the run and whose quota is charged.
            user_id: User who started the run (stored on the run and audit entry).
            template_id: Id of a record in the ``workflow_templates`` collection.
            workspace_id: Required when the template sets ``requires_workspace``;
                the workspace must belong to ``tenant_id``.
            inputs: Template inputs; ``None`` is treated as ``{}``.

        Returns:
            The persisted run record. Its ``status`` is ``"completed"`` (with
            ``actual_tbu`` and ``output``) or ``"failed"`` (with ``error_type``
            and ``error_message``). Errors from runtime execution or reservation
            settlement are recorded on the run rather than raised.

        Raises:
            NotFoundError: the template is missing/inactive, or the workspace
                does not exist.
            ForbiddenError: the template enables unsafe capabilities, or the
                workspace belongs to another tenant.
            ValidationError: ``workspace_id`` is missing when required, or the
                plan's concurrent-run limit is already reached.
            Exceptions from ``reserve_tbu``, ``refund_reservation`` and
            ``_record_usage`` are not caught here and propagate to the caller.
        """
        inputs = inputs or {}
        template = self.store.get("workflow_templates", template_id)
        if not template or template["status"] != "active":
            raise NotFoundError("workflow template not found")
        self._validate_template_is_safe(template)
        if template["requires_workspace"]:
            self._require_workspace(tenant_id, workspace_id)

        account = self.quota.get_account(tenant_id)
        plan = PLAN_CATALOG[account["plan_code"]]
        if account["active_workflow_runs"] >= plan.max_concurrent_runs:
            raise ValidationError("workflow concurrency limit exceeded")

        estimated_tbu = self._estimate_tbu(template, inputs)
        run = {
            "id": new_id("run"),
            "tenant_id": tenant_id,
            "user_id": user_id,
            "template_id": template_id,
            "workspace_id": workspace_id,
            "status": "reserved",
            "estimated_tbu": estimated_tbu,
            "created_at": now_iso(),
        }
        self.store.put("workflow_runs", run["id"], run)
        # NOTE(review): if reserve_tbu raises, the run stored above keeps the
        # status "reserved" with no reservation behind it; confirm whether it
        # should be marked failed or cleaned up.
        reservation = self.quota.reserve_tbu(tenant_id, estimated_tbu, workflow_run_id=run["id"])
        # Do not reuse `account` here: reserve_tbu may have updated the quota
        # account, and writing the pre-reservation snapshot back would discard
        # those changes. _adjust_active_runs re-reads it first.
        self._adjust_active_runs(tenant_id, 1)
        try:
            result = self.runtime.execute(template, inputs, estimated_tbu=estimated_tbu)
            self.quota.settle_reservation(reservation["id"], result["actual_tbu"])
        except Exception as exc:
            # Record the failure before refunding so the finally block persists
            # it even if the refund itself raises.
            run.update(
                {
                    "status": "failed",
                    "error_type": exc.__class__.__name__,
                    "error_message": str(exc),
                    "completed_at": now_iso(),
                }
            )
            self.quota.refund_reservation(
                reservation["id"],
                provider_billed_tbu=self._provider_billed_tbu(inputs),
            )
        else:
            # Only reached after the reservation is settled: a failure while
            # recording usage must not trigger a refund of settled TBU.
            run.update(
                {
                    "status": "completed",
                    "actual_tbu": result["actual_tbu"],
                    "output": result["output"],
                    "completed_at": now_iso(),
                }
            )
            self._record_usage(run, result["actual_tbu"])
        finally:
            self._adjust_active_runs(tenant_id, -1)
            self.store.put("workflow_runs", run["id"], run)
            self.audit.record(
                "workflow.run_finished",
                tenant_id=tenant_id,
                user_id=user_id,
                resource_type="workflow_run",
                resource_id=run["id"],
                metadata={"status": run["status"]},
            )
        return run

    def _adjust_active_runs(self, tenant_id: str, delta: int) -> None:
        """Re-read the tenant's quota account and shift its active-run count.

        The count is floored at 0. The account is fetched fresh on every call
        so that changes made by other quota operations are not overwritten.
        """
        account = self.quota.get_account(tenant_id)
        account["active_workflow_runs"] = max(0, account["active_workflow_runs"] + delta)
        self.store.put("quota_accounts", tenant_id, account)

    @staticmethod
    def _provider_billed_tbu(inputs: dict) -> int:
        """Parse the optional ``provider_billed_tbu`` input as a non-negative int.

        Missing, empty or unparseable values yield 0, so a malformed input can
        never stop the reservation refund from running.
        """
        # NOTE(review): this amount comes from caller-supplied inputs and
        # feeds refund_reservation; confirm that is intended outside tests.
        raw = inputs.get("provider_billed_tbu", 0) or 0
        try:
            return max(0, int(raw))
        except (TypeError, ValueError, OverflowError):
            return 0

    def _validate_template_is_safe(self, template: dict) -> None:
        """Reject templates that enable Python or custom components.

        Raises:
            ForbiddenError: if ``allow_python`` or ``allow_custom_components``
                is truthy.
        """
        if template.get("allow_python") or template.get("allow_custom_components"):
            raise ForbiddenError("unsafe template capabilities are not allowed for shared runtime")

    def _require_workspace(self, tenant_id: str, workspace_id: str | None) -> dict:
        """Load *workspace_id* and check that it belongs to *tenant_id*.

        Returns:
            The stored workspace record.

        Raises:
            ValidationError: if *workspace_id* is empty or ``None``.
            NotFoundError: if no such workspace is stored.
            ForbiddenError: if the workspace belongs to a different tenant.
        """
        if not workspace_id:
            raise ValidationError("workspace_id is required for this template")
        workspace = self.store.get("workspaces", workspace_id)
        if not workspace:
            raise NotFoundError("workspace not found")
        if workspace["tenant_id"] != tenant_id:
            raise ForbiddenError("workspace does not belong to tenant")
        return workspace

    def _estimate_tbu(self, template: dict, inputs: dict) -> int:
        """Estimate a run's TBU cost from the template default and input size.

        One extra TBU is added per full 600 characters of the ``str()`` forms
        of the non-boolean input values.
        """
        input_size = sum(len(str(value)) for value in inputs.values() if not isinstance(value, bool))
        # input_size is never negative, so the result is never below the default.
        return template["default_estimated_tbu"] + input_size // 600

    def _record_usage(self, run: dict, actual_tbu: int) -> dict:
        """Persist a usage record for a completed run and return what the store returns.

        ``provider_units`` is ``actual_tbu / 1.25`` rounded to 4 decimals.
        """
        usage = {
            "id": new_id("usage"),
            "tenant_id": run["tenant_id"],
            "user_id": run["user_id"],
            "workflow_run_id": run["id"],
            "template_id": run["template_id"],
            "tbu": actual_tbu,
            "provider_units": round(actual_tbu / 1.25, 4),
            "created_at": now_iso(),
        }
        return self.store.put("usage_records", usage["id"], usage)