#!/usr/bin/env python3
"""satwork — CLI for the verified computation protocol.

Discover targets, submit proposals, check balance, withdraw via QR code.
Agent key is saved to ~/.satwork/agent_key after first use.

Usage:
    satwork targets                              # List active targets with signals
    satwork context TARGET_ID                    # Situational awareness
    satwork spec TARGET_ID                       # Get target specification
    satwork example TARGET_ID                    # Copy-paste example proposal
    satwork propose TARGET_ID --params P1 P2...  # Submit blind proposal
    satwork propose TARGET_ID --json '{}'        # Submit any proposal as JSON
    satwork propose TARGET_ID --file F           # Submit file(s) for described/signature targets
    satwork balance                              # Check balance
    satwork deposit AMOUNT                       # Deposit via Lightning invoice
    satwork withdraw                             # Withdraw via ASCII QR code
    satwork set-payout ADDRESS                   # Set Lightning address for instant payouts
    satwork history                              # Transaction history
    satwork init                                 # Generate and save agent key
    satwork deactivate TARGET_ID                 # Deactivate a target, refund unspent budget
    satwork target init --name NAME --tier TIER --metric METRIC  # Scaffold a target
    satwork target validate [DIR]                # Validate eval locally
    satwork target register [DIR] --budget SATS  # Register on satwork
    satwork target fund TARGET_ID                # Fund next chunk of a hold_invoice target
    satwork target status TARGET_ID              # Hold invoice funding status
    satwork kg similar TARGET_ID                 # Find prior art
    satwork kg buy NODE_ID                       # Purchase a solution
    satwork kg show NODE_ID                      # Inspect a node
"""

import argparse
import json
import os
import secrets
import subprocess
import sys
import time
import urllib.error
import urllib.request

BASE = os.environ.get("SATWORK_API", "https://satwork.ai/api")
KEY_FILE = os.path.expanduser("~/.satwork/agent_key")
KEY_FILE_LEGACY = os.path.expanduser("~/.satwork/worker_id")

# Wall-clock limit (seconds) for a single local eval.py run.
EVAL_TIMEOUT_SECS = 120


# ---------------------------------------------------------------------------
# Agent key management
# ---------------------------------------------------------------------------

def _read_key_file(path):
    """Return the stripped contents of *path*, or None if the file is absent."""
    try:
        with open(path, encoding="utf-8") as f:
            return f.read().strip()
    except FileNotFoundError:
        return None


def _load_agent_key():
    """Return the agent key, or None when no key is configured.

    Lookup order: the SATWORK_AGENT_KEY environment variable, then KEY_FILE,
    then the legacy KEY_FILE_LEGACY path.
    """
    key = os.environ.get("SATWORK_AGENT_KEY")
    if key:
        return key
    key = _read_key_file(KEY_FILE)
    if key is not None:
        return key
    # Backwards compatibility: fall back to legacy path
    return _read_key_file(KEY_FILE_LEGACY)


def _require_agent_key(args):
    """Return the agent key from --agent-key or storage; exit(1) if none."""
    key = getattr(args, "agent_key", None) or _load_agent_key()
    if not key:
        print("No agent_key found. Run 'satwork init' first, or set SATWORK_AGENT_KEY.",
              file=sys.stderr)
        sys.exit(1)
    return key


