Some checks failed
CI / lint-and-test (pull_request) Has been cancelled
Atomic extract-and-connect (lib/connect.py): - After extraction writes claim files, each new claim is embedded via OpenRouter, searched against Qdrant, and top-5 neighbors (cosine > 0.55) are added as `related` edges in the claim's frontmatter - Edges written on NEW claim only — avoids merge conflicts - Cross-domain connections enabled, non-fatal on Qdrant failure - Wired into openrouter-extract-v2.py post-extraction step Stale PR monitor (lib/stale_pr.py): - Every watchdog cycle checks open extract/* PRs - If open >30 min AND 0 claim files → auto-close with comment - After 2 stale closures → marks source as extraction_failed - Wired into watchdog.py as check #6 Response audit system: - response_audit table (migration v8), persistent audit conn in bot.py - 90-day retention cleanup, tool_calls JSON column - Confidence tag stripping, systemd ReadWritePaths for pipeline.db Supporting infrastructure: - reweave.py: nightly edge reconnection for orphan claims - reconcile-sources.py: source status reconciliation - backfill-domains.py: domain classification backfill - ops/reconcile-source-status.sh: operational reconciliation script - Attribution improvements, post-extract enrichments, merge improvements Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
220 lines
7.4 KiB
Python
220 lines
7.4 KiB
Python
"""Stale PR monitor — auto-close extraction PRs that produced no claims.

Catches the failure mode where batch-extract creates a PR but extraction
produces only source-file updates (no actual claims). These PRs sit open
indefinitely, consuming merge queue bandwidth and confusing metrics.

Rules:
- PR branch starts with "extract/"
- PR is open for >30 minutes
- PR diff contains 0 files in domains/*/ or decisions/*/
  → Auto-close with comment, log to audit_log as stale_extraction_closed

- If same source branch has been stale-closed 2+ times
  → Mark source as extraction_failed in pipeline.db sources table

Called from the pipeline daemon (piggyback on validate_cycle interval)
or standalone via: python3 -m lib.stale_pr

Owner: Epimetheus
"""
|
|
|
|
import logging
|
|
import json
|
|
import os
|
|
import re
|
|
import sqlite3
|
|
import urllib.request
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from . import config
|
|
|
|
# Module logger — child of the shared "pipeline" logger hierarchy.
logger = logging.getLogger("pipeline.stale_pr")

# Minimum age (minutes) an open extract/* PR must reach before it is
# eligible for auto-closure; younger PRs may still be mid-extraction.
STALE_THRESHOLD_MINUTES = 30

MAX_STALE_FAILURES = 2  # After this many stale closures, mark source as failed
|
|
|
|
|
|
def _forgejo_api(method: str, path: str, body: dict | None = None) -> dict | list | None:
    """Issue one Forgejo REST call and return the decoded JSON.

    Returns None on any failure (missing token, network error, non-2xx
    status, bad JSON) — callers are expected to treat None as "API
    unavailable" and degrade gracefully.
    """
    token_path = config.FORGEJO_TOKEN_FILE
    if not token_path.exists():
        logger.error("No Forgejo token at %s", token_path)
        return None
    auth_token = token_path.read_text().strip()

    # Only attach a payload when a non-empty body was supplied.
    payload = json.dumps(body).encode() if body else None
    request = urllib.request.Request(
        f"{config.FORGEJO_URL}/api/v1/{path}",
        data=payload,
        headers={
            "Authorization": f"token {auth_token}",
            "Content-Type": "application/json",
        },
        method=method,
    )

    try:
        with urllib.request.urlopen(request, timeout=15) as response:
            return json.loads(response.read())
    except Exception as exc:
        # Broad by design: any API hiccup becomes a logged None, never a crash.
        logger.warning("Forgejo API %s %s failed: %s", method, path, exc)
        return None
|
|
|
|
|
|
def _pr_has_claim_files(pr_number: int) -> bool:
    """Check if a PR's diff contains any claim files.

    A "claim file" is any .md file under domains/ or decisions/.

    Returns True when the file listing cannot be fetched (API failure):
    the caller closes PRs that report no claim files, so a transient API
    error must NOT look like an empty diff — otherwise a healthy PR full
    of claims could be wrongly auto-closed. An actually-empty diff (empty
    list from the API) still returns False.
    """
    diff_data = _forgejo_api(
        "GET",
        f"repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}/files",
    )
    if not isinstance(diff_data, list):
        # Fetch failed or returned an error object — assume claims may exist.
        return True

    for file_entry in diff_data:
        filename = file_entry.get("filename", "")
        # Claim files are markdown under the two claim trees; the .md check
        # filters out directory markers and non-claim artifacts.
        if filename.startswith(("domains/", "decisions/")) and filename.endswith(".md"):
            return True
    return False
|
|
|
|
|
|
def _close_pr(pr_number: int, reason: str) -> bool:
    """Comment on and then close PR #pr_number; True if the close succeeded."""
    repo_prefix = f"repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}"

    # Best-effort explanatory comment first — a failure here must not
    # prevent the actual close below.
    _forgejo_api(
        "POST",
        f"{repo_prefix}/issues/{pr_number}/comments",
        {"body": f"Auto-closed by stale PR monitor: {reason}\n\nPentagon-Agent: Epimetheus"},
    )

    outcome = _forgejo_api(
        "PATCH",
        f"{repo_prefix}/pulls/{pr_number}",
        {"state": "closed"},
    )
    return outcome is not None
