aurask/api/aurask/knowledge_base.py

114 lines
4.6 KiB
Python

"""AnythingLLM workspace binding and document intake facade."""
from __future__ import annotations
import hashlib
from math import ceil
from aurask.audit import AuditService
from aurask.errors import ForbiddenError, NotFoundError, QuotaError, ValidationError
from aurask.ids import new_id, now_iso
from aurask.plans import PLAN_CATALOG
from aurask.quota import QuotaService
from aurask.repository import JsonStore
# Media types that document upload will accept; anything else is rejected
# with a ValidationError before any quota is consumed.
ALLOWED_CONTENT_TYPES = set(
    [
        "application/pdf",
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "text/csv",
        "text/markdown",
        "text/plain",
    ]
)
class AnythingLLMAdapter:
    """MVP stand-in for the AnythingLLM integration.

    It only derives deterministic external identifiers and reports documents
    as queued; it performs no external calls, so the contract with
    AnythingLLM is recorded without exposing AnythingLLM itself.
    """

    def create_workspace(self, tenant_id: str, name: str) -> str:
        """Return the external workspace id for *name* under *tenant_id*.

        The slug is the lower-cased name with every space replaced by a
        hyphen, truncated to 40 characters.
        """
        lowered = name.lower()
        slug = "-".join(lowered.split(" "))[:40]
        return f"anythingllm-{tenant_id}-{slug}"

    def ingest_document(self, workspace_external_id: str, document: dict) -> dict:
        """Return the external ingestion result for *document*.

        The external document id is the workspace's external id joined to the
        document's ``id`` with a hyphen; the status is always ``"queued"``.
        """
        external_id = "{}-{}".format(workspace_external_id, document["id"])
        return {"external_document_id": external_id, "status": "queued"}
