#!/usr/bin/env python3 """ tq.py — the TokenQuest lite agent. This is the ONLY code with access to your ~/.claude data. It is a single file, Python 3.9+ stdlib only, and it never updates itself. Read it before you run it. What it does (current build — steps 1–2 of the install story): --feed derive your Claude Code usage stats from ~/.claude/projects into a panel-grade claude-stats.json (printed or --output). Everything happens locally; NOTHING is sent anywhere by this command. --play serve the game locally: a tiny HTTP server on 127.0.0.1 that serves the panel + your locally-derived feed. Local only — it binds loopback and sends nothing anywhere. Both keep a small LOCAL ledger (~/.tokenquest/ledger.db, mode 600) so sessions survive Claude Code's transcript pruning: from the day you install, your progression is genuinely all-time. --no-ledger disables it (stateless derive). --update fetch/refresh the panel bundle (the game's display shell) from the TokenQuest server into ~/.tokenquest/bundle. The bundle is display-layer only: it runs sandboxed in your browser and never touches your data. THIS SCRIPT never updates itself. --register claim your hero: mint a handle + passphrase + device token on the public board. The passphrase is shown ONCE and never stored. --dry-run print the EXACT payload --push would send — fourteen labeled aggregate measurements (hours, prompts, token counts...), a motto, an avatar. No project names, no histograms, no text you typed, no daily series. Nothing is sent by this command. --push send that payload to the board (your standing in the Halls). The server derives your stats/level from the measurements. Run it from cron every 5 minutes if you want an always-fresh hero — --register prints the line; we never install it for you. --mirror on opt-in "view my panel anywhere": each push ALSO uploads your panel feed — scrubbed first (see scrub_feed_for_mirror: project names, the weekday-hour matrix and non-chrome-devtools mcp tool names never leave your box) — and the site serves your living panel at an UNLISTED link whose rotatable key is printed here. The stored daily series doubles as your history escrow: with a mirror on, even your montage survives a reinstall. --mirror off deletes the stored feed. Default is OFF. While --play runs with a registered hero, it pushes every 5 minutes (the banner says so; --no-push disables) — cron is only needed for freshness while NOT playing. Derivation is a faithful port of the reference extractor (tq-extract.py) minus its server-side machinery (ledger persistence, remote fragments, archive seeds, competition feeds). Scope difference by design: this reads ONE user's ~/.claude (yours), not a multi-home server corpus, and it re-derives from the transcripts that exist right now (Claude Code prunes old transcripts; the game's backfill is by-derivation, so this is the intended custody model for a player's machine). """ import argparse import glob import json import os import re import shutil import sqlite3 import sys import threading from collections import Counter, defaultdict from datetime import date, datetime, timedelta, timezone from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from zoneinfo import ZoneInfo SCHEMA_VERSION = 1 WORK_SESSION_GAP = timedelta(minutes=20) DAY = timedelta(days=1) FALLBACK_MODEL = "claude-opus-4-7" URL_RE = re.compile(r"https?://\S+") # Typed-char cleanup: drop pasted code (fenced ```...``` blocks, then `inline` spans — order matters). FENCE_RE = re.compile(r"```.*?```", re.S) INLINE_CODE_RE = re.compile(r"`[^`]*`") # Pasted-markdown drop: >= MD_DROP_MIN structural markers (## headers + **bold**) => a pasted/generated # document the user wouldn't hand-type; dropped whole from word/char/prompt counts. MD_HEADER_RE = re.compile(r"^[ \t]*#{2,6}[ \t]+\S", re.M) MD_BOLD_RE = re.compile(r"\*\*[^*\n]+\*\*") MD_DROP_MIN = 3 IGNORE_RE = re.compile(r"\bignore\b", re.IGNORECASE) SCALAR_KEYS = ("tokens_input", "tokens_output", "tokens_cache_read", "tokens_cache_create", "user_words", "user_chars", "user_chars_typed", "user_prompts", "sessions", "work_sessions", "total_active_min", "nightowl_active_min", "compactions") # Defensive caps on transcript input: never drop legitimate data (~30-40x above the largest # real lines/files observed), but bound a pathological multi-GB file from OOM-ing the run. MAX_TRANSCRIPT_BYTES = 256 * 1024 * 1024 MAX_LINE_BYTES = 16 * 1024 * 1024 # Hypothetical pay-as-you-go pricing (USD per 1M tokens) — cost is a fun stat, not a bill. # Embedded so the agent stays a single file; override with --pricing if rates move. PRICING_EMBEDDED = { "pricing_date": "2026-06-01", "models": { "claude-opus-4-8": {"input": 5.00, "output": 25.00, "cache_read": 0.50, "cache_create": 6.25}, "claude-opus-4-7": {"input": 5.00, "output": 25.00, "cache_read": 0.50, "cache_create": 6.25}, "claude-opus-4-6": {"input": 5.00, "output": 25.00, "cache_read": 0.50, "cache_create": 6.25}, "claude-sonnet-4-6": {"input": 3.00, "output": 15.00, "cache_read": 0.30, "cache_create": 3.75}, "claude-haiku-4-5-20251001": {"input": 1.00, "output": 5.00, "cache_read": 0.10, "cache_create": 1.25}, }, } # Local timezone, resolved in main() (--tz > /etc/localtime > fixed local offset). TZ = timezone.utc TZ_NAME = "UTC" # The agent's home: ledger + (later) device token + cached panel bundle. Mode 700. TQ_DIR = os.environ.get("TOKENQUEST_DIR") or os.path.expanduser("~/.tokenquest") # Where the panel bundle + (later) the board live. The bundle fetch is the ONLY # outbound request in this build, and it DOWNLOADS display code — it never sends # anything about you (check ensure_bundle: plain GETs, no payload, no identifiers). BASE_URL = "https://tokenquest.tehgaem.com" # panel.html is served locally as index.html; data/* is bundled ENGINE CONFIG (bar styles, # layout, combat knobs, weather, backfill mechanics). The WORLD (arcs/regions/foes/quests/...) # arrives separately via the content drip, clamped to how far your hero has marched. BUNDLE_FILES = ("panel.html", "character-page.html", "data/barstyle.json", "data/layout.json", "data/combat.json", "data/weather.json", "data/backfill.json") _warnings = set() def warn(msg): if msg not in _warnings: _warnings.add(msg) print("WARN:", msg, file=sys.stderr) def resolve_tz(name=None): if name: return ZoneInfo(name), name try: # /etc/localtime is a symlink into the IANA db on most Linux/macOS link = os.readlink("/etc/localtime") key = link.split("zoneinfo/", 1)[1] return ZoneInfo(key), key except (OSError, IndexError, KeyError): pass ltz = datetime.now().astimezone().tzinfo return ltz, str(ltz) # --------------------------------------------------------------------------- # # Helpers (ported from the reference extractor) # --------------------------------------------------------------------------- # def parse_ts(s): if not s: return None try: if s.endswith("Z"): s = s[:-1] + "+00:00" dt = datetime.fromisoformat(s) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt except (ValueError, TypeError): return None def local_date_str(dt): return dt.astimezone(TZ).date().isoformat() def load_pricing(path=None): data = PRICING_EMBEDDED if path: try: with open(path) as fh: data = json.load(fh) except Exception as e: warn("could not load pricing %s (%s); using embedded table" % (path, e)) data = PRICING_EMBEDDED return data.get("models", {}), data.get("pricing_date", "unknown") def cost_for_model_tokens(model, toks, pricing): rate = pricing.get(model) if rate is None: warn("model %r not in pricing table; using %s rates" % (model, FALLBACK_MODEL)) rate = pricing.get(FALLBACK_MODEL, {}) return { "input_usd": toks.get("input", 0) * rate.get("input", 0) / 1_000_000, "output_usd": toks.get("output", 0) * rate.get("output", 0) / 1_000_000, "cache_read_usd": toks.get("cache_read", 0) * rate.get("cache_read", 0) / 1_000_000, "cache_create_usd": toks.get("cache_create", 0) * rate.get("cache_create", 0) / 1_000_000, } def cost_from_model_tokens(mtoks, pricing): acc = {"input_usd": 0.0, "output_usd": 0.0, "cache_read_usd": 0.0, "cache_create_usd": 0.0} for m, t in mtoks.items(): c = cost_for_model_tokens(m, t, pricing) for k in acc: acc[k] += c[k] acc["total_usd"] = sum(acc.values()) return acc # --------------------------------------------------------------------------- # # Per-session parsing → raw records (cross-session dedup picks one owner per record) # --------------------------------------------------------------------------- # def group_files(projdir): """Yield (sessionId, [files], has_top): a session's transcript + its subagent files.""" files = glob.glob(os.path.join(projdir, "**", "*.jsonl"), recursive=True) groups = defaultdict(list) has_top = {} for f in files: rel = os.path.relpath(f, projdir).split(os.sep) if len(rel) == 2: # /.jsonl sid = rel[1][:-6] if rel[1].endswith(".jsonl") else rel[1] has_top[sid] = True else: # //subagents/... sid = rel[1] has_top.setdefault(sid, False) groups[sid].append(f) for sid, fl in groups.items(): yield sid, fl, has_top.get(sid, False) def parse_session_records(files): """Parse a session's files into per-record contributions, deduped WITHIN the session. assistant keyed by requestId (keep max output_tokens); user prompts keyed by uuid.""" asst = {} # rid -> {in,out,cr,cc,model,tools,date} tool_ids = {} # rid -> {block_id: tool_name} — unioned across the rid's streaming records users = {} # uuid -> {words,chars,date,hour,weekday} events = [] # [datetime] for work-session timeline compactions = 0 # count of isCompactSummary events (context-hygiene → RCL) date_min = date_max = None for f in files: try: sz = os.path.getsize(f) except OSError: sz = 0 if sz > MAX_TRANSCRIPT_BYTES: warn("skipping oversized transcript %s (%d bytes)" % (f, sz)) continue try: fh = open(f, errors="replace") except OSError as e: warn("cannot open %s (%s)" % (f, e)) continue with fh: for line in fh: if len(line) > MAX_LINE_BYTES: continue line = line.strip() if not line: continue try: r = json.loads(line) except json.JSONDecodeError: continue dt = parse_ts(r.get("timestamp")) if dt is not None: events.append(dt) d = local_date_str(dt) if date_min is None or d < date_min: date_min = d if date_max is None or d > date_max: date_max = d rtype = r.get("type") if rtype == "assistant": msg = r.get("message") or {} model = msg.get("model") if model == "": continue usage = msg.get("usage") or {} out = int(usage.get("output_tokens") or 0) rid = r.get("requestId") or r.get("uuid") if rid is None: continue # Each tool_use block arrives as its OWN streaming record (same requestId, # same cumulative output_tokens): union tool blocks across ALL of the rid's # records (deduped by block id); token fields come from the max-output record. content = msg.get("content") if isinstance(content, list): rtools = tool_ids.setdefault(rid, {}) for b in content: if isinstance(b, dict) and b.get("type") == "tool_use": rtools.setdefault(b.get("id") or object(), b.get("name") or "unknown") prev = asst.get(rid) if prev is None or out > prev["out"]: ldt = dt.astimezone(TZ) if dt else None asst[rid] = {"in": int(usage.get("input_tokens") or 0), "out": out, "cr": int(usage.get("cache_read_input_tokens") or 0), "cc": int(usage.get("cache_creation_input_tokens") or 0), "model": model or "unknown", "date": ldt.date().isoformat() if ldt else None, "datehour": ldt.strftime("%Y-%m-%dT%H") if ldt else None} elif rtype == "user": if r.get("isSidechain") is True or r.get("isMeta") is True: continue if "toolUseResult" in r or "sourceToolAssistantUUID" in r: continue # Compaction summaries are auto-generated (isCompactSummary=true), not typed input: # banked as a context-hygiene signal, skipped from word/char/prompt counts. if r.get("isCompactSummary") is True: compactions += 1 continue uuid = r.get("uuid") if not uuid or uuid in users: continue msg = r.get("message") or {} content = msg.get("content") if isinstance(content, str): text = content elif isinstance(content, list): text = " ".join(b.get("text", "") for b in content if isinstance(b, dict) and b.get("type") == "text") else: text = "" stripped = text.strip() if not stripped or stripped.startswith("<"): continue if len(MD_HEADER_RE.findall(text)) + len(MD_BOLD_RE.findall(text)) >= MD_DROP_MIN: continue clean = URL_RE.sub("", text) typed = INLINE_CODE_RE.sub("", FENCE_RE.sub("", clean)) # drop pasted code ldt = dt.astimezone(TZ) if dt else None users[uuid] = {"words": len(clean.split()), "chars": len(clean), "chars_typed": len(typed), "date": ldt.date().isoformat() if ldt else None, "datehour": ldt.strftime("%Y-%m-%dT%H") if ldt else None, "hour": ldt.hour if ldt else None, "weekday": ldt.weekday() if ldt else None} # Fold each requestId's unioned tool_use blocks onto its kept (max-output) record. for rid, rec in asst.items(): rec["tools"] = list(tool_ids.get(rid, {}).values()) return {"asst": asst, "users": users, "events": events, "compactions": compactions, "date_min": date_min, "date_max": date_max} def metrics_from_records(asst, users, events, date_min, date_max): """Aggregate one session's OWNED records into the metrics dict.""" tin = tout = tcr = tcc = 0 context_peak = 0 # high-water non-cached input_tokens on a real (out>0) turn tool_counts = Counter() model_counts = Counter() model_tokens = defaultdict(lambda: {"input": 0, "output": 0, "cache_read": 0, "cache_create": 0, "turns": 0}) daily_tokens = Counter() daily_io = Counter() # per-day input+output (cache-free) daily_agents = Counter() # per-day Agent (subagent) launches # per-hour series (local-tz "YYYY-MM-DDTHH") → {t:total, i:input, o:output, p:prompts, w:words}; # feeds the weekday×hour matrix + per-day words in combine(). hourly = defaultdict(lambda: {"t": 0, "i": 0, "o": 0, "p": 0, "w": 0}) for rec in asst.values(): if rec["out"] == 0: continue tin += rec["in"]; tout += rec["out"]; tcr += rec["cr"]; tcc += rec["cc"] if rec["in"] > context_peak: context_peak = rec["in"] mt = model_tokens[rec["model"]] mt["input"] += rec["in"]; mt["output"] += rec["out"] mt["cache_read"] += rec["cr"]; mt["cache_create"] += rec["cc"]; mt["turns"] += 1 model_counts[rec["model"]] += 1 for tn in rec["tools"]: tool_counts[tn] += 1 rec_total = rec["in"] + rec["out"] + rec["cr"] + rec["cc"] if rec["date"]: daily_tokens[rec["date"]] += rec_total daily_io[rec["date"]] += rec["in"] + rec["out"] daily_agents[rec["date"]] += sum(1 for tn in rec["tools"] if tn == "Agent") if rec.get("datehour"): h = hourly[rec["datehour"]]; h["t"] += rec_total; h["i"] += rec["in"]; h["o"] += rec["out"] user_words = user_chars = user_chars_typed = user_prompts = 0 daily_prompts = Counter() hours = [0] * 24 weekdays = [0] * 7 for u in users.values(): user_words += u["words"]; user_chars += u["chars"] user_chars_typed += u["chars_typed"]; user_prompts += 1 if u["date"]: daily_prompts[u["date"]] += 1 if u.get("datehour"): h = hourly[u["datehour"]]; h["p"] += 1; h["w"] += u["words"] if u["hour"] is not None: hours[u["hour"]] += 1 if u["weekday"] is not None: weekdays[u["weekday"]] += 1 events = sorted(events) work_sessions = 0 total_active_min = 0.0 longest_session_min = 0.0 # (start, end) of each work-session span: combine() takes the UNION across sessions so # concurrent sessions never double-count active wall-clock time. spans = [] first = prev = None for ts in events: if prev is None: first = prev = ts elif ts - prev > WORK_SESSION_GAP: dur = (prev - first).total_seconds() / 60.0 work_sessions += 1; total_active_min += dur; longest_session_min = max(longest_session_min, dur) if prev > first: spans.append((first, prev)) first = prev = ts else: prev = ts if prev is not None: dur = (prev - first).total_seconds() / 60.0 work_sessions += 1; total_active_min += dur; longest_session_min = max(longest_session_min, dur) if prev > first: spans.append((first, prev)) # per-day active minutes: each within-work-session gap, attributed to the earlier event's day. daily_active = defaultdict(float) nightowl_active_min = 0.0 # active minutes in local hours 00:00–05:59 for a, b in zip(events, events[1:]): if b - a <= WORK_SESSION_GAP: mins = (b - a).total_seconds() / 60.0 daily_active[local_date_str(a)] += mins if a.astimezone(TZ).hour < 6: nightowl_active_min += mins active_dates = [d for d, c in daily_prompts.items() if c > 0] all_dates = sorted(set(daily_prompts) | set(daily_tokens) | set(daily_active)) return { "tokens_input": tin, "tokens_output": tout, "tokens_cache_read": tcr, "tokens_cache_create": tcc, "tokens_total": tin + tout + tcr + tcc, "user_words": user_words, "user_chars": user_chars, "user_chars_typed": user_chars_typed, "user_prompts": user_prompts, "context_peak": context_peak, # max-folded (not summed) in combine() "work_sessions": work_sessions, "active_days": len(active_dates), "longest_session_min": round(longest_session_min), "total_active_min": round(total_active_min), "nightowl_active_min": round(nightowl_active_min), "active_spans": [[s.isoformat(), e.isoformat()] for s, e in spans], "tool_counts": dict(tool_counts), "model_counts": dict(model_counts), "model_tokens": {m: dict(v) for m, v in model_tokens.items()}, "daily_activity": [{"date": d, "prompts": daily_prompts.get(d, 0), "tokens": daily_tokens.get(d, 0), "tokens_io": daily_io.get(d, 0), "active_min": round(daily_active.get(d, 0)), "agents": daily_agents.get(d, 0)} for d in all_dates], "hourly": {k: dict(v) for k, v in hourly.items()}, "hours_histogram": hours, "weekdays_histogram": weekdays, "date_min": date_min, "date_max": date_max, "last_active": max(active_dates) if active_dates else date_max, } def build_session_metrics(recs, owner): """Assign an owner to each record id (existing owner wins; else smallest session key), then build per-session metrics counting only records this session owns. A record (requestId / prompt uuid) can be copied into more than one transcript by session resume — it counts once.""" pending = defaultdict(list) # unowned id -> [session keys containing it] for key, rec in recs.items(): for rid in rec["asst"]: if rid not in owner: pending[rid].append(key) for uid in rec["users"]: if uid not in owner: pending[uid].append(key) new_assign = {} for rid, keys in pending.items(): o = min(keys) owner[rid] = o new_assign[rid] = o metrics_by_key = {} for key, rec in recs.items(): oa = {rid: r for rid, r in rec["asst"].items() if owner.get(rid) == key} ou = {uid: r for uid, r in rec["users"].items() if owner.get(uid) == key} metrics_by_key[key] = metrics_from_records(oa, ou, rec["events"], rec["date_min"], rec["date_max"]) # compaction summaries are session-local artifacts — no owner dedup needed metrics_by_key[key]["compactions"] = rec.get("compactions", 0) return metrics_by_key, new_assign # --------------------------------------------------------------------------- # # Aggregation over session rows # --------------------------------------------------------------------------- # def _merge_intervals(spans): """spans: iterable of (start_dt, end_dt) aware datetimes → merged, non-overlapping, sorted.""" iv = sorted((s, e) for s, e in spans if e > s) if not iv: return [] merged = [[iv[0][0], iv[0][1]]] for s, e in iv[1:]: if s <= merged[-1][1]: if e > merged[-1][1]: merged[-1][1] = e else: merged.append([s, e]) return [(s, e) for s, e in merged] def union_active(spans): """Union of active wall-clock intervals → (total_min, {local_date: min}, nightowl_min). Overlapping/concurrent spans count ONCE; day/night edges split in UTC so DST is exact.""" total = 0.0 daily = defaultdict(float) nightowl = 0.0 for s, e in _merge_intervals(spans): total += (e - s).total_seconds() / 60.0 cur = s while cur < e: d = cur.astimezone(TZ).date() nd = d + DAY next_midnight = datetime(nd.year, nd.month, nd.day, tzinfo=TZ).astimezone(timezone.utc) night_end = datetime(d.year, d.month, d.day, 6, tzinfo=TZ).astimezone(timezone.utc) seg_end = min(e, next_midnight) daily[d.isoformat()] += (seg_end - cur).total_seconds() / 60.0 ov_end = min(seg_end, night_end) if ov_end > cur: nightowl += (ov_end - cur).total_seconds() / 60.0 cur = seg_end return total, dict(daily), nightowl def combine(rows): c = {k: 0 for k in SCALAR_KEYS} c["longest_session_min"] = 0 c["context_peak"] = 0 # max-folded — corpus-wide high-water context-fill tool = Counter(); mcount = Counter() mtok = defaultdict(lambda: {"input": 0, "output": 0, "cache_read": 0, "cache_create": 0, "turns": 0}) daily = defaultdict(lambda: {"prompts": 0, "tokens": 0, "tokens_io": 0, "active_min": 0, "words": 0, "sessions": 0, "agents": 0}) hours = [0] * 24; weekdays = [0] * 7 # weekday×hour prompt matrix (Mon=0..Sun=6 × 0..23) from each session's hourly map. weekday_hour = [[0] * 24 for _ in range(7)] date_min = date_max = last_active = None # Active-time UNION: pool spans from rows that ship them; span-less rows fall back to their # summed totals (can't be de-overlapped without the raw spans). spans_pool = [] fb_total = 0.0 fb_nightowl = 0.0 for r in rows: for k in SCALAR_KEYS: c[k] += r.get(k, 0) or 0 c["longest_session_min"] = max(c["longest_session_min"], r.get("longest_session_min", 0) or 0) c["context_peak"] = max(c["context_peak"], r.get("context_peak", 0) or 0) sp = r.get("active_spans") has_spans = bool(sp) if has_spans: for a, b in sp: sa, sb = parse_ts(a), parse_ts(b) if sa and sb and sb > sa: spans_pool.append((sa, sb)) else: fb_total += r.get("total_active_min", 0) or 0 fb_nightowl += r.get("nightowl_active_min", 0) or 0 tool.update(r.get("tool_counts", {}) or {}) mcount.update(r.get("model_counts", {}) or {}) for m, t in (r.get("model_tokens", {}) or {}).items(): for k in ("input", "output", "cache_read", "cache_create", "turns"): mtok[m][k] += t.get(k, 0) or 0 for row in (r.get("daily_activity", []) or []): cell = daily[row["date"]] cell["prompts"] += row.get("prompts", 0) or 0 cell["tokens"] += row.get("tokens", 0) or 0 cell["tokens_io"] += row.get("tokens_io", 0) or 0 cell["agents"] += row.get("agents", 0) or 0 if not has_spans: # span rows get their per-day active_min cell["active_min"] += row.get("active_min", 0) or 0 # from the union pass below cell["sessions"] += 1 # one daily_activity row per active day per session for i, v in enumerate(r.get("hours_histogram", []) or []): hours[i] += v for i, v in enumerate(r.get("weekdays_histogram", []) or []): weekdays[i] += v for k, v in (r.get("hourly", {}) or {}).items(): try: wd = date.fromisoformat(k[:10]).weekday(); hr = int(k[11:13]) except (ValueError, IndexError): continue if 0 <= wd < 7 and 0 <= hr < 24: weekday_hour[wd][hr] += v.get("p", 0) or 0 # per-day words from the same hourly map → the AVATAR today-line WORDS stat daily[k[:10]]["words"] += v.get("w", 0) or 0 dm, dx, la = r.get("date_min"), r.get("date_max"), r.get("last_active") if dm and (date_min is None or dm < date_min): date_min = dm if dx and (date_max is None or dx > date_max): date_max = dx if la and (last_active is None or la > last_active): last_active = la c["tokens_total"] = c["tokens_input"] + c["tokens_output"] + c["tokens_cache_read"] + c["tokens_cache_create"] c["tool_counts"] = dict(tool); c["model_counts"] = dict(mcount) c["model_tokens"] = {m: dict(v) for m, v in mtok.items()} # Active time: UNION of span rows + summed fallback for span-less rows. The pure sum stays as # `active_min_summed` — the mean basis for avg_session_min (concurrency irrelevant there). c["active_min_summed"] = c["total_active_min"] u_total, u_daily, u_night = union_active(spans_pool) c["total_active_min"] = round(u_total + fb_total) c["nightowl_active_min"] = round(u_night + fb_nightowl) for ds, mins in u_daily.items(): daily[ds]["active_min"] += round(mins) c["daily"] = dict(daily); c["hours"] = hours; c["weekdays"] = weekdays; c["weekday_hour"] = weekday_hour c["date_min"] = date_min; c["date_max"] = date_max; c["last_active"] = last_active c["active_days"] = sum(1 for v in daily.values() if v["prompts"] > 0) return c def longest_and_current_streak(active_set, corpus_end): if not active_set: return 0, 0 ordered = sorted(active_set) longest = run = 1 for i in range(1, len(ordered)): run = run + 1 if ordered[i] - ordered[i - 1] == DAY else 1 longest = max(longest, run) end = date.fromisoformat(corpus_end) anchor = end if end in active_set else (end - DAY if (end - DAY) in active_set else None) current = 0 d = anchor while d is not None and d in active_set: current += 1 d -= DAY return longest, current def aggregate(rows, pricing, pricing_date, generated_at): g = combine(rows) corpus_start = g["date_min"] or generated_at[:10] corpus_end = g["date_max"] or generated_at[:10] start_d = date.fromisoformat(corpus_start) end_d = date.fromisoformat(corpus_end) daily_activity = [] active_set = set() d = start_d while d <= end_d: ds = d.isoformat() cell = g["daily"].get(ds, {"prompts": 0, "tokens": 0, "tokens_io": 0, "active_min": 0, "words": 0, "sessions": 0, "agents": 0}) daily_activity.append({"date": ds, "prompts": cell["prompts"], "tokens": cell["tokens"], "tokens_io": cell.get("tokens_io", 0), "active_min": cell.get("active_min", 0), "words": cell.get("words", 0), "sessions": cell.get("sessions", 0), "agents": cell.get("agents", 0)}) if cell["prompts"] > 0: active_set.add(d) d += DAY longest_streak, current_streak = longest_and_current_streak(active_set, corpus_end) top_tools = [{"name": n, "count": c} for n, c in Counter(g["tool_counts"]).most_common()] tool_uses = sum(g["tool_counts"].values()) model_turns = Counter(g["model_counts"]) total_turns = sum(model_turns.values()) models = [{"name": m, "turns": c, "pct": round(c * 100.0 / total_turns, 1) if total_turns else 0.0} for m, c in model_turns.most_common()] favorite_model = model_turns.most_common(1)[0][0] if model_turns else None total_cost = cost_from_model_tokens(g["model_tokens"], pricing) # Group by project NAME only (same project from several servers is ONE project). groups = defaultdict(list) for r in rows: groups[r.get("name")].append(r) proj_rows = [] for nm, grp in groups.items(): pg = combine(grp) pc = cost_from_model_tokens(pg["model_tokens"], pricing) srv = ",".join(sorted({r.get("server") for r in grp if r.get("server")})) or None proj_rows.append({ "name": nm, "server": srv, "sessions": pg["sessions"], "work_sessions": pg["work_sessions"], "active_days": pg["active_days"], "tokens_total": pg["tokens_total"], "tokens_input": pg["tokens_input"], "tokens_output": pg["tokens_output"], "tokens_cache_read": pg["tokens_cache_read"], "tokens_cache_create": pg["tokens_cache_create"], "user_words": pg["user_words"], "user_prompts": pg["user_prompts"], "tool_uses": sum(pg["tool_counts"].values()), "agent_launches": pg["tool_counts"].get("Agent", 0), "cost_estimate_usd": round(pc["total_usd"], 2), "total_active_min": pg["total_active_min"], "longest_session_min": pg["longest_session_min"], "avg_session_min": round(pg["active_min_summed"] / pg["work_sessions"]) if pg["work_sessions"] else 0, "last_active": pg["last_active"], }) proj_rows.sort(key=lambda r: r["tokens_total"], reverse=True) cr, cc, ti = g["tokens_cache_read"], g["tokens_cache_create"], g["tokens_input"] cache_denom = cr + cc + ti return { "meta": { "schema_version": SCHEMA_VERSION, "generated_at": generated_at, "timezone": TZ_NAME, "corpus_start": corpus_start, "corpus_end": corpus_end, "corpus_days": (end_d - start_d).days + 1, "servers": sorted({r.get("server") for r in rows if r.get("server")}), }, "totals": { "sessions": g["sessions"], "work_sessions": g["work_sessions"], "active_days": len(active_set), "current_streak": current_streak, "longest_streak": longest_streak, "longest_session_min": g["longest_session_min"], "avg_session_min": round(g["active_min_summed"] / g["work_sessions"]) if g["work_sessions"] else 0, "total_active_min": g["total_active_min"], "nightowl_active_min": g["nightowl_active_min"], "user_words": g["user_words"], "user_chars": g["user_chars"], "user_chars_typed": g["user_chars_typed"], "user_prompts": g["user_prompts"], "tokens_input": ti, "tokens_output": g["tokens_output"], "tokens_cache_read": cr, "tokens_cache_create": cc, "tokens_total": g["tokens_total"], "cache_hit_ratio": round(cr / cache_denom, 4) if cache_denom else 0.0, "compactions": g["compactions"], "context_peak": g["context_peak"], "tool_uses": tool_uses, "agent_launches": g["tool_counts"].get("Agent", 0), "favorite_model": favorite_model, "peak_hour": g["hours"].index(max(g["hours"])) if any(g["hours"]) else 0, "peak_weekday": g["weekdays"].index(max(g["weekdays"])) if any(g["weekdays"]) else 0, }, "cost_estimate": { "total_usd": round(total_cost["total_usd"], 2), "input_usd": round(total_cost["input_usd"], 2), "output_usd": round(total_cost["output_usd"], 2), "cache_read_usd": round(total_cost["cache_read_usd"], 2), "cache_create_usd": round(total_cost["cache_create_usd"], 2), "pricing_date": pricing_date, "note": "Hypothetical pay-as-you-go cost. Actual billing is Max plan fixed price.", }, "histograms": {"hours": g["hours"], "weekdays": g["weekdays"], "weekday_hour": g["weekday_hour"]}, "daily_activity": daily_activity, "top_tools": top_tools, "models": models, "projects": proj_rows, } # --------------------------------------------------------------------------- # # Local ledger (SQLite) — sessions survive transcript pruning. # One row per session: each run re-parses the transcripts that still exist and # upserts their latest metrics; sessions not seen are RETAINED (alive=0). The # record_owner table pins each requestId/prompt-uuid to the FIRST session that # banked it, so a session-resume copy never double-counts — same design as the # reference extractor, minus its server-side modes. # --------------------------------------------------------------------------- # def open_ledger(path): d = os.path.dirname(path) if d: os.makedirs(d, exist_ok=True) try: os.chmod(d, 0o700) except OSError: pass con = sqlite3.connect(path) con.execute("""CREATE TABLE IF NOT EXISTS sessions( session_id TEXT PRIMARY KEY, server TEXT, username TEXT, project TEXT, first_seen TEXT, last_seen TEXT, last_active TEXT, alive INTEGER DEFAULT 1, metrics TEXT)""") con.execute("""CREATE TABLE IF NOT EXISTS record_owner( rec_id TEXT PRIMARY KEY, owner TEXT)""") con.commit() try: os.chmod(path, 0o600) # holds per-session usage detail — yours alone except OSError: pass return con def load_record_owner(con): return {rid: o for rid, o in con.execute("SELECT rec_id,owner FROM record_owner")} def persist_record_owner(con, new_assign): con.executemany("INSERT OR IGNORE INTO record_owner(rec_id,owner) VALUES(?,?)", list(new_assign.items())) def upsert_row(con, key, server, username, project, metrics, now): con.execute("""INSERT INTO sessions (session_id,server,username,project,first_seen,last_seen,last_active,alive,metrics) VALUES(?,?,?,?,?,?,?,1,?) ON CONFLICT(session_id) DO UPDATE SET server=excluded.server, username=excluded.username, project=excluded.project, last_seen=excluded.last_seen, last_active=excluded.last_active, alive=1, metrics=excluded.metrics""", (key, server, username, project, now, now, metrics.get("last_active"), json.dumps(metrics))) def mark_absent(con, server, seen_keys): absent = [r[0] for r in con.execute("SELECT session_id FROM sessions WHERE server=?", (server,)) if r[0] not in seen_keys] con.executemany("UPDATE sessions SET alive=0 WHERE session_id=?", [(k,) for k in absent]) def load_ledger_rows(con): rows = [] for sid, server, username, project, last_active, metrics in con.execute( "SELECT session_id,server,username,project,last_active,metrics FROM sessions"): m = json.loads(metrics) m["server"] = server; m["username"] = username; m["name"] = project m["last_active"] = m.get("last_active") or last_active rows.append(m) return rows # --------------------------------------------------------------------------- # # The player's corpus: ONE home (yours) # --------------------------------------------------------------------------- # def collect_home_records(home, server): """Parse the invoking user's ~/.claude/projects into per-session records. Returns (recs, meta) in the reference extractor's shape.""" home = os.path.abspath(os.path.expanduser(home)) user = os.path.basename(home.rstrip("/")) projdir = os.path.join(home, ".claude", "projects") if not os.path.isdir(projdir): sys.exit("no Claude Code data found (%s does not exist)" % projdir) name = user namefile = os.path.join(home, "projectname.txt") if os.path.isfile(namefile): try: content = open(namefile, errors="replace").read().strip() except Exception: content = "" if content and not IGNORE_RE.search(content): name = content recs, meta = {}, {} for sid, files, has_top in group_files(projdir): key = f"{server}:{sid}" recs[key] = parse_session_records(files) recs[key]["_sid"] = sid meta[key] = (server, user, name, 1 if has_top else 0) return recs, meta def build_feed(home, server, pricing_path, generated_at, ledger_path=None): pricing, pricing_date = load_pricing(pricing_path) recs, meta = collect_home_records(home, server) if ledger_path is None: # stateless: derive from surviving transcripts only metrics_by_key, _ = build_session_metrics(recs, {}) rows = [] for key, m in metrics_by_key.items(): srv, user, name, sess_flag = meta[key] m["session_id"] = recs[key]["_sid"] m["sessions"] = sess_flag m["server"] = srv; m["username"] = user; m["name"] = name rows.append(m) return aggregate(rows, pricing, pricing_date, generated_at) # ledger mode: upsert what exists now, retain what was pruned, aggregate ALL of it if os.path.exists(ledger_path): try: shutil.copy2(ledger_path, ledger_path + ".bak") # cheap one-step undo os.chmod(ledger_path + ".bak", 0o600) except OSError as e: warn("ledger backup failed (%s)" % e) con = open_ledger(ledger_path) owner = load_record_owner(con) metrics_by_key, new_assign = build_session_metrics(recs, owner) persist_record_owner(con, new_assign) seen = set() for key, m in metrics_by_key.items(): srv, user, name, sess_flag = meta[key] m["session_id"] = recs[key]["_sid"] m["sessions"] = sess_flag upsert_row(con, key, srv, user, name, m, generated_at) seen.add(key) mark_absent(con, server, seen) con.commit() rows = load_ledger_rows(con) con.close() feed = aggregate(rows, pricing, pricing_date, generated_at) restore_path = os.path.join(TQ_DIR, "restore.json") if os.path.exists(restore_path): # a --restore'd mirror feed heals a lost ledger (monotonic merge) try: feed = merge_feeds(json.load(open(restore_path)), feed) except Exception as e: warn("restore merge skipped (%s)" % e) return feed def write_json_atomic(path, obj, pretty=False): tmp = path + ".tmp" with open(tmp, "w") as fh: json.dump(obj, fh, indent=2 if pretty else None, separators=None if pretty else (",", ":")) os.replace(tmp, path) # --------------------------------------------------------------------------- # # Panel bundle: fetch-on-first-run + --update. Downloads DISPLAY code only # (checksummed against the server's version.json); sends nothing. The agent # (this file) never updates itself — that asymmetry is deliberate: the code # that reads your data stays exactly what you audited. # --------------------------------------------------------------------------- # def _http_get(url, timeout=15): import urllib.request req = urllib.request.Request(url, headers={"User-Agent": "tq.py"}) with urllib.request.urlopen(req, timeout=timeout) as resp: return resp.read() def _sha256(data): import hashlib return hashlib.sha256(data).hexdigest() def fetch_bundle(bundle_dir, base_url): """Download version.json + the bundle files, verify checksums, install atomically.""" version = json.loads(_http_get(base_url + "/get/version.json")) want = version.get("files", {}) os.makedirs(bundle_dir, exist_ok=True) for name in BUNDLE_FILES: data = _http_get(base_url + "/get/" + name) got = _sha256(data) if name in want and got != want[name]: sys.exit("checksum mismatch for %s (server said %s…, got %s…) — try again; " "if it persists, the server publish is mid-flight or tampered" % (name, want[name][:12], got[:12])) local = "index.html" if name == "panel.html" else name dest = os.path.join(bundle_dir, local) os.makedirs(os.path.dirname(dest), exist_ok=True) tmp = dest + ".tmp" with open(tmp, "wb") as fh: fh.write(data) os.replace(tmp, dest) write_json_atomic(os.path.join(bundle_dir, "version.json"), version) print("panel bundle %s installed to %s" % (version.get("published", "?"), bundle_dir)) def _http_post_json(url, obj, timeout=20): import urllib.request req = urllib.request.Request(url, data=json.dumps(obj).encode(), headers={"User-Agent": "tq.py", "Content-Type": "application/json"}) with urllib.request.urlopen(req, timeout=timeout) as resp: return json.loads(resp.read()) def load_agent_config(): """~/.tokenquest/config.json — written by --register (a later build); optional until then.""" try: return json.load(open(os.path.join(TQ_DIR, "config.json"))) except Exception: return {} def fetch_content(bundle_dir, base_url): """Pull the world slice from the content drip into bundle/data/. Registered heroes (handle + device token in config.json) get arcs up to their current act + 1; anonymous gets the public demo floor. What you send: your handle + token — never your stats or transcripts.""" cfg = load_agent_config() body = {} if cfg.get("handle") and cfg.get("device_token"): body = {"pack": cfg.get("pack", "tokens"), "handle": cfg["handle"], "device_token": cfg["device_token"]} resp = _http_post_json(base_url + "/board?action=content", body) if not resp.get("ok"): raise RuntimeError(resp.get("error", "content fetch failed")) ddir = os.path.join(bundle_dir, "data") os.makedirs(ddir, exist_ok=True) for name, obj in (resp.get("files") or {}).items(): if "/" in name or "\\" in name or not name.endswith(".json"): continue # belt: never let a server response name escape data/ write_json_atomic(os.path.join(ddir, name), obj) print("world content: arcs 1–%s of %s installed%s" % (resp.get("horizon", "?"), resp.get("of", "?"), "" if body else " (public floor — register to unlock your full road)"), flush=True) def bundle_update_hint(bundle_dir, base_url): """On --play start: quiet version check. Offline or any failure => silent (play works offline).""" try: remote = json.loads(_http_get(base_url + "/get/version.json", timeout=4)) local_path = os.path.join(bundle_dir, "version.json") local = json.load(open(local_path)) if os.path.exists(local_path) else {} if remote.get("files") != local.get("files"): print("(a newer panel bundle is available — run --update when convenient)", flush=True) except Exception: pass # --------------------------------------------------------------------------- # # The board: register / dry-run / push. The payload is PQ-thin BY CONSTRUCTION: # fourteen labeled aggregate measurements + flavor. The server derives your # stats and level from them (one formula home; updates never strand agents). # --------------------------------------------------------------------------- # def measurements_from_feed(feed): t = feed.get("totals") or {} c = feed.get("cost_estimate") or {} da = feed.get("daily_activity") or [] tools = {x.get("name"): (x.get("count") or 0) for x in (feed.get("top_tools") or [])} return { "user_prompts": t.get("user_prompts") or 0, "total_active_min": t.get("total_active_min") or 0, "avg_session_min": t.get("avg_session_min") or 0, "active_days": t.get("active_days") or 0, "corpus_days": (feed.get("meta") or {}).get("corpus_days") or 1, "tokens_total": t.get("tokens_total") or 0, "tokens_io": (t.get("tokens_input") or 0) + (t.get("tokens_output") or 0), "user_words": t.get("user_words") or 0, "user_chars_typed": t.get("user_chars_typed") or 0, "compactions": t.get("compactions") or 0, "agent_launches": t.get("agent_launches") or 0, "workflow_uses": tools.get("Workflow") or 0, "peak_day_io_tokens": max([r.get("tokens_io") or 0 for r in da] or [0]), "cost_io_usd": round((c.get("input_usd") or 0) + (c.get("output_usd") or 0), 2), } def merge_feeds(base, cur): """Best-effort monotonic merge of a restored mirror feed (base) into the freshly derived feed (cur) — the reinstall healer. Exact reconstruction is impossible without the raw ledger, but every game-relevant number is a monotonic counter, so per-key max() never overstates either side. Means (avg_session_min) take max too — a deliberate, mild bias toward the hero's best recorded self.""" out = json.loads(json.dumps(cur)) for blk in ("totals", "cost_estimate"): b, o = base.get(blk) or {}, out.setdefault(blk, {}) for k, v in b.items(): if isinstance(v, (int, float)) and isinstance(o.get(k, 0), (int, float)): o[k] = max(o.get(k, 0) or 0, v) elif k not in o: o[k] = v bydate = {r["date"]: dict(r) for r in (base.get("daily_activity") or []) if r.get("date")} days = out.get("daily_activity") or [] for r in days: b = bydate.pop(r.get("date"), None) if b: for k, v in b.items(): if isinstance(v, (int, float)): r[k] = max(r.get(k, 0) or 0, v) days += list(bydate.values()) days.sort(key=lambda r: r.get("date", "")) if days: # re-fill the contiguous span (a lost-ledger gap otherwise breaks the montage walk) from datetime import date as _date filled, d = [], _date.fromisoformat(days[0]["date"]) have = {r["date"]: r for r in days} end = _date.fromisoformat(days[-1]["date"]) while d <= end: ds = d.isoformat() filled.append(have.get(ds) or {"date": ds, "prompts": 0, "tokens": 0, "tokens_io": 0, "active_min": 0, "words": 0, "sessions": 0, "agents": 0}) d += timedelta(days=1) out["daily_activity"] = filled bh, oh = base.get("histograms") or {}, out.setdefault("histograms", {}) for k in ("hours", "weekdays"): bl = bh.get(k) or [] if bl: ol = oh.get(k) or [] n = max(len(bl), len(ol)) oh[k] = [max(ol[i] if i < len(ol) else 0, bl[i] if i < len(bl) else 0) for i in range(n)] for blk, key, cnt in (("top_tools", "name", "count"), ("models", "name", "turns")): bmap = {t[key]: t.get(cnt, 0) for t in (base.get(blk) or []) if t.get(key)} merged = {} for t in (out.get(blk) or []): merged[t[key]] = max(t.get(cnt, 0) or 0, bmap.pop(t[key], 0)) merged.update(bmap) rows = [{key: n, cnt: c} for n, c in sorted(merged.items(), key=lambda x: -x[1])] if blk == "models": tot = sum(merged.values()) or 1 for r in rows: r["pct"] = round(r["turns"] * 100.0 / tot, 1) out[blk] = rows bm, om = base.get("meta") or {}, out.setdefault("meta", {}) starts = [x for x in (bm.get("corpus_start"), om.get("corpus_start")) if x] ends = [x for x in (bm.get("corpus_end"), om.get("corpus_end")) if x] if starts: om["corpus_start"] = min(starts) if ends: om["corpus_end"] = max(ends) if starts and ends: om["corpus_days"] = (date.fromisoformat(om["corpus_end"]) - date.fromisoformat(om["corpus_start"])).days + 1 return out def scrub_feed_for_mirror(feed): """The mirror scrub: drop the identifier-leak vectors, keep the game. - projects (client/employer names; the panel never reads it) - histograms.weekday_hour (finest-grained schedule matrix; panel-unused) - mcp__* tool names except mcp__chrome-devtools__* (may carry private tooling names) — folded into one "Other" count Everything else uploads whole — deliberately: cost is a bragging stat, and the full daily series doubles as YOUR history escrow (it survives a lost local ledger). The server re-enforces this same scrub on arrival (belt).""" f = json.loads(json.dumps(feed)) f.pop("projects", None) (f.get("histograms") or {}).pop("weekday_hour", None) tools, other = [], 0 for t in (f.get("top_tools") or []): n = str(t.get("name") or "") if n.startswith("mcp__") and not n.startswith("mcp__chrome-devtools__"): other += int(t.get("count") or 0) else: tools.append(t) if other: tools.append({"name": "Other", "count": other}) f["top_tools"] = tools return f def board_xp_floor(base_url, cfg): """The hero's board xp, reconstructed from the public standing (level + frac). Used as a LOCAL floor so a lost ledger never shows a demoted hero — bars only fill. Returns 0 when unregistered/offline/unknown.""" if not cfg.get("handle"): return 0.0 try: import urllib.parse q = urllib.parse.urlencode({"action": "get", "pack": cfg.get("pack", "tokens"), "handle": cfg["handle"]}) r = json.loads(_http_get(base_url + "/board?" + q, timeout=6)) hero = r.get("hero") or {} lvl = int(hero.get("level") or 0) frac = float(hero.get("frac") or 0.0) return lvl * lvl + frac * ((lvl + 1) ** 2 - lvl * lvl) if lvl else 0.0 except Exception: return 0.0 def _fmt_tokens(n): if n >= 1e6: return "%.1fM" % (n / 1e6) if n >= 1e3: return "%.0fk" % (n / 1e3) return str(int(n or 0)) def snapshot_from_feed(feed): m = measurements_from_feed(feed) hrs = int(m["total_active_min"] / 60 + 0.5) motto = "%dh adventured · %s tokens slain" % (hrs, _fmt_tokens(m["tokens_io"])) return {"measurements": m, "motto": motto, "avatar": "⚔"} # ⚔ def save_agent_config(cfg): os.makedirs(TQ_DIR, exist_ok=True) try: os.chmod(TQ_DIR, 0o700) except OSError: pass path = os.path.join(TQ_DIR, "config.json") write_json_atomic(path, cfg, pretty=True) os.chmod(path, 0o600) def run_register(args): if not args.handle: sys.exit("--register needs --handle (immutable, first-claim-wins)") import secrets passphrase = "TQ-" + secrets.token_hex(8) r = _http_post_json(args.base_url + "/board?action=register", {"pack": args.pack, "handle": args.handle, "passphrase": passphrase}) if not r.get("ok"): sys.exit("register failed: %s" % r.get("error")) t = _http_post_json(args.base_url + "/board?action=issue_token", {"pack": args.pack, "handle": args.handle, "passphrase": passphrase}) if not t.get("ok"): sys.exit("registered, but token mint failed: %s — retry with issue_token" % t.get("error")) save_agent_config({"pack": args.pack, "handle": args.handle, "device_token": t["device_token"]}) print("hero claimed: %s (pack %s)" % (args.handle, args.pack)) print() print(" PASSPHRASE (shown ONCE, never stored — write it down; it is the only") print(" way to re-claim your hero on a new machine or revoke a device):") print() print(" %s" % passphrase) print() print("everyday pushes use a device token, saved in %s (mode 600)." % os.path.join(TQ_DIR, "config.json")) print("see exactly what a push sends: python3 %s --dry-run" % os.path.abspath(sys.argv[0])) print("enter the Halls: python3 %s --push" % os.path.abspath(sys.argv[0])) print("stay fresh while you work — add this line yourself (crontab -e); we never install it:") print(" */5 * * * * /usr/bin/python3 %s --push >> %s/push.log 2>&1" % (os.path.abspath(sys.argv[0]), TQ_DIR)) def run_dry_run(args, ledger_path): generated_at = datetime.now(TZ).isoformat(timespec="seconds") feed = build_feed(args.home, args.server, args.pricing, generated_at, ledger_path) snap = snapshot_from_feed(feed) print("This — and ONLY this — is what --push sends to %s:" % BASE_URL) print() json.dump(snap, sys.stdout, indent=2, ensure_ascii=False) print() print() print("Fourteen aggregate measurements + a motto + an avatar. No project names, no") print("histograms, no daily series, no text you ever typed. The server derives your") print("stats and level from the measurements. Nothing was sent just now.") cfg = load_agent_config() if cfg.get("mirror"): raw = feed scrubbed = scrub_feed_for_mirror(feed) dropped_tools = len(raw.get("top_tools") or []) - len([t for t in (scrubbed.get("top_tools") or []) if t.get("name") != "Other"]) print() print("Your mirror is ON, so a push ALSO uploads your panel feed (%.1f KB), scrubbed:" % (len(json.dumps(scrubbed)) / 1024.0)) print(" dropped: projects (%d rows) · histograms.weekday_hour · %d mcp tool name(s) → \"Other\"" % (len(raw.get("projects") or []), dropped_tools)) print(" kept: totals, cost estimate, daily activity (your history escrow), hour/weekday") print(" histograms, built-in tool counts, models. View key: unlisted, rotatable.") def push_once(args, ledger_path, cfg, quiet=False): generated_at = datetime.now(TZ).isoformat(timespec="seconds") feed = build_feed(args.home, args.server, args.pricing, generated_at, ledger_path) r = _http_post_json(args.base_url + "/board?action=submit", {"pack": cfg.get("pack", "tokens"), "handle": cfg["handle"], "device_token": cfg["device_token"], "snapshot": snapshot_from_feed(feed)}) if not r.get("ok"): raise RuntimeError("push failed: %s" % r.get("error")) msg = ("push ok — %s stands at rank %s (renown %s) · %s/halls" % (cfg["handle"], r.get("rank", "?"), r.get("renown", "?"), args.base_url)) if cfg.get("mirror"): m = _http_post_json(args.base_url + "/board?action=mirror_push", {"pack": cfg.get("pack", "tokens"), "handle": cfg["handle"], "device_token": cfg["device_token"], "feed": scrub_feed_for_mirror(feed)}) msg += " · mirror %s" % ("updated (%.1f KB)" % (m.get("bytes", 0) / 1024.0) if m.get("ok") else "FAILED: %s" % m.get("error")) if not quiet: print(msg, flush=True) return msg def run_push(args, ledger_path): cfg = load_agent_config() if not (cfg.get("handle") and cfg.get("device_token")): sys.exit("no hero yet — run --register --handle first") try: push_once(args, ledger_path, cfg) except RuntimeError as e: sys.exit(str(e)) def run_restore(args, ledger_path, then_push=True): """Pull YOUR OWN mirrored feed and keep it as a local baseline (restore.json) — the reinstall healer. Run it BEFORE the first post-reinstall push so the escrowed history survives; the next push then re-fills the mirror with the merged feed.""" cfg = load_agent_config() if not (cfg.get("handle") and cfg.get("device_token")): sys.exit("no hero yet — run --reclaim --handle first") if not cfg.get("mirror"): r = _http_post_json(args.base_url + "/board?action=mirror_on", {"pack": cfg.get("pack", "tokens"), "handle": cfg["handle"], "device_token": cfg["device_token"]}) if not r.get("ok"): sys.exit("no mirror to restore from (%s)" % r.get("error")) cfg["mirror"] = {"share_key": r["share_key"], "url": r["url"]} save_agent_config(cfg) import urllib.parse q = urllib.parse.urlencode({"action": "mirror_feed", "pack": cfg.get("pack", "tokens"), "handle": cfg["handle"], "token": cfg["mirror"]["share_key"]}) try: base = json.loads(_http_get(args.base_url + "/board?" + q)) except Exception as e: sys.exit("restore failed: %s (no mirror data yet?)" % e) write_json_atomic(os.path.join(TQ_DIR, "restore.json"), base) days = len(base.get("daily_activity") or []) print("restored your mirrored history: %d day(s) · %s tokens — merged into every feed from now on" % (days, _fmt_tokens((base.get("totals") or {}).get("tokens_total") or 0))) if then_push: try: push_once(args, ledger_path, load_agent_config()) except RuntimeError as e: warn("post-restore push failed (%s) — run --push when convenient" % e) def run_reclaim(args, ledger_path): """Re-claim your hero on a fresh machine/install: passphrase → new device token → (if you had a mirror) restore your escrowed history, then push. The passphrase is the one --register printed; the old device's token is rotated out.""" if not args.handle: sys.exit("--reclaim needs --handle ") pw = args.passphrase if not pw: import getpass pw = getpass.getpass("passphrase for %s (from your original --register): " % args.handle) r = _http_post_json(args.base_url + "/board?action=issue_token", {"pack": args.pack, "handle": args.handle, "passphrase": pw}) if not r.get("ok"): sys.exit("reclaim failed: %s" % r.get("error")) save_agent_config({"pack": args.pack, "handle": args.handle, "device_token": r["device_token"]}) print("hero reclaimed: %s (a fresh device token is saved; the old device is signed out)" % args.handle) try: g = json.loads(_http_get(args.base_url + "/board?action=get&pack=%s&handle=%s" % (args.pack, args.handle.replace(" ", "%20")))) except Exception: g = {} if g.get("mirror"): print("you had a mirror — restoring your escrowed history…") run_restore(args, ledger_path, then_push=True) else: print("level walks back in from the board on your next --push (the progression floor).") def run_mirror(args, ledger_path): cfg = load_agent_config() if args.mirror == "status": m = cfg.get("mirror") print("mirror: %s" % (("on — " + m.get("url", "?")) if m else "off")) return if not (cfg.get("handle") and cfg.get("device_token")): sys.exit("no hero yet — run --register --handle first") auth = {"pack": cfg.get("pack", "tokens"), "handle": cfg["handle"], "device_token": cfg["device_token"]} if args.mirror == "off": r = _http_post_json(args.base_url + "/board?action=mirror_off", auth) if not r.get("ok"): sys.exit("mirror off failed: %s" % r.get("error")) cfg.pop("mirror", None) save_agent_config(cfg) print("mirror off — the stored feed is deleted; your panel is local-only again") return action = "mirror_rotate" if args.mirror == "rotate" else "mirror_on" r = _http_post_json(args.base_url + "/board?" + "action=" + action, auth) if not r.get("ok"): sys.exit("mirror %s failed: %s" % (args.mirror, r.get("error"))) cfg["mirror"] = {"share_key": r["share_key"], "url": r["url"]} save_agent_config(cfg) print("mirror %s — your panel, viewable anywhere you share this link:" % args.mirror) print(" %s" % r["url"]) print("(unlisted: the key IS the visibility — rotate it with --mirror rotate if a link escapes)") try: push_once(args, ledger_path, cfg) # fill it right away so the link works now except RuntimeError as e: warn("first mirror fill failed (%s) — the next --push fills it" % e) # --------------------------------------------------------------------------- # # --play: the local game server. Loopback only. Serves the panel shell + your # locally-derived feed; the panel runs sandboxed in your browser and talks only # to this server. Two shims keep the stock panel happy with zero panel edits: # * any ?token= value is accepted (the panel just requires one in its URL) # * live-status emits a reference-shaped user entry keyed by YOUR username # --------------------------------------------------------------------------- # FEED_TTL_SEC = 60 # re-derive the stats feed at most this often WORKING_WINDOW_SEC = 120 # newest transcript mtime younger than this => "working" MIME = {".html": "text/html; charset=utf-8", ".json": "application/json", ".js": "text/javascript", ".css": "text/css", ".svg": "image/svg+xml", ".png": "image/png", ".ico": "image/x-icon", ".txt": "text/plain"} class GameState: """Feed cache + working/idle tracker shared across request threads.""" def __init__(self, home, server, pricing_path, ledger_path): self.home = os.path.abspath(os.path.expanduser(home)) self.server = server self.pricing_path = pricing_path self.ledger_path = ledger_path self.lock = threading.Lock() self.feed_bytes = None self.feed_at = 0.0 self.working = False self.since = datetime.now(TZ).isoformat(timespec="seconds") self.xp_floor = 0.0 # board xp (progression durability) — injected into served feeds def feed(self): now = datetime.now(timezone.utc).timestamp() with self.lock: if self.feed_bytes is None or now - self.feed_at > FEED_TTL_SEC: generated_at = datetime.now(TZ).isoformat(timespec="seconds") obj = build_feed(self.home, self.server, self.pricing_path, generated_at, self.ledger_path) if self.xp_floor > 0: (obj.setdefault("totals", {}))["xp_floor"] = self.xp_floor self.feed_bytes = json.dumps(obj, separators=(",", ":")).encode() self.feed_at = now return self.feed_bytes def live_status(self): user = os.path.basename(self.home.rstrip("/")) projdir = os.path.join(self.home, ".claude", "projects") newest = 0.0 for root, _dirs, files in os.walk(projdir): for f in files: if f.endswith(".jsonl"): try: newest = max(newest, os.path.getmtime(os.path.join(root, f))) except OSError: pass working = (datetime.now(timezone.utc).timestamp() - newest) <= WORKING_WINDOW_SEC with self.lock: if working and not self.working: self.since = datetime.now(TZ).isoformat(timespec="seconds") self.working = working since = self.since st = "working" if working else "idle" # key + fields shaped like the reference live monitor; the panel falls back to the # feed's first user when no "b4e" key (the house install) is present return json.dumps({"status": st, "server": self.server, "users": {self.server + ":" + user: {"server": self.server, "user": user, "status": st, "act_status": st, "since": since}}}, separators=(",", ":")).encode() def make_handler(state, panel_dir, base_path=""): panel_root = os.path.realpath(panel_dir) bp = ("/" + base_path.strip("/")) if base_path.strip("/") else "" class Handler(BaseHTTPRequestHandler): def log_message(self, fmt, *args): # quiet by default pass def _send(self, code, body, ctype="application/json", cache="no-store"): self.send_response(code) self.send_header("Content-Type", ctype) self.send_header("Content-Length", str(len(body))) self.send_header("Cache-Control", cache) self.end_headers() self.wfile.write(body) def do_GET(self): path = self.path.split("?", 1)[0] # any ?token= is fine here — it's your box try: if path == "/tq/claude-stats.json": return self._send(200, state.feed()) if path == "/tq/live-status.json": return self._send(200, state.live_status()) if path.startswith("/tq/"): return self._send(404, b'{"error":"not served by the lite agent"}') if path in ("/", "/index.html"): path = "/index.html" full = os.path.realpath(os.path.join(panel_root, path.lstrip("/"))) if full != panel_root and not full.startswith(panel_root + os.sep): return self._send(403, b'{"error":"forbidden"}') if not os.path.isfile(full): return self._send(404, b'{"error":"not found"}') with open(full, "rb") as fh: body = fh.read() ctype = MIME.get(os.path.splitext(full)[1].lower(), "application/octet-stream") if bp and full.endswith(".html"): # behind a reverse proxy at a sub-path (--base-path): re-root the panel's # absolute fetches so they stay inside the proxied prefix body = body.replace(b'"/tq/', b'"%s/tq/' % bp.encode()) body = body.replace(b'"/character-page.html', b'"%s/character-page.html' % bp.encode()) return self._send(200, body, ctype, cache="max-age=60") except (OSError, ValueError) as e: warn("serve error %s (%s)" % (self.path, e)) try: self._send(500, b'{"error":"internal"}') except OSError: pass return Handler def run_play(args): panel_dir = os.path.expanduser(args.panel_dir or os.path.join(TQ_DIR, "bundle")) if not args.panel_dir: # default bundle: fetch on first run, else quiet update hint if not os.path.isfile(os.path.join(panel_dir, "index.html")): print("no panel bundle yet — fetching it (display code only; sends nothing)", flush=True) fetch_bundle(panel_dir, args.base_url) else: bundle_update_hint(panel_dir, args.base_url) try: # world refresh; offline keeps the cached slice fetch_content(panel_dir, args.base_url) except Exception as e: warn("world refresh skipped (%s) — playing from the cached slice" % e) if not os.path.isfile(os.path.join(panel_dir, "index.html")): sys.exit("no panel at %s" % panel_dir) ledger_path = None if args.no_ledger else (args.ledger or os.path.join(TQ_DIR, "ledger.db")) state = GameState(args.home, args.server, args.pricing, ledger_path) cfg = load_agent_config() registered = bool(cfg.get("handle") and cfg.get("device_token")) state.xp_floor = board_xp_floor(args.base_url, cfg) if registered else 0.0 srv = ThreadingHTTPServer(("127.0.0.1", args.port), make_handler(state, panel_dir, args.base_path)) print("TokenQuest is live on your box: http://127.0.0.1:%d/?token=local" % args.port, flush=True) if registered and not args.no_push: def _pusher(): while True: try: push_once(args, ledger_path, cfg, quiet=True) except Exception as e: warn("background push skipped (%s)" % e) for _ in range(300): import time time.sleep(1) threading.Thread(target=_pusher, daemon=True).start() print("board push: every 5 min while playing, as %s%s (--no-push to disable)" % (cfg["handle"], " + mirror" if cfg.get("mirror") else ""), flush=True) elif registered: print("board push: OFF (--no-push)", flush=True) print("(local only otherwise — nothing else is sent; Ctrl-C stops it)", flush=True) try: srv.serve_forever() except KeyboardInterrupt: print("\nstopped") def main(): global TZ, TZ_NAME ap = argparse.ArgumentParser(description="TokenQuest lite agent (steps 1-2: local feed + local play)") ap.add_argument("--feed", action="store_true", help="derive the local claude-stats feed") ap.add_argument("--play", action="store_true", help="serve the game locally (127.0.0.1)") ap.add_argument("--update", action="store_true", help="fetch/refresh the panel bundle") ap.add_argument("--register", action="store_true", help="claim your hero on the board") ap.add_argument("--dry-run", action="store_true", help="print EXACTLY what --push would send; send nothing") ap.add_argument("--push", action="store_true", help="push your standing to the board") ap.add_argument("--handle", default=None, help="--register: your hero name (immutable)") ap.add_argument("--pack", default="tokens", help="--register: source pack (default tokens)") ap.add_argument("--mirror", choices=["on", "off", "rotate", "status"], default=None, help="opt-in panel mirror: view your panel anywhere via an unlisted rotatable link") ap.add_argument("--no-push", action="store_true", help="--play: don't push to the board while playing") ap.add_argument("--reclaim", action="store_true", help="re-claim your hero on a fresh install (passphrase); restores mirror history") ap.add_argument("--restore", action="store_true", help="pull your own mirrored history as a local baseline (the reinstall healer)") ap.add_argument("--passphrase", default=None, help="--reclaim: passphrase (prompted if omitted)") ap.add_argument("--base-url", default=BASE_URL, help=argparse.SUPPRESS) # dev/test hook ap.add_argument("--output", help="--feed: write the feed here (default: stdout)") ap.add_argument("--pretty", action="store_true") ap.add_argument("--home", default="~", help="home dir holding .claude (default: yours)") ap.add_argument("--server", default="local", help="label for this machine in the feed") ap.add_argument("--tz", default=None, help="IANA timezone (default: this machine's)") ap.add_argument("--pricing", default=None, help="pricing.json override (default: embedded table)") ap.add_argument("--ledger", default=None, help="ledger path (default: %s; sessions survive transcript pruning)" % os.path.join(TQ_DIR, "ledger.db")) ap.add_argument("--no-ledger", action="store_true", help="stateless: surviving transcripts only") ap.add_argument("--port", type=int, default=8123, help="--play: port on 127.0.0.1") ap.add_argument("--panel-dir", default=None, help="--play: dir holding the panel (until --update exists)") ap.add_argument("--base-path", default="", help="--play: sub-path when behind a reverse proxy (e.g. /lite)") ap.add_argument("--generated-at", default=None, help=argparse.SUPPRESS) # test harness hook args = ap.parse_args() TZ, TZ_NAME = resolve_tz(args.tz) ledger_path = None if args.no_ledger else (args.ledger or os.path.join(TQ_DIR, "ledger.db")) if args.update: bundle_dir = os.path.join(TQ_DIR, "bundle") fetch_bundle(bundle_dir, args.base_url) return fetch_content(bundle_dir, args.base_url) if args.register: return run_register(args) if args.reclaim: return run_reclaim(args, ledger_path) if args.restore: return run_restore(args, ledger_path) if args.mirror: return run_mirror(args, ledger_path) if args.dry_run: return run_dry_run(args, ledger_path) if args.push: return run_push(args, ledger_path) if args.play: return run_play(args) if not args.feed: ap.error("pick a mode: --feed / --play / --update / --register / --dry-run / --push / --mirror") generated_at = args.generated_at or datetime.now(TZ).isoformat(timespec="seconds") feed = build_feed(args.home, args.server, args.pricing, generated_at, ledger_path) if args.output: write_json_atomic(args.output, feed, args.pretty) print("wrote %s" % args.output, file=sys.stderr) else: json.dump(feed, sys.stdout, indent=2 if args.pretty else None, separators=None if args.pretty else (",", ":")) sys.stdout.write("\n") if __name__ == "__main__": main()