teleo-infrastructure/scripts/migrate-entity-schema.py
m3taversal d2aec7fee3
Some checks are pending
CI / lint-and-test (push) Waiting to run
feat: reorganize repo with clear directory boundaries and agent ownership
Move scattered root-level files into categorized directories:
- deploy/ — deployment + mirror scripts (Ship)
- scripts/ — one-off backfills + migrations (Ship)
- research/ — nightly research + prompts (Ship)
- docs/ — all operational documentation (shared)

Delete 3 dead cron scripts replaced by pipeline daemon:
- batch-extract-50.sh, evaluate-trigger.sh, extract-cron.sh

Add CODEOWNERS mapping every path to its owning agent.
Add README with directory structure, ownership table, and VPS layout.
Update deploy.sh paths to match new structure.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-14 18:20:13 +01:00

100 lines
3.5 KiB
Python

#!/usr/bin/env python3
"""Entity schema migration — separate decisions from entities.
Step 1: Move decision_market entities to decisions/{domain}/
Step 2: Update frontmatter (type: entity → type: decision)
Step 3: Update pipeline config (TYPE_SCHEMAS, entity paths) — manual follow-up; this script only performs steps 1 and 2
Run from the repo root:
cd /opt/teleo-eval/workspaces/main # or extract/
python3 /opt/teleo-eval/pipeline/migrate-entity-schema.py [--dry-run]
Epimetheus. Reviewed by Leo (architecture), Rio (taxonomy), Ganymede (migration path).
"""
import argparse
import glob
import os
import re
import shutil
from pathlib import Path
def find_decision_markets(repo_root: str) -> list[dict]:
    """Find every entity file that declares itself a decision market.

    Scans ``<repo_root>/entities/<domain>/*.md`` and selects the files whose
    text contains the literal marker ``entity_type: decision_market``
    (a plain substring match over the whole file).

    Args:
        repo_root: Path of the repository root that holds ``entities/``.

    Returns:
        One dict per matching file, ordered by source path, with the keys
        ``source`` (current path), ``domain`` (name of the parent
        directory), ``filename`` (base name) and ``dest`` (target path
        ``<repo_root>/decisions/<domain>/<filename>``).

    Files that cannot be read or decoded are skipped, with a warning on
    stderr so that they are not silently left un-migrated.
    """
    import sys  # local import: only needed for the warning stream

    marker = "entity_type: decision_market"
    pattern = os.path.join(repo_root, "entities", "*", "*.md")

    decisions = []
    # glob order is filesystem-dependent; sort so runs are reproducible.
    for filepath in sorted(glob.glob(pattern)):
        try:
            # Explicit UTF-8 keeps the scan independent of the process locale,
            # and the context manager closes the handle promptly.
            with open(filepath, encoding="utf-8") as handle:
                content = handle.read()
        except (OSError, UnicodeDecodeError) as exc:
            print(f"WARNING: skipping unreadable file {filepath}: {exc}", file=sys.stderr)
            continue

        if marker not in content:
            continue

        source = Path(filepath)
        domain = source.parent.name
        filename = source.name
        decisions.append({
            "source": filepath,
            "domain": domain,
            "filename": filename,
            "dest": os.path.join(repo_root, "decisions", domain, filename),
        })
    return decisions
def update_frontmatter_type(content: str) -> str:
    """Rewrite the first ``type: entity`` line to ``type: decision``.

    Only the first line that consists of ``type:``, optional spaces/tabs,
    ``entity`` and optional trailing spaces/tabs is replaced; any later
    occurrence is left alone. The search covers the whole text, not just a
    delimited frontmatter block.

    Args:
        content: Full text of a markdown file.

    Returns:
        The text with that one line replaced, or the text unchanged when no
        such line exists.
    """
    # Whitespace is limited to spaces and tabs on purpose: ``\s`` also matches
    # newlines, which let the match run past the end of the line and swallow
    # a following blank line (or the ``\r`` of a CRLF ending). The lookahead
    # asserts the line end without consuming it, so line endings are preserved.
    return re.sub(
        r"^type:[ \t]*entity[ \t]*(?=\r?$)",
        "type: decision",
        content,
        count=1,
        flags=re.MULTILINE,
    )
def migrate(repo_root: str, dry_run: bool = False):
    """Move decision_market entity files into ``decisions/<domain>/``.

    For every file reported by ``find_decision_markets`` the content is read,
    passed through ``update_frontmatter_type``, written to the file's ``dest``
    path and the original is then deleted. An existing file at ``dest`` is
    overwritten. Progress and a final summary are printed to stdout.

    Args:
        repo_root: Repository root containing ``entities/``.
        dry_run: When true, nothing is created, written or deleted; only the
            report is printed.
    """
    decisions = find_decision_markets(repo_root)
    print(f"Found {len(decisions)} decision_market files to migrate")

    # Group by domain
    by_domain: dict[str, list] = {}
    for d in decisions:
        by_domain.setdefault(d["domain"], []).append(d)

    for domain, files in by_domain.items():
        print(f"\n {domain}: {len(files)} decisions")
        if not dry_run:
            os.makedirs(os.path.join(repo_root, "decisions", domain), exist_ok=True)

        for f in files:
            print(f" {f['filename']}")
            if dry_run:
                continue
            # Read, update frontmatter, write to new location. Context
            # managers close both handles; explicit UTF-8 avoids depending on
            # the process locale.
            with open(f["source"], encoding="utf-8") as src:
                content = src.read()
            with open(f["dest"], "w", encoding="utf-8") as out:
                out.write(update_frontmatter_type(content))
            # Remove the original only after the new copy has been written.
            os.remove(f["source"])

    # Summary. In a dry run the sources are still on disk, so exclude them
    # explicitly; the preview then reports the same "remaining" numbers a
    # real run would. After a real run the filter is a no-op.
    migrated_sources = {d["source"] for d in decisions}
    remaining_entities = [
        path
        for path in glob.glob(os.path.join(repo_root, "entities", "*", "*.md"))
        if path not in migrated_sources
    ]
    remaining_by_domain: dict[str, int] = {}
    for path in remaining_entities:
        name = Path(path).parent.name
        remaining_by_domain[name] = remaining_by_domain.get(name, 0) + 1

    print(f"\n{'='*60}")
    print(f" MIGRATION {'(DRY RUN) ' if dry_run else ''}COMPLETE")
    print(f" Decisions moved: {len(decisions)}")
    print(f" Entities remaining: {len(remaining_entities)}")
    for domain, count in sorted(remaining_by_domain.items()):
        print(f" {domain}: {count}")
    print(f" Decision directories created: {list(by_domain.keys())}")
    print(f"{'='*60}")
if __name__ == "__main__":
    # Command-line entry point: parse the two flags and hand them to migrate().
    cli = argparse.ArgumentParser(
        description="Migrate decision_market entities to decisions/",
    )
    cli.add_argument(
        "--repo-root",
        default=".",
        help="Repository root",
    )
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would change without changing",
    )
    options = cli.parse_args()
    migrate(repo_root=options.repo_root, dry_run=options.dry_run)