class KnowledgeBaseService:
    """Tenant-scoped knowledge-base workspaces and document intake.

    Enforces plan quotas, persists records through the store, delegates the
    external side to an AnythingLLM adapter, and records an audit event for
    each successful workspace creation or document upload.
    """

    _BYTES_PER_MB = 1024 * 1024

    def __init__(
        self,
        store: JsonStore,
        quota: QuotaService,
        audit: AuditService,
        adapter: AnythingLLMAdapter | None = None,
    ) -> None:
        """Wire the service to its collaborators.

        Args:
            store: Persistence for ``workspaces`` and ``documents`` records.
            quota: Source of tenant accounts and storage accounting.
            audit: Audit-trail sink.
            adapter: External knowledge-base adapter; defaults to a new
                ``AnythingLLMAdapter`` when not supplied (or falsy).
        """
        self.store = store
        self.quota = quota
        self.audit = audit
        self.adapter = adapter or AnythingLLMAdapter()

    def create_workspace(self, tenant_id: str, user_id: str, name: str) -> dict:
        """Create a workspace for *tenant_id*, subject to the workspace quota.

        Surrounding whitespace is stripped from ``name`` before it is
        validated, stored, and passed to the adapter.

        Returns:
            The persisted workspace record.

        Raises:
            ValidationError: ``name`` is empty, ``None``, or whitespace-only.
            QuotaError: The tenant already has at least
                ``account["knowledge_bases"]`` workspaces.
        """
        name = (name or "").strip()
        if not name:
            raise ValidationError("workspace name is required")
        account = self.quota.get_account(tenant_id)
        allowed = account["knowledge_bases"]
        # NOTE(review): every stored workspace of the tenant counts toward the
        # quota regardless of its status - confirm whether inactive ones
        # should be excluded.
        existing_count = sum(
            1
            for workspace in self.store.list("workspaces")
            if workspace["tenant_id"] == tenant_id
        )
        if existing_count >= allowed:
            raise QuotaError("knowledge base quota exceeded", details={"allowed": allowed})
        workspace = {
            "id": new_id("ws"),
            "tenant_id": tenant_id,
            "created_by": user_id,
            "name": name,
            "external_workspace_id": self.adapter.create_workspace(tenant_id, name),
            "status": "active",
            "created_at": now_iso(),
        }
        self.store.put("workspaces", workspace["id"], workspace)
        self.audit.record(
            "knowledge.workspace_created",
            tenant_id=tenant_id,
            user_id=user_id,
            resource_type="workspace",
            resource_id=workspace["id"],
        )
        return workspace

    def upload_document(
        self,
        tenant_id: str,
        user_id: str,
        workspace_id: str,
        *,
        filename: str,
        size_bytes: int,
        content_type: str,
        content_preview: str = "",
    ) -> dict:
        """Validate an upload, charge storage quota, and hand it to the adapter.

        Args:
            tenant_id: Tenant performing the upload; must own the workspace.
            user_id: Uploading user, recorded on the document and audit event.
            workspace_id: Target workspace id.
            filename: Client-supplied file name. Only its final path component
                is kept, so it cannot escape the document's storage prefix.
            size_bytes: Declared file size in bytes.
            content_type: Media type. Parameters (``; charset=...``) and
                letter case are ignored; the bare media type is stored.
            content_preview: Optional text mixed into ``content_hash``.

        Returns:
            The persisted document record, updated with the adapter's result
            (e.g. ``external_document_id`` and ``status``).

        Raises:
            NotFoundError: The workspace does not exist.
            ForbiddenError: The workspace belongs to another tenant.
            ValidationError: Unsupported content type, non-positive size, or
                an unusable filename.
            QuotaError: The file exceeds the plan's single-file limit.
                (``QuotaService.consume_storage`` may raise its own errors.)
        """
        workspace = self.store.get("workspaces", workspace_id)
        if not workspace:
            raise NotFoundError("workspace not found")
        if workspace["tenant_id"] != tenant_id:
            raise ForbiddenError("workspace does not belong to tenant")
        media_type = self._normalize_content_type(content_type)
        if media_type not in ALLOWED_CONTENT_TYPES:
            raise ValidationError("unsupported content type")
        if size_bytes <= 0:
            raise ValidationError("document size must be positive")
        safe_name = self._safe_filename(filename)

        account = self.quota.get_account(tenant_id)
        plan = PLAN_CATALOG[account["plan_code"]]
        max_bytes = plan.max_single_file_mb * self._BYTES_PER_MB
        if size_bytes > max_bytes:
            raise QuotaError(
                "single file upload limit exceeded",
                details={"max_mb": plan.max_single_file_mb},
            )
        # Storage is charged in whole megabytes, rounded up (minimum 1 MB).
        size_mb = max(1, ceil(size_bytes / self._BYTES_PER_MB))
        document_id = new_id("doc")
        # NOTE: this hashes upload metadata plus the optional preview, not the
        # full file bytes, so it is not a true digest of the file content.
        digest = hashlib.sha256(
            f"{tenant_id}:{workspace_id}:{safe_name}:{size_bytes}:{content_preview}".encode("utf-8")
        ).hexdigest()
        document = {
            "id": document_id,
            "tenant_id": tenant_id,
            "workspace_id": workspace_id,
            "uploaded_by": user_id,
            "filename": safe_name,
            "size_bytes": size_bytes,
            "size_mb": size_mb,
            "content_type": media_type,
            "content_hash": digest,
            "storage_path": f"{tenant_id}/{workspace_id}/{document_id}/{safe_name}",
            "status": "validated",
            "created_at": now_iso(),
        }
        # NOTE(review): storage is charged before the adapter call; if
        # ingestion raises, the charge is not reversed here and no document
        # record is written. Confirm whether QuotaService offers a refund hook.
        self.quota.consume_storage(tenant_id, size_mb, document_id=document_id)
        ingest_result = self.adapter.ingest_document(workspace["external_workspace_id"], document)
        document.update(ingest_result)
        self.store.put("documents", document_id, document)
        self.audit.record(
            "knowledge.document_uploaded",
            tenant_id=tenant_id,
            user_id=user_id,
            resource_type="document",
            resource_id=document_id,
        )
        return document

    @staticmethod
    def _normalize_content_type(content_type: str) -> str:
        """Return the bare, lower-cased media type of a Content-Type value."""
        return (content_type or "").split(";", 1)[0].strip().lower()

    @staticmethod
    def _safe_filename(filename: str) -> str:
        """Return the last path component of *filename*; raise if unusable."""
        # Treat both separators as directory markers so client paths from
        # any OS (and traversal attempts like "../x") collapse to a basename.
        base = (filename or "").replace("\\", "/").rsplit("/", 1)[-1].strip()
        if not base or base in {".", ".."} or "\x00" in base:
            raise ValidationError("filename is invalid")
        return base