def _save_agent_key(key):
    """Write *key* to KEY_FILE with owner-only permissions."""
    os.makedirs(os.path.dirname(KEY_FILE), exist_ok=True)
    # Create the file with mode 0600 up front so the private key is never
    # readable by other users, not even briefly before a chmod.
    fd = os.open(KEY_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        f.write(key)
    # The mode passed to os.open only applies to newly created files;
    # tighten a pre-existing file as well.
    os.chmod(KEY_FILE, 0o600)


# ---------------------------------------------------------------------------
# HTTP helpers
# ---------------------------------------------------------------------------

def _error_detail(raw):
    """Extract the "detail" field from a JSON error body, else return *raw*."""
    try:
        return json.loads(raw).get("detail", raw)
    except (json.JSONDecodeError, AttributeError):
        return raw


def _request(req):
    """Open *req* (URL string or Request) and return the decoded JSON body.

    Prints a one-line error to stderr and exits with status 1 on an HTTP
    error, a network failure, or a response that is not valid JSON.
    """
    try:
        with urllib.request.urlopen(req) as resp:
            return json.loads(resp.read())
    except urllib.error.HTTPError as e:
        # HTTPError is a subclass of URLError, so it must be handled first.
        detail = _error_detail(e.read().decode(errors="replace"))
        print(f"Error {e.code}: {detail}", file=sys.stderr)
    except urllib.error.URLError as e:
        print(f"Network error: {e.reason}", file=sys.stderr)
    except ValueError:
        print("Error: server returned a response that is not valid JSON", file=sys.stderr)
    sys.exit(1)


def _get(path):
    """GET BASE + *path* and return the parsed JSON response."""
    return _request(f"{BASE}{path}")


def _post(path, body):
    """POST *body* as JSON to BASE + *path* and return the parsed JSON response."""
    data = json.dumps(body).encode()
    req = urllib.request.Request(f"{BASE}{path}", data=data, method="POST")
    req.add_header("Content-Type", "application/json")
    return _request(req)


# ---------------------------------------------------------------------------
# ASCII QR code
# ---------------------------------------------------------------------------

def _print_qr(data):
    """Print an ASCII QR code. Agents: show this to your human.

    Returns True if the code was printed, False if the optional ``qrcode``
    package is unavailable and could not be installed.
    """
    try:
        import qrcode
    except ImportError:
        try:
            subprocess.check_call(
                [sys.executable, "-m", "pip", "install", "qrcode", "-q"],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
            import qrcode
        except Exception:
            # Best effort: any install/import failure just disables the QR.
            print("(pip install qrcode for ASCII QR codes)", file=sys.stderr)
            return False
    qr = qrcode.QRCode(border=1)
    qr.add_data(data)
    qr.make(fit=True)
    qr.print_ascii(invert=True)
    return True


# ---------------------------------------------------------------------------
# Commands
# ---------------------------------------------------------------------------

def cmd_init(args):
    """Generate and save a secure agent key."""
    existing = _load_agent_key()
    if existing and not args.force:
        print(f"Agent key already exists: {existing[:20]}...")
        print(f"Saved at: {KEY_FILE}")
        print("Use --force to generate a new one (WARNING: you'll lose access to the old balance).")
        return

    key = f"sk-{secrets.token_hex(32)}"
    _save_agent_key(key)
    print(f"Agent key: {key}")
    print(f"Saved to: {KEY_FILE}")
    print()
    print("This is the PRIVATE KEY to your funds. Back it up.")
    print("Anyone with this key can withdraw your sats.")

    if args.payout:
        _post(f"/agent/{key}/payout", {"payout_address": args.payout})
        print(f"\nPayout address: {args.payout}")
        print("Rewards will be paid directly to this Lightning address.")


def _reward_cost_ratio(target):
    """Return (effective_reward, reward/cost ratio) for a target record."""
    eff_r = target.get("effective_reward", target["reward_per_improvement"])
    cost = target["cost_per_proposal"]
    # A zero-cost target has an unbounded ratio; avoid ZeroDivisionError.
    ratio = eff_r / cost if cost else float("inf")
    return eff_r, ratio


def cmd_targets(args):
    """List active targets with economic signals."""
    data = _get("/propose/targets")
    targets = data["targets"]
    if not targets:
        print("No active targets.")
        return

    # Sort: non-stale first, then by effective reward/cost ratio
    targets.sort(key=lambda t: (t.get("stale", False), -_reward_cost_ratio(t)[1]))

    for t in targets:
        stale_marker = " [STALE]" if t.get("stale") else ""
        eff_r, ratio = _reward_cost_ratio(t)
        print(f"{t['id']}{stale_marker}")
        print(f" {t['name']} ({t['privacy_tier']})")
        print(f" baseline={t['baseline']:.4f} budget={t['budget_remaining']} cost={t['cost_per_proposal']}")
        print(f" reward={t['reward_per_improvement']} effective={eff_r} R/C={ratio:.0f}x")
        print(f" difficulty={t.get('difficulty', '?')} hit_rate={t.get('hit_rate', '?')} "
              f"failures={t.get('consecutive_failures', 0)} proposals={t.get('total_proposals', 0)}")
        print()


def cmd_context(args):
    """One-call situational awareness for a target."""
    data = _get(f"/propose/{args.target_id}/context")
    print(json.dumps(data, indent=2))


def cmd_spec(args):
    """Get target specification."""
    data = _get(f"/propose/{args.target_id}/spec")
    print(json.dumps(data, indent=2))


def cmd_example(args):
    """Get a copy-paste example proposal."""
    data = _get(f"/propose/{args.target_id}/example")
    print(json.dumps(data, indent=2))


def _build_proposal_body(args, key):
    """Build the proposal request body from --json, --file or --params.

    Exits with status 1 (message on stderr) when the input is missing or
    malformed.
    """
    if args.json:
        try:
            body = json.loads(args.json)
        except json.JSONDecodeError as e:
            print(f"Invalid --json: {e}", file=sys.stderr)
            sys.exit(1)
        if not isinstance(body, dict):
            print("Invalid --json: proposal body must be a JSON object", file=sys.stderr)
            sys.exit(1)
        body["agent_key"] = key
        return body

    if args.file:
        files = {}
        for fpath in args.file:
            if not os.path.exists(fpath):
                print(f"File not found: {fpath}", file=sys.stderr)
                sys.exit(1)
            with open(fpath) as f:
                files[os.path.basename(fpath)] = f.read()
        return {"type": "file", "files": files, "agent_key": key}

    if args.params:
        try:
            values = [float(p) for p in args.params]
        except ValueError as e:
            print(f"Invalid --params (numbers expected): {e}", file=sys.stderr)
            sys.exit(1)
        return {"type": "params", "params": values, "agent_key": key}

    print("Provide --params, --file, or --json. Run 'satwork example TARGET_ID' for format.",
          file=sys.stderr)
    sys.exit(1)


def cmd_propose(args):
    """Submit a proposal."""
    key = _require_agent_key(args)
    body = _build_proposal_body(args, key)
    if args.tldr:
        body["tldr"] = args.tldr

    result = _post(f"/propose/{args.target_id}", body)

    score = result.get("score")
    improved = result.get("improvement", False)
    reward = result.get("reward_sats", 0)
    gap = result.get("gap", 0) or 0
    balance = result.get("balance_sats", "?")

    status = "IMPROVED!" if improved else "no improvement"
    score_str = f"{score:.4f}" if score is not None else "error"
    gap_str = f"{gap:.4f}" if isinstance(gap, (int, float)) else "?"
    print(f"Score: {score_str} gap: {gap_str} {status}")
    if improved:
        print(f"Earned: {reward} sats")
    if result.get("exploration_refund_sats"):
        print(f"Exploration refund: {result['exploration_refund_sats']} sats")
    print(f"Cost: {result.get('cost_sats', 0)} sats Balance: {balance} sats")
    if result.get("error"):
        print(f"Error: {result['error']}")
    if result.get("eval_detail"):
        print(f"Detail: {result['eval_detail']}")


def cmd_balance(args):
    """Check agent balance."""
    key = _require_agent_key(args)
    data = _get(f"/agent/{key}/balance")
    print(f"Earned: {data['earned_sats']} sats")
    print(f"Withdrawn: {data['withdrawn_sats']} sats")
    print(f"Available: {data['available_sats']} sats")

    if data.get("expires_in_hours") is not None:
        hours = data["expires_in_hours"]
        if hours < 12:
            print(f"\n WARNING: Balance expires in {hours:.0f} hours!")
            print(" Withdraw now: satwork withdraw")
            print(" Or set instant payouts: satwork set-payout user@getalby.com")
        else:
            print(f"\n Expires in {hours:.0f} hours")

    if data.get("instant_payouts"):
        print(f"\n Instant payouts: {data['payout_address']} (no expiry)")
    elif data["available_sats"] > 0:
        print("\n TIP: Set a payout address to skip the 48h expiry:")
        print(" satwork set-payout user@getalby.com")


def cmd_withdraw(args):
    """Withdraw sats via ASCII QR code.

    Generates an LNURL-withdraw QR code that your human scans with any
    Lightning wallet (Phoenix, Zeus, Breez, etc.). The wallet pulls the
    funds — no invoice needed.
    """
    key = _require_agent_key(args)

    # Check balance
    bal = _get(f"/agent/{key}/balance")
    available = bal["available_sats"]
    print(f"Available: {available} sats")
    if available <= 0:
        print("Nothing to withdraw.")
        return

    # Get LNURL
    lnurl_data = _get(f"/agent/{key}/lnurl")
    lnurl = lnurl_data["lnurl"]

    print(f"\nWithdraw {available} sats — scan this QR with a Lightning wallet:\n")

    # Generate ASCII QR (the primary output — humans prefer scanning)
    if not _print_qr(lnurl.upper()):
        # Fallback: browser QR. Built from BASE so that a SATWORK_API
        # override points at the same server the LNURL came from.
        print("Open in browser for scannable QR:")
        print(f" {BASE}/agent/{key}/lnurl/qr")

    print(f"\nOr paste into wallet: {lnurl}")
    print("\nExpires in 10 minutes. Works with Phoenix, Zeus, Breez, or any LNURL wallet.")


def cmd_set_payout(args):
    """Set a Lightning address for instant payouts."""
    key = _require_agent_key(args)
    _post(f"/agent/{key}/payout", {"payout_address": args.address})
    print(f"Payout address set: {args.address}")
    print("Rewards will be paid directly to this Lightning address.")
    print("No more internal balance — sats go straight to your wallet.")


def cmd_deposit(args):
    """Deposit sats via Lightning invoice."""
    key = _require_agent_key(args)
    if not args.payment_hash:
        # Step 1: create invoice
        result = _post(f"/agent/{key}/deposit", {"amount_sats": args.amount})
        print(f"Pay this Lightning invoice to deposit {args.amount} sats:\n")
        _print_qr(result["invoice"].upper())
        print(f"\nInvoice: {result['invoice']}")
        print(f"Payment hash: {result['payment_hash']}")
        print("\nAfter paying, run:")
        print(f" satwork deposit {args.amount} --payment-hash {result['payment_hash']}")
    else:
        # Step 2: confirm payment
        result = _post(f"/agent/{key}/deposit", {
            "amount_sats": args.amount,
            "payment_hash": args.payment_hash,
        })
        print(f"Deposited: {result['deposited_sats']} sats")
        print(f"Available: {result['available_sats']} sats")


def cmd_deactivate(args):
    """Deactivate a target and refund unspent budget."""
    key = _require_agent_key(args)
    result = _post(f"/targets/{args.target_id}/deactivate", {"agent_key": key})
    print(f"Target {args.target_id}: {result['status']}")
    print(f"Refunded: {result['refunded_sats']} sats")


def cmd_history(args):
    """Transaction history."""
    key = _require_agent_key(args)
    data = _get(f"/agent/{key}/history?limit={args.limit}")
    txns = data.get("transactions", [])
    if not txns:
        print("No transactions yet.")
        return
    for t in txns:
        sign = "+" if t["txn_type"] == "earned" else "-"
        # A null/missing reference cannot take a width format spec.
        reference = t.get("reference") or ""
        print(f" {sign}{t['amount_sats']:>6} sats {t['txn_type']:<10} {reference:<30} {t['created_at']}")


# ---------------------------------------------------------------------------
# Knowledge Graph: browse and purchase prior art
# ---------------------------------------------------------------------------

def cmd_kg(args):
    """Knowledge graph commands: similar, buy, show."""
    kg_commands = {
        "similar": cmd_kg_similar,
        "buy": cmd_kg_buy,
        "show": cmd_kg_show,
    }
    if args.kg_command in kg_commands:
        kg_commands[args.kg_command](args)
    else:
        print("Usage: satwork kg {similar|buy|show}", file=sys.stderr)
        sys.exit(1)


def cmd_kg_similar(args):
    """Find similar solved problems for a target (free, no purchase needed)."""
    target_id = args.target_id
    data = _get(f"/propose/{target_id}/context")
    prior_art = data.get("prior_art", [])
    if not prior_art:
        print(f"No prior art found for {target_id}.")
        print("(Prior art appears when the KG has similar solved problems.)")
        return

    print(f"Prior art for {target_id}:")
    print()
    for node in prior_art:
        nid = node.get("node_id", node.get("id", "?"))
        sim = node.get("similarity", 0)
        price = node.get("price_sats", "?")
        target = node.get("target_id", "?")
        quality = node.get("quality", node.get("improvement_pct", "?"))
        print(f" {nid}")
        print(f" target: {target} similarity: {sim:.2f} price: {price} sats")
        if isinstance(quality, (int, float)):
            print(f" improvement: {quality:.1f}%")
        print()
    print("Buy with: satwork kg buy NODE_ID")


def cmd_kg_buy(args):
    """Purchase a KG node solution.

    First requests the solution directly; if the server answers 402 the
    price is shown and, after confirmation (skipped with --yes), the node
    is purchased.
    """
    key = _require_agent_key(args)
    node_id = args.node_id

    # First try free access
    req = urllib.request.Request(f"{BASE}/kg/nodes/{node_id}/solution")
    req.add_header("X-Agent-Key", key)
    try:
        with urllib.request.urlopen(req) as resp:
            data = json.loads(resp.read())
    except urllib.error.HTTPError as e:
        if e.code == 404:
            print(f"Node not found: {node_id}", file=sys.stderr)
            sys.exit(1)
        if e.code != 402:
            print(f"Error {e.code}: {e.read().decode(errors='replace')[:200]}", file=sys.stderr)
            sys.exit(1)
        # 402: need to purchase
        raw = e.read().decode(errors="replace")
        try:
            info = json.loads(raw)
            price = info.get("detail", {}).get("price_sats", info.get("price_sats", "?"))
        except (json.JSONDecodeError, AttributeError):
            price = "?"
        print(f"Node {node_id[:20]}... costs {price} sats.")
    except urllib.error.URLError as e:
        print(f"Network error: {e.reason}", file=sys.stderr)
        sys.exit(1)
    else:
        _print_solution(node_id, data)
        return

    # Confirm purchase
    if not args.yes:
        try:
            answer = input(f"Purchase for {price} sats? [y/N] ")
        except EOFError:
            # Non-interactive stdin (e.g. an agent without --yes): never
            # spend sats without explicit confirmation.
            answer = ""
        if answer.lower() != "y":
            print("Cancelled.")
            return

    # Purchase
    result = _post(f"/kg/nodes/{node_id}/purchase", {"agent_key": key})
    print(f"Purchased! Paid {result.get('price_sats', '?')} sats.")
    _print_solution(node_id, result)


def _print_solution(node_id, data):
    """Print a purchased KG solution with ready-to-use propose command."""
    solution = data.get("solution_data", data.get("solution", {}))
    target_id = data.get("target_id", "TARGET_ID")

    if isinstance(solution, str):
        try:
            solution = json.loads(solution)
        except json.JSONDecodeError:
            pass  # not JSON: the raw string is printed below

    print(f"\nSolution for {node_id[:20]}... (from {target_id})")

    if not isinstance(solution, dict):
        print(f" {solution}")
        return

    params = solution.get("params", solution.get("parameters", {}))
    files = solution.get("files", {})
    if not isinstance(files, dict):
        files = {}

    # Params-based solution
    if isinstance(params, list) and params:
        print(f" params: {params}")
        params_str = " ".join(str(p) for p in params)
        print(f"\n Warm start: satwork propose {target_id} --params {params_str}")
    elif isinstance(params, dict) and params:
        for k, v in params.items():
            print(f" {k}: {v}")
        params_str = " ".join(str(v) for v in params.values())
        print(f"\n Warm start: satwork propose {target_id} --params {params_str}")
    elif not isinstance(params, (list, dict)) or not files:
        # No usable params: show the raw solution instead of an empty
        # "--params" warm-start command (unless files are shown below).
        print(f" {json.dumps(solution, indent=2)}")

    # Show files if present
    for fname, content in files.items():
        lines = str(content).splitlines()
        print(f"\n --- {fname} ---")
        for line in lines[:20]:
            print(f" {line}")
        if len(lines) > 20:
            print(f" ... ({len(lines)} lines total)")
    if files:
        print(f"\n Warm start: satwork propose {target_id} --file {' --file '.join(files)}")


def cmd_kg_show(args):
    """Show details of a KG node."""
    node_id = args.node_id
    data = _get(f"/kg/nodes/{node_id}")
    print(json.dumps(data, indent=2))


# ---------------------------------------------------------------------------
# Target lifecycle: init, validate, register
# ---------------------------------------------------------------------------

# eval.py scaffolds per privacy tier. "{name}" and "{metric}" are filled in
# with str.replace (not str.format), so other braces are left untouched.
EVAL_TEMPLATES = {
    "blind": '''\
#!/usr/bin/env python3
"""Eval script for {name}.

Reads proposed_params.json, runs the benchmark, prints the metric.
Must print exactly: {metric}:
"""
import json
from pathlib import Path

TARGET_DIR = Path(__file__).parent
SEED = 42

params = json.loads((TARGET_DIR / "proposed_params.json").read_text())

# --- Your benchmark logic here ---
# Access params like: params["your_param_name"]
# Use SEED for any randomness to ensure determinism.

score = 0.5  # TODO: replace with actual scoring

print(f"{metric}: {score:.6f}")
''',
    "described": '''\
#!/usr/bin/env python3
"""Eval script for {name}.

Reads the mutable config file, scores it against test data.
Must print exactly: {metric}:
"""
import json
from pathlib import Path

TARGET_DIR = Path(__file__).parent

config = json.loads((TARGET_DIR / "config.json").read_text())

# Load test data (create data/ dir with your test cases)
data_dir = TARGET_DIR / "data"
# test_cases = json.loads((data_dir / "test.json").read_text())

# --- Your scoring logic here ---
# Score the config against test data.

score = 0.5  # TODO: replace with actual scoring

print(f"{metric}: {score:.6f}")
''',
    "signature": '''\
#!/usr/bin/env python3
"""Eval script for {name}.

Imports the mutable solution module, tests it against a corpus.
Must print exactly: {metric}:
"""
import json
import sys
from pathlib import Path

TARGET_DIR = Path(__file__).parent
sys.path.insert(0, str(TARGET_DIR))

from solution import solve  # mutable file — agents rewrite this

# Load test data (create data/ dir with your test cases)
data_dir = TARGET_DIR / "data"
# test_cases = json.loads((data_dir / "test.json").read_text())

# --- Your scoring logic here ---
# correct = sum(1 for t in test_cases if solve(t["input"]) == t["expected"])
# score = correct / len(test_cases)

score = 0.5  # TODO: replace with actual scoring

print(f"{metric}: {score:.6f}")
''',
}

SOLUTION_TEMPLATE = '''\
"""Solution module for {name}.

Agents rewrite this file to improve the metric.
"""


def solve(input_data):
    """TODO: implement your solution function.

    Args:
        input_data: the input to process

    Returns:
        the processed result
    """
    return input_data
'''


def cmd_target(args):
    """Target lifecycle commands: init, validate, register, fund, status."""
    target_commands = {
        "init": cmd_target_init,
        "validate": cmd_target_validate,
        "register": cmd_target_register,
        "fund": cmd_target_fund,
        "status": cmd_target_status,
    }
    if args.target_command in target_commands:
        target_commands[args.target_command](args)
    else:
        print("Usage: satwork target {init|validate|register|fund|status}", file=sys.stderr)
        sys.exit(1)


def cmd_target_init(args):
    """Scaffold a target directory with eval template and metadata."""
    name = args.name
    tier = args.tier
    metric = args.metric
    direction = getattr(args, "direction", "maximize")

    # Create directory from name
    slug = name.lower().replace(" ", "-")
    slug = "".join(c for c in slug if c.isalnum() or c == "-")
    slug = slug[:50].strip("-")
    if not slug:
        # Without this check the target dir would be the parent dir itself
        # and the user would get a confusing "already exists" error.
        print(f"Cannot derive a directory name from target name: {name!r}", file=sys.stderr)
        sys.exit(1)
    target_dir = os.path.join(args.dir or ".", slug)

    if os.path.exists(target_dir):
        print(f"Directory already exists: {target_dir}", file=sys.stderr)
        sys.exit(1)

    os.makedirs(target_dir)

    # Write eval.py from template. UTF-8 is explicit because the templates
    # contain non-ASCII characters and Python reads source files as UTF-8.
    template = EVAL_TEMPLATES.get(tier, EVAL_TEMPLATES["blind"])
    eval_content = template.replace("{name}", name).replace("{metric}", metric)
    with open(os.path.join(target_dir, "eval.py"), "w", encoding="utf-8") as f:
        f.write(eval_content)

    # Write target.json metadata
    code_tier = tier in ("described", "signature")
    target_meta = {
        "name": name,
        "privacy_tier": tier,
        "metric_name": metric,
        "metric_direction": direction,
        "cost_per_proposal": 5 if code_tier else 2,
        "reward_sats": 200 if code_tier else 100,
        "max_proposals_per_agent": 500 if code_tier else 5000,
    }

    if tier == "blind":
        # Scaffold parameter spec with example
        target_meta["parameter_spec"] = [
            {"name": "param1", "min": 0.0, "max": 1.0, "type": "float"},
            {"name": "param2", "min": 1, "max": 100, "type": "int"},
        ]
        # Write default proposed_params.json
        with open(os.path.join(target_dir, "proposed_params.json"), "w", encoding="utf-8") as f:
            json.dump({"param1": 0.5, "param2": 50}, f, indent=2)
    elif tier == "described":
        target_meta["mutable_files"] = ["config.json"]
        target_meta["description"] = "TODO: describe the problem for agents"
        # Write default config
        with open(os.path.join(target_dir, "config.json"), "w", encoding="utf-8") as f:
            json.dump({"setting": "value"}, f, indent=2)
        os.makedirs(os.path.join(target_dir, "data"), exist_ok=True)
    elif tier == "signature":
        target_meta["mutable_files"] = ["solution.py"]
        target_meta["description"] = "TODO: describe the problem for agents"
        target_meta["signatures"] = [
            {
                "name": "solve",
                "file": "solution.py",
                "language": "python",
                "signature": "def solve(input_data) -> result",
                "description": "TODO: describe what this function should do",
            },
        ]
        # Write solution template
        with open(os.path.join(target_dir, "solution.py"), "w", encoding="utf-8") as f:
            f.write(SOLUTION_TEMPLATE.replace("{name}", name))
        os.makedirs(os.path.join(target_dir, "data"), exist_ok=True)

    with open(os.path.join(target_dir, "target.json"), "w", encoding="utf-8") as f:
        json.dump(target_meta, f, indent=2)

    print(f"Target scaffolded: {target_dir}/")
    print(" eval.py — scoring logic (edit this)")
    if tier == "blind":
        print(" proposed_params.json — default parameters")
        print(" target.json — edit parameter_spec with your actual params")
    elif tier == "described":
        print(" config.json — mutable config (agents modify this)")
        print(" data/ — put test data here")
        print(" target.json — edit description and mutable_files")
    elif tier == "signature":
        print(" solution.py — mutable code (agents rewrite this)")
        print(" data/ — put test data here")
        print(" target.json — edit signatures and description")
    print()
    print("Next steps:")
    print(" 1. Edit eval.py with your actual scoring logic")
    print(f" 2. satwork target validate {target_dir}")
    print(f" 3. satwork target register {target_dir} --budget 5000")


def _fail(errors):
    """Print each validation error as a FAIL line and exit with status 1."""
    for e in errors:
        print(f" FAIL: {e}")
    sys.exit(1)


def _run_eval(target_dir):
    """Run eval.py in *target_dir*; return CompletedProcess, or None on timeout."""
    try:
        return subprocess.run(
            [sys.executable, "eval.py"],
            cwd=target_dir,
            capture_output=True,
            text=True,
            timeout=EVAL_TIMEOUT_SECS,
        )
    except subprocess.TimeoutExpired:
        return None


def _parse_score(stdout, metric):
    """Return the float from the last "<metric>: <value>" line, or None.

    The metric prefix is stripped by length rather than by splitting on the
    first colon, so metric names that contain ":" are parsed correctly.
    """
    prefix = f"{metric}:"
    score = None
    for line in stdout.splitlines():
        stripped = line.strip()
        if stripped.startswith(prefix):
            try:
                score = float(stripped[len(prefix):].strip())
            except ValueError:
                pass  # not a number; keep any earlier valid score
    return score


def cmd_target_validate(args):
    """Validate a target directory locally.

    Checks required files and metadata, runs eval.py twice to verify it
    prints a deterministic score, and for blind targets re-runs it with
    max-bound params to verify the score responds to them. Exits with
    status 1 on the first group of failures.
    """
    target_dir = os.path.abspath(args.target_dir or ".")
    errors = []
    warnings = []

    # Check required files
    eval_path = os.path.join(target_dir, "eval.py")
    meta_path = os.path.join(target_dir, "target.json")
    if not os.path.exists(eval_path):
        errors.append("eval.py not found")
    if not os.path.exists(meta_path):
        errors.append("target.json not found")
    if errors:
        _fail(errors)

    # Load metadata
    try:
        with open(meta_path, encoding="utf-8") as f:
            meta = json.load(f)
    except json.JSONDecodeError as e:
        _fail([f"target.json is not valid JSON: {e}"])
    if not isinstance(meta, dict):
        _fail(["target.json must contain a JSON object"])

    metric = meta.get("metric_name")
    tier = meta.get("privacy_tier", "blind")
    if not metric:
        errors.append("target.json missing metric_name")
    if not meta.get("name"):
        errors.append("target.json missing name")

    # Check tier-specific files
    if tier == "blind":
        if not meta.get("parameter_spec"):
            errors.append("Blind target missing parameter_spec in target.json")
        if not os.path.exists(os.path.join(target_dir, "proposed_params.json")):
            errors.append("Blind target missing proposed_params.json")
    elif tier in ("described", "signature"):
        if not meta.get("mutable_files"):
            errors.append(f"{tier} target missing mutable_files in target.json")
        if tier == "signature" and not meta.get("signatures"):
            errors.append("Signature target missing signatures in target.json")
        for mf in meta.get("mutable_files", []):
            if not os.path.exists(os.path.join(target_dir, mf)):
                errors.append(f"Mutable file not found: {mf}")
    if errors:
        _fail(errors)

    print(f"Validating: {os.path.basename(target_dir)} ({tier})")

    # Run eval — first pass
    print(" Running eval (pass 1)...", end="", flush=True)
    t0 = time.time()
    r1 = _run_eval(target_dir)
    if r1 is None:
        print(" TIMEOUT (>120s)")
        _fail(["Eval timed out (must complete in <120s)"])
    elapsed = time.time() - t0

    if r1.returncode != 0:
        print(f" FAILED (exit {r1.returncode})")
        stderr_tail = r1.stderr.strip()[-300:] if r1.stderr else ""
        _fail([f"Eval failed: {stderr_tail}"])

    # Parse score
    score1 = _parse_score(r1.stdout, metric)
    if score1 is None:
        print(" NO SCORE")
        _fail([
            f"Eval output missing '{metric}: '. "
            f"Got: {r1.stdout.strip()[:200]}"
        ])
    print(f" {score1:.6f} ({elapsed:.1f}s)")

    # Score range check
    if not (0.0 <= score1 <= 1.0):
        warnings.append(f"Score {score1} outside [0,1] — normalize to 0-1 for best results")
    if elapsed > 60:
        warnings.append(f"Eval took {elapsed:.0f}s — aim for <30s for agent throughput")

    # Run eval — second pass (determinism check)
    print(" Running eval (pass 2)...", end="", flush=True)
    r2 = _run_eval(target_dir)
    # A timeout on the second pass is reported like any other failed run.
    score2 = _parse_score(r2.stdout, metric) if r2 is not None else None
    if score2 is not None and score1 == score2:
        print(f" {score2:.6f} (deterministic)")
    elif score2 is not None:
        print(f" {score2:.6f} (NON-DETERMINISTIC)")
        errors.append(
            f"Eval is not deterministic: {score1} vs {score2}. "
            f"Use fixed seeds for any randomness."
        )
    else:
        print(" FAILED")
        errors.append("Second eval run failed — not deterministic")

    # For blind targets: check that different params produce different scores
    if tier == "blind" and meta.get("parameter_spec") and not errors:
        params_path = os.path.join(target_dir, "proposed_params.json")
        with open(params_path, encoding="utf-8") as f:
            original_params = f.read()

        # Build max-bound params
        try:
            test_params = {p["name"]: p["max"] for p in meta["parameter_spec"]}
        except (KeyError, TypeError):
            test_params = None
            errors.append("parameter_spec entries must be objects with 'name' and 'max'")

        if test_params is not None:
            print(" Param sensitivity...", end="", flush=True)
            try:
                with open(params_path, "w", encoding="utf-8") as f:
                    json.dump(test_params, f)
                r3 = _run_eval(target_dir)
            finally:
                # Restore original params even if the run raised, so the
                # user's proposed_params.json is never left clobbered.
                with open(params_path, "w", encoding="utf-8") as f:
                    f.write(original_params)

            score3 = _parse_score(r3.stdout, metric) if r3 is not None else None
            if score3 is not None and score3 != score1:
                print(f" {score3:.6f} (responsive)")
            elif score3 is not None:
                print(f" {score3:.6f} (SAME — params ignored!)")
                errors.append(
                    "Eval returns same score for different params. "
                    "Make sure eval.py reads from proposed_params.json."
                )
            else:
                print(" eval failed with max params")
                warnings.append("Could not verify parameter sensitivity")

    # Report
    print()
    if errors:
        _fail(errors)
    for w in warnings:
        print(f" WARN: {w}")
    print(f" OK: {meta['name']} — baseline {score1:.6f}")
    print(f" Ready to register: satwork target register {target_dir} --budget SATS")


def cmd_target_fund(args):
    """Fund the next chunk of a hold_invoice target (1 QR code)."""
    target_id = args.target_id
    result = _post(f"/targets/{target_id}/refill", {})
    invoice = result.get("invoice", "")
    amount = result.get("invoice_amount_sats", "?")
    chunk = result.get("chunk", "?")
    print(f"Refill chunk {chunk} for {target_id}")
    print("\nScan this QR code to fund ~5 more improvements:\n")
    if invoice:
        _print_qr(invoice.upper())
        print(f"\nInvoice: {invoice}")
    print(f"Amount: {amount} sats")
    print(f"\nAfter scanning, run: satwork target status {target_id}")


def cmd_target_status(args):
    """Check hold invoice funding status."""
    target_id = args.target_id
    result = _get(f"/targets/{target_id}/funding")
    print(f"Funding status: {target_id}")
    print(f" Chunks: {result.get('chunks', 0)}")
    print(f" Remaining: {result.get('remaining_sats', 0)} sats")
    print(f" Settled: {result.get('total_settled_sats', 0)} sats")
    print(f" Needs refill: {result.get('needs_refill', '?')}")
    if result.get("needs_refill"):
        print(f"\n Run: satwork target fund {target_id}")


def _read_upload_text(path, label):
    """Read *path* as UTF-8 text for upload; exit(1) if it is not text.

    File contents travel inside a JSON request body, so only text files can
    be sent.
    """
    try:
        with open(path, encoding="utf-8") as f:
            return f.read()
    except UnicodeDecodeError:
        print(f"{label} is not UTF-8 text and cannot be uploaded: {path}", file=sys.stderr)
        sys.exit(1)


def cmd_target_register(args):
    """Register a validated target on satwork.ai."""
    target_dir = os.path.abspath(args.target_dir or ".")
    budget = args.budget

    # Load metadata
    meta_path = os.path.join(target_dir, "target.json")
    if not os.path.exists(meta_path):
        print("target.json not found. Run 'satwork target init' first.", file=sys.stderr)
        sys.exit(1)
    try:
        with open(meta_path, encoding="utf-8") as f:
            meta = json.load(f)
    except json.JSONDecodeError as e:
        print(f"target.json is not valid JSON: {e}", file=sys.stderr)
        sys.exit(1)
    for field in ("name", "metric_name"):
        if not isinstance(meta, dict) or not meta.get(field):
            print(f"target.json missing {field}. Run 'satwork target validate' first.",
                  file=sys.stderr)
            sys.exit(1)

    # Read eval script
    eval_path = os.path.join(target_dir, "eval.py")
    if not os.path.exists(eval_path):
        print("eval.py not found.", file=sys.stderr)
        sys.exit(1)
    eval_script = _read_upload_text(eval_path, "eval.py")

    tier = meta.get("privacy_tier", "blind")

    # Build request body
    body = {
        "name": meta["name"],
        "privacy_tier": tier,
        "metric_name": meta["metric_name"],
        "metric_direction": meta.get("metric_direction", "maximize"),
        "budget_sats": budget,
        "cost_per_proposal": meta.get("cost_per_proposal", 5),
        "reward_sats": meta.get("reward_sats", 200),
        "max_proposals_per_agent": meta.get("max_proposals_per_agent", 500),
        "eval_script": eval_script,
    }

    if meta.get("description"):
        body["description"] = meta["description"]

    if tier == "blind" and meta.get("parameter_spec"):
        body["parameter_spec"] = meta["parameter_spec"]

    if meta.get("mutable_files"):
        mutable = {}
        for filename in meta["mutable_files"]:
            filepath = os.path.join(target_dir, filename)
            if not os.path.exists(filepath):
                print(f"Mutable file not found: {filename}", file=sys.stderr)
                sys.exit(1)
            mutable[filename] = _read_upload_text(filepath, "Mutable file")
        body["mutable_files"] = mutable

    if meta.get("signatures"):
        body["signatures"] = meta["signatures"]

    # Read data files
    data_files = {}
    data_dir = os.path.join(target_dir, "data")
    if os.path.isdir(data_dir):
        for fname in sorted(os.listdir(data_dir)):
            fpath = os.path.join(data_dir, fname)
            if os.path.isfile(fpath):
                data_files[fname] = _read_upload_text(fpath, "Data file")

    # For blind targets: include proposed_params.json as default params
    # so the server uses these (not midpoints) for baseline eval
    if tier == "blind":
        pp_path = os.path.join(target_dir, "proposed_params.json")
        if os.path.exists(pp_path):
            data_files["proposed_params.json"] = _read_upload_text(pp_path, "proposed_params.json")

    if data_files:
        body["data_files"] = data_files

    # Funding model
    funding = getattr(args, "funding", "ledger")
    body["funding_model"] = funding

    # Funding: agent_key (earned balance) or Lightning invoice
    # Targets ≤ 500 sats are free (server handles fallback)
    # Honour the global --agent-key option like every other command does.
    key = getattr(args, "agent_key", None) or _load_agent_key()
    if funding != "hold_invoice":
        if args.payment_hash:
            body["payment_hash"] = args.payment_hash
        elif key:
            body["agent_key"] = key

    print(f"Registering: {meta['name']} ({tier})")
    print(f" Budget: {budget} sats")
    print(f" Funding: {funding}")
    if funding == "hold_invoice":
        print(" Non-custodial: funds stay in your Lightning channel")

    result = _post("/targets", body)

    if result.get("status") == "invoice_created":
        # Custodial path — need to pay invoice first
        print("\nPay this Lightning invoice to fund your target:\n")
        _print_qr(result["invoice"].upper())
        print(f"\nInvoice: {result['invoice']}")
        print(f"Amount: {result['amount_sats']} sats")
        print("\nAfter paying, run:")
        print(f" satwork target register {target_dir} --budget {budget} "
              f"--payment-hash {result['payment_hash']}")
    elif result.get("status") == "awaiting_funding":
        # Hold invoice path — show 1 QR code for the chunk
        print(f"\n Target created: {result['target_id']}")
        print(f" Baseline: {result.get('baseline_score', 0):.6f}")
        print("\n Scan this QR code to fund the first chunk:\n")
        invoice = result.get("invoice", "")
        if invoice:
            _print_qr(invoice.upper())
            print(f"\n Invoice: {invoice}")
        print(f" Amount: {result.get('invoice_amount_sats', '?')} sats")
        print("\n After scanning, run:")
        print(f" satwork target status {result['target_id']}")
        print(" The coordinator will verify payment and activate the target.")
    else:
        print(f"\n Target registered: {result['target_id']}")
        print(f" Baseline: {result.get('baseline_score', 0):.6f}")
        print(f" Status: {result['status']}")
        print("\n Agents can now propose improvements to earn sats.")


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main():
    """Parse the command line and dispatch to the matching cmd_* handler."""
    p = argparse.ArgumentParser(
        prog="satwork",
        description="CLI for the satwork verified computation protocol.",
    )
    p.add_argument("--agent-key",
                   help="Agent key (default: ~/.satwork/agent_key or SATWORK_AGENT_KEY)")
    sub = p.add_subparsers(dest="command")

    # init
    sp = sub.add_parser("init", help="Generate and save a secure agent key")
    sp.add_argument("--force", action="store_true", help="Overwrite existing agent key")
    sp.add_argument("--payout", help="Lightning address for instant payouts (user@domain.com)")

    # targets
    sub.add_parser("targets", help="List active targets with economic signals")

    # context
    sp = sub.add_parser("context", help="Situational awareness for a target")
    sp.add_argument("target_id")

    # spec
    sp = sub.add_parser("spec", help="Get target specification")
    sp.add_argument("target_id")

    # example
    sp = sub.add_parser("example", help="Get example proposal body")
    sp.add_argument("target_id")

    # propose
    sp = sub.add_parser("propose", help="Submit a proposal")
    sp.add_argument("target_id")
    sp.add_argument("--params", nargs="+", help="Parameter values (blind targets)")
    sp.add_argument("--file", action="append",
                    help="File to submit (repeatable, for described/signature targets)")
    sp.add_argument("--json", help="Full proposal body as JSON")
    sp.add_argument("--tldr", help="One-line approach summary")

    # set-payout
    sp = sub.add_parser("set-payout", help="Set Lightning address for instant payouts")
    sp.add_argument("address", help="Lightning address (user@domain.com)")

    # balance
    sub.add_parser("balance", help="Check agent balance")

    # deposit
    sp = sub.add_parser("deposit", help="Deposit sats via Lightning invoice")
    sp.add_argument("amount", type=int, help="Amount in sats")
    sp.add_argument("--payment-hash", help="Payment hash (step 2: confirm after paying)")

    # withdraw
    sub.add_parser("withdraw", help="Withdraw sats via ASCII QR code")

    # deactivate
    sp = sub.add_parser("deactivate", help="Deactivate a target, refund unspent budget")
    sp.add_argument("target_id")

    # history
    sp = sub.add_parser("history", help="Transaction history")
    sp.add_argument("--limit", type=int, default=20)

    # kg (with subcommands: similar, buy, show)
    sp = sub.add_parser("kg", help="Knowledge graph: browse and purchase prior art")
    kg_sub = sp.add_subparsers(dest="kg_command")

    # kg similar
    ks = kg_sub.add_parser("similar", help="Find similar solved problems for a target")
    ks.add_argument("target_id", help="Target to find prior art for")

    # kg buy
    kb = kg_sub.add_parser("buy", help="Purchase a KG node solution")
    kb.add_argument("node_id", help="Node ID to purchase")
    kb.add_argument("-y", "--yes", action="store_true", help="Skip confirmation prompt")

    # kg show
    ksh = kg_sub.add_parser("show", help="Show KG node details")
    ksh.add_argument("node_id", help="Node ID to inspect")

    # target (with subcommands: init, validate, register, fund, status)
    sp = sub.add_parser("target", help="Create, validate, and register targets")
    target_sub = sp.add_subparsers(dest="target_command")

    # target init
    ti = target_sub.add_parser("init", help="Scaffold a new target directory")
    ti.add_argument("--name", required=True, help="Target name")
    ti.add_argument("--tier", required=True, choices=["blind", "described", "signature"],
                    help="Privacy tier")
    ti.add_argument("--metric", required=True, help="Metric name (appears in eval output)")
    ti.add_argument("--direction", default="maximize", choices=["maximize", "minimize"])
    ti.add_argument("--dir", help="Parent directory (default: current)")

    # target validate
    tv = target_sub.add_parser("validate", help="Validate a target directory locally")
    tv.add_argument("target_dir", nargs="?", help="Target directory (default: current)")

    # target register
    tr = target_sub.add_parser("register", help="Register target on satwork.ai")
    tr.add_argument("target_dir", nargs="?", help="Target directory (default: current)")
    tr.add_argument("--budget", required=True, type=int, help="Budget in sats (min 100)")
    tr.add_argument("--payment-hash", help="Payment hash from a paid Lightning invoice")
    tr.add_argument("--funding", default="hold_invoice", choices=["hold_invoice", "ledger"],
                    help="Funding model: hold_invoice (non-custodial, default) or ledger (custodial)")

    # target fund (refill hold invoice target)
    tf = target_sub.add_parser("fund", help="Fund the next chunk of a hold_invoice target")
    tf.add_argument("target_id", help="Target ID to refill")

    # target status (hold invoice funding status)
    ts = target_sub.add_parser("status", help="Check hold invoice funding status")
    ts.add_argument("target_id", help="Target ID")

    args = p.parse_args()

    commands = {
        "init": cmd_init,
        "targets": cmd_targets,
        "context": cmd_context,
        "spec": cmd_spec,
        "example": cmd_example,
        "propose": cmd_propose,
        "set-payout": cmd_set_payout,
        "balance": cmd_balance,
        "deposit": cmd_deposit,
        "withdraw": cmd_withdraw,
        "deactivate": cmd_deactivate,
        "history": cmd_history,
        "kg": cmd_kg,
        "target": cmd_target,
    }

    if args.command in commands:
        commands[args.command](args)
    else:
        p.print_help()


if __name__ == "__main__":
    main()