teleo-infrastructure/diagnostics/review_queue_routes.py
m3taversal 681afad506
Some checks failed
CI / lint-and-test (push) Has been cancelled
Consolidate pipeline code from teleo-codex + VPS into single repo
Sources merged:
- teleo-codex/ops/pipeline-v2/ (11 newer lib files, 5 new lib modules)
- teleo-codex/ops/ (agent-state, diagnostics expansion, systemd units, ops scripts)
- VPS /opt/teleo-eval/telegram/ (10 new bot files, agent configs)
- VPS /opt/teleo-eval/pipeline/ops/ (vector-gc, backfill-descriptions)
- VPS /opt/teleo-eval/sync-mirror.sh (Bug 2 + Step 2.5 fixes)

Non-trivial merges:
- connect.py: kept codex threshold (0.65) + added infra domain parameter
- watchdog.py: kept infra version (stale_pr integration, superset of codex)
- deploy.sh: codex rsync version (interim, until VPS git clone migration)
- diagnostics/app.py: codex decomposed dashboard (14 new route modules)

81 files changed, +17105/-200 lines

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 16:52:26 +01:00

64 lines
1.9 KiB
Python

"""Route handlers for /api/review-queue endpoint.
Import into app.py and register routes in create_app().
"""
import logging
from aiohttp import web
from review_queue import fetch_review_queue
logger = logging.getLogger("argus.review_queue")
async def handle_review_queue(request):
"""GET /api/review-queue — PR review pipeline view.
Query params:
status: filter by status (broken, needs-review, approved-awaiting-merge, changes-requested)
author: filter by agent/author name
domain: filter by domain
Returns JSON with queue items sorted by display priority:
broken (flagged) > needs-review (by age) > approved-awaiting-merge
"""
token = request.app.get("_forgejo_token")
try:
queue = await fetch_review_queue(forgejo_token=token)
except Exception as e:
logger.error("Review queue fetch failed: %s", e)
return web.json_response({"error": str(e)}, status=500)
# Apply filters
status_filter = request.query.get("status")
if status_filter:
queue = [item for item in queue if item["status"] == status_filter]
author_filter = request.query.get("author")
if author_filter:
queue = [item for item in queue if item["author"] == author_filter]
domain_filter = request.query.get("domain")
if domain_filter:
queue = [item for item in queue if item["domain"] == domain_filter]
# Summary stats
status_counts = {}
for item in queue:
status_counts[item["status"]] = status_counts.get(item["status"], 0) + 1
return web.json_response({
"queue": queue,
"total": len(queue),
"status_counts": status_counts,
})
def register_review_queue_routes(app, forgejo_token=None):
"""Register review queue routes on the app.
forgejo_token: optional Forgejo API token for authenticated requests
"""
app["_forgejo_token"] = forgejo_token
app.router.add_get("/api/review-queue", handle_review_queue)