|
|
|
|
|
|
def _log_audit(conn: sqlite3.Connection, pr_number: int, branch: str):
|
|
"""Log stale closure to audit_log."""
|
|
try:
|
|
conn.execute(
|
|
"INSERT INTO audit_log (timestamp, stage, event, detail) VALUES (datetime('now'), ?, ?, ?)",
|
|
("monitor", "stale_extraction_closed", json.dumps({"pr": pr_number, "branch": branch})),
|
|
)
|
|
conn.commit()
|
|
except Exception as e:
|
|
logger.warning("Audit log write failed: %s", e)
|
|
|
|
|
|
def _count_stale_closures(conn: sqlite3.Connection, branch: str) -> int:
|
|
"""Count how many times this branch has been stale-closed."""
|
|
try:
|
|
row = conn.execute(
|
|
"SELECT COUNT(*) FROM audit_log WHERE event = 'stale_extraction_closed' AND detail LIKE ?",
|
|
(f'%"branch": "{branch}"%',),
|
|
).fetchone()
|
|
return row[0] if row else 0
|
|
except Exception:
|
|
return 0
|
|
|
|
|
|
def _mark_source_failed(conn: sqlite3.Connection, branch: str):
    """Flag the source behind *branch* as extraction_failed (best-effort)."""
    # Branch names look like extract/<source-name>; the suffix is matched
    # against sources.path with LIKE, since the stored path is a longer string.
    source_name = branch.removeprefix("extract/")

    update_sql = (
        "UPDATE sources SET status = 'extraction_failed', "
        "last_error = 'repeated_stale_extraction', "
        "updated_at = datetime('now') WHERE path LIKE ?"
    )
    try:
        conn.execute(update_sql, (f"%{source_name}%",))
        conn.commit()
        logger.info("Marked source %s as extraction_failed (repeated stale closures)", source_name)
    except Exception as e:
        # DB write failure is logged but never propagated to the monitor loop.
        logger.warning("Failed to mark source as failed: %s", e)
|
|
|
|
|
|
def check_stale_prs(conn: sqlite3.Connection) -> tuple[int, int]:
    """Check for and close stale extraction PRs.

    A PR is stale when its head branch starts with "extract/", it has been
    open longer than STALE_THRESHOLD_MINUTES, and its diff contains no claim
    files. Stale PRs are closed with a comment and logged to audit_log; once
    a branch accumulates MAX_STALE_FAILURES stale closures, its source is
    marked extraction_failed so it stops being retried.

    Returns (closed_count, error_count).
    """
    closed = 0
    errors = 0

    # Fetch all open PRs (paginated, 50 per page)
    page = 1
    all_prs = []
    while True:
        prs = _forgejo_api("GET",
            f"repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls?state=open&limit=50&page={page}")
        # Only a list of PR objects is usable. None means API failure; a dict
        # would be an error payload — extending all_prs with it would corrupt
        # the list with string keys, so stop paginating in either case.
        if not prs or not isinstance(prs, list):
            break
        all_prs.extend(prs)
        if len(prs) < 50:
            break
        page += 1

    now = datetime.now(timezone.utc)

    for pr in all_prs:
        branch = pr.get("head", {}).get("ref", "")
        if not branch.startswith("extract/"):
            continue

        # Age gate: leave young PRs alone — extraction may still be running.
        created_str = pr.get("created_at", "")
        if not created_str:
            continue
        try:
            # Forgejo returns ISO format with Z suffix
            created = datetime.fromisoformat(created_str.replace("Z", "+00:00"))
        except ValueError:
            continue

        age_minutes = (now - created).total_seconds() / 60
        if age_minutes < STALE_THRESHOLD_MINUTES:
            continue

        pr_number = pr.get("number")
        if pr_number is None:
            continue  # malformed PR object — skip it rather than crash the cycle

        # A PR that already produced claim files is healthy, not stale.
        if _pr_has_claim_files(pr_number):
            continue

        # PR is stale — close it
        logger.info("Stale PR #%d: branch=%s, age=%.0f min, no claim files — closing",
                    pr_number, branch, age_minutes)

        if _close_pr(pr_number, f"No claim files after {int(age_minutes)} minutes. Branch: {branch}"):
            closed += 1
            _log_audit(conn, pr_number, branch)

            # Repeated stale closures of the same branch mean the source
            # itself is bad — escalate to extraction_failed.
            failure_count = _count_stale_closures(conn, branch)
            if failure_count >= MAX_STALE_FAILURES:
                _mark_source_failed(conn, branch)
                logger.warning("Source %s marked as extraction_failed after %d stale closures",
                               branch, failure_count)
        else:
            errors += 1
            logger.warning("Failed to close stale PR #%d", pr_number)

    if closed:
        logger.info("Stale PR monitor: closed %d PRs", closed)

    return closed, errors
|
|
|
|
|
|
# Allow standalone execution
if __name__ == "__main__":
    import sys

    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

    # Bail out early with a clear message if the pipeline DB is absent.
    db_path = config.DB_PATH
    if not db_path.exists():
        print(f"ERROR: Database not found at {db_path}", file=sys.stderr)
        sys.exit(1)

    db = sqlite3.connect(str(db_path))
    db.row_factory = sqlite3.Row
    n_closed, n_errors = check_stale_prs(db)
    print(f"Stale PR monitor: {n_closed} closed, {n_errors} errors")
    db.close()