#!/usr/bin/env python3 # pyright: reportUnusedCallResult=false, reportAny=false """Pinned third-party dependency acquisition, verification, and metadata tool. Modes: --list Show inventory status (offline, no mutation). --check Verify all local archives match known SHA256 (offline). --fetch [DEPS...] Download pending or named dependencies. --update-hashes Replace PENDING_T12 in versions.cmake with computed hashes. --dry-run Show what fetch/update would do without executing. Legacy commands (preserved): --verify-fuzztest-baseline Verify the pinned FuzzTest archive SHA256. --verify-fuzztest-patch Apply FuzzTest patches and scan no-network contract. --print-metadata fuzztest Print pinned dependency metadata as JSON. """ from __future__ import annotations import argparse import hashlib import json import os import re import shutil import subprocess import sys import tarfile import tempfile import urllib.request from pathlib import Path from typing import cast ROOT = Path(__file__).resolve().parents[1] VERSIONS_CMAKE = ROOT / "cmake" / "deps" / "versions.cmake" ARCHIVES_DIR = ROOT / "3rd" / "archives" PENDING_TOKEN = "PENDING_T12" DOWNLOAD_TIMEOUT = 120 # seconds CHUNK_SIZE = 1024 * 1024 # 1 MiB FUZZTEST = { "name": "fuzztest", "version": "2026-02-19", "release": "2026-02-19", "commit": "b73724d4866c22d9b64c152a2d7ac22c7ca94168", "archive": "fuzztest-2026-02-19.tar.gz", "url": "https://github.com/google/fuzztest/releases/download/2026-02-19/fuzztest-2026-02-19.tar.gz", "sha256": "1c6e04065eb988e2c99613369db8294aa58429d392bf479740b237f1255204ef", "patch_dir": "3rd/patches/fuzztest/2026-02-19", "required_targets": [ "absl::flat_hash_map", "absl::random_bit_gen_ref", "absl::status", "absl::statusor", "absl::strings", "absl::string_view", "absl::span", "absl::flags", "absl::flags_parse", "re2::re2", "GTest::gtest", "GTest::gmock", "antlr4_static", ], "required_variables": ["ANTLR4_RUNTIME_INCLUDE_DIR"], "forbidden_targets": [ "protobuf::libprotobuf", "nlohmann_json::nlohmann_json", "flatbuffers", ], } FORBIDDEN_PATCHED_TOKENS = [ "FetchContent_Declare", "FetchContent_MakeAvailable", "FetchContent_Populate", "ExternalProject_Add", "CPMAddPackage", "GIT_REPOSITORY", "git clone", ] PATCHED_DEPENDENCY_FILES = [ "cmake/BuildDependencies.cmake", "CMakeLists.txt", "grammar_codegen/generated_antlr_parser/CMakeLists.txt", ] # --------------------------------------------------------------------------- # Inventory parsing from versions.cmake # --------------------------------------------------------------------------- def parse_inventory(cmake_path: Path = VERSIONS_CMAKE) -> list[dict[str, str]]: """Parse dependency entries from versions.cmake into a list of dicts.""" text = cmake_path.read_text(encoding="utf-8") entries: list[dict[str, str]] = [] # Find all set(CPP_TEMPLATE__VERSION ...) blocks. # Each dependency starts with a VERSION line; collect variables until next VERSION. version_pattern = re.compile( r'set\(CPP_TEMPLATE_(\w+)_VERSION\s+"([^"]+)"\)' ) var_pattern = re.compile( r'set\(CPP_TEMPLATE_(\w+)_(\w+)\s+(?:"([^"]*)"|(.+?))\s*\)', re.MULTILINE, ) # Split into blocks by finding dependency names from VERSION lines dep_names: list[str] = [] for match in version_pattern.finditer(text): name = match.group(1) dep_names.append(name) # For each dependency name, extract all its variables for name in dep_names: entry: dict[str, str] = {"cmake_name": name} for match in var_pattern.finditer(text): var_dep = match.group(1) var_key = match.group(2) var_val = match.group(3) if match.group(3) is not None else match.group(4).strip() if var_dep == name: entry[var_key] = var_val if "VERSION" in entry: entries.append(entry) return entries def inventory_to_dep(entry: dict[str, str]) -> dict[str, str]: """Normalize a cmake inventory entry into a dependency dict.""" return { "name": entry["cmake_name"].lower().replace("_", "-"), "cmake_name": entry["cmake_name"], "version": entry.get("VERSION", ""), "url": entry.get("URL", ""), "archive": entry.get("ARCHIVE", ""), "sha256": entry.get("SHA256", ""), "cxx_standard": entry.get("CXX_STANDARD", ""), "targets": entry.get("TARGETS", ""), "presets": entry.get("PRESETS", ""), } # --------------------------------------------------------------------------- # SHA256 helpers # --------------------------------------------------------------------------- def sha256_file(path: Path) -> str: digest = hashlib.sha256() with path.open("rb") as file: for chunk in iter(lambda: file.read(CHUNK_SIZE), b""): digest.update(chunk) return digest.hexdigest() def sha256_streaming(url: str, dest: Path) -> str: """Download url to dest using streaming, return SHA256 hex digest. Uses a temp file in the same directory for atomic replacement. """ tmp_fd, tmp_path = tempfile.mkstemp( dir=str(dest.parent), prefix=f".{dest.name}.tmp", ) tmp_file = Path(tmp_path) try: os.close(tmp_fd) digest = hashlib.sha256() response = urllib.request.urlopen(url, timeout=DOWNLOAD_TIMEOUT) with tmp_file.open("wb") as out: while True: chunk = response.read(CHUNK_SIZE) if not chunk: break out.write(chunk) digest.update(chunk) os.replace(str(tmp_file), str(dest)) return digest.hexdigest() except BaseException: # Clean up temp file on any failure if tmp_file.exists(): tmp_file.unlink() raise # --------------------------------------------------------------------------- # Download / verify # --------------------------------------------------------------------------- def download_dep( dep: dict[str, str], *, force: bool = False, dry_run: bool = False, ) -> str | None: """Download and verify one dependency archive. Returns computed SHA256 on success, None if skipped. Raises SystemExit on failure. """ archive_path = ARCHIVES_DIR / dep["archive"] expected_hash = dep["sha256"] is_pending = expected_hash == PENDING_TOKEN # If archive exists and is not pending, check hash if archive_path.exists(): actual_hash = sha256_file(archive_path) if actual_hash == expected_hash: print(f" ok {dep['archive']} (hash matches)") return None if not force and not is_pending: raise SystemExit(f" FAIL {dep['archive']}: hash mismatch\n expected: {expected_hash}\n actual: {actual_hash}\n use --force to re-download") if not force and is_pending: # Archive exists but hash is pending — compute and report print(f" ok {dep['archive']} (SHA256={actual_hash}, pending verification)") return actual_hash if dry_run: print(f" would download {dep['url']} -> {archive_path}") return None print(f" fetching {dep['url']}") ARCHIVES_DIR.mkdir(parents=True, exist_ok=True) actual_hash = sha256_streaming(dep["url"], archive_path) if not is_pending and actual_hash != expected_hash: # Hash mismatch after download — remove and fail archive_path.unlink() raise SystemExit(f" FAIL {dep['archive']}: download hash mismatch\n expected: {expected_hash}\n actual: {actual_hash}") status = "pending" if is_pending else "verified" print(f" ok {dep['archive']} (SHA256={actual_hash}, {status})") return actual_hash def verify_dep(dep: dict[str, str]) -> str: """Verify a single dependency archive. Returns status string.""" archive_path = ARCHIVES_DIR / dep["archive"] expected_hash = dep["sha256"] is_pending = expected_hash == PENDING_TOKEN if not archive_path.exists(): if is_pending: return "PENDING" return "MISSING" actual_hash = sha256_file(archive_path) if is_pending: return f"LOCAL:{actual_hash}" if actual_hash == expected_hash: return "OK" return f"CORRUPT:{actual_hash}" # --------------------------------------------------------------------------- # versions.cmake hash update # --------------------------------------------------------------------------- def update_versions_cmake( replacements: dict[str, str], cmake_path: Path = VERSIONS_CMAKE, *, dry_run: bool = False, ) -> int: """Replace PENDING_T12 hashes in versions.cmake. Returns count of replacements.""" text = cmake_path.read_text(encoding="utf-8") count = 0 for cmake_name, new_hash in replacements.items(): pattern = f'set(CPP_TEMPLATE_{cmake_name}_SHA256 "{PENDING_TOKEN}")' replacement = f'set(CPP_TEMPLATE_{cmake_name}_SHA256 "{new_hash}")' if pattern in text: if dry_run: print(f" would update CPP_TEMPLATE_{cmake_name}_SHA256 -> {new_hash}") else: text = text.replace(pattern, replacement) print(f" updated CPP_TEMPLATE_{cmake_name}_SHA256 = {new_hash}") count += 1 if not dry_run and count > 0: cmake_path.write_text(text, encoding="utf-8") return count # --------------------------------------------------------------------------- # Legacy FuzzTest functions (preserved from T5) # --------------------------------------------------------------------------- def archive_path() -> Path: return ARCHIVES_DIR / cast(str, FUZZTEST["archive"]) def patch_dir() -> Path: return ROOT / cast(str, FUZZTEST["patch_dir"]) def verify_fuzztest_baseline() -> None: path = archive_path() if not path.exists(): raise SystemExit( f"missing {path}; acquire the pristine archive from {FUZZTEST['url']} and verify SHA256" ) actual_sha256 = sha256_file(path) if actual_sha256 != FUZZTEST["sha256"]: raise SystemExit( f"sha256 mismatch for {path}: expected {FUZZTEST['sha256']}, got {actual_sha256}" ) print(f"verified {path} sha256={actual_sha256}") def extract_archive(destination: Path) -> Path: with tarfile.open(archive_path(), "r:gz") as archive: archive.extractall(destination, filter="data") roots = [path for path in destination.iterdir() if path.is_dir()] if len(roots) != 1: raise SystemExit(f"expected one FuzzTest archive root in {destination}, found {len(roots)}") return roots[0] def apply_patch(source_root: Path, patch_file: Path) -> None: patch_binary = shutil.which("patch") if patch_binary is None: raise SystemExit("patch executable is required for --verify-fuzztest-patch") result = subprocess.run( [patch_binary, "-p1", "-i", str(patch_file)], cwd=source_root, check=False, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) if result.returncode != 0: raise SystemExit(f"failed to apply {patch_file}:\n{result.stdout}") def scan_no_network_contract(source_root: Path) -> None: findings: list[str] = [] for relative in PATCHED_DEPENDENCY_FILES: path = source_root / relative if not path.exists(): raise SystemExit(f"patched verification file missing: {relative}") text = path.read_text(encoding="utf-8") for token in FORBIDDEN_PATCHED_TOKENS: if token in text: findings.append(f"{relative}: forbidden token {token}") if findings: raise SystemExit("patched FuzzTest no-network contract failed:\n" + "\n".join(findings)) def verify_fuzztest_patch() -> None: verify_fuzztest_baseline() patches = sorted(patch_dir().glob("*.patch")) if not patches: raise SystemExit(f"no patch files found in {patch_dir()}") with tempfile.TemporaryDirectory(prefix="fuzztest-patch-") as temporary: source_root = extract_archive(Path(temporary)) for patch_file in patches: apply_patch(source_root, patch_file) scan_no_network_contract(source_root) print(f"verified {len(patches)} FuzzTest patch file(s) from {patch_dir()}") def print_metadata(name: str) -> None: if name != "fuzztest": raise SystemExit(f"unknown dependency metadata: {name}") print(json.dumps(FUZZTEST, indent=2, sort_keys=True)) # --------------------------------------------------------------------------- # CLI mode implementations # --------------------------------------------------------------------------- def cmd_list(_args: argparse.Namespace) -> int: """List all dependencies and their archive status.""" entries = parse_inventory() if not entries: print("No dependencies found in versions.cmake") return 0 print(f"{'Dependency':<30} {'Version':<16} {'Archive':<32} {'Status'}") print("-" * 100) errors = 0 for entry in entries: dep = inventory_to_dep(entry) status = verify_dep(dep) if status.startswith("CORRUPT") or status == "MISSING": errors += 1 print(f"{dep['name']:<30} {dep['version']:<16} {dep['archive']:<32} {status}") print() concrete = sum( 1 for e in entries if e.get("SHA256", "") != PENDING_TOKEN ) pending = len(entries) - concrete print(f"Total: {len(entries)} dependencies, {concrete} with concrete SHA256, {pending} pending") return 1 if errors else 0 def cmd_check(_args: argparse.Namespace) -> int: """Verify all local archives against known SHA256 hashes. Offline.""" entries = parse_inventory() if not entries: print("No dependencies found in versions.cmake") return 0 errors = 0 pending_count = 0 ok_count = 0 for entry in entries: dep = inventory_to_dep(entry) status = verify_dep(dep) if status == "OK": ok_count += 1 elif status == "PENDING": pending_count += 1 elif status.startswith("LOCAL:"): pending_count += 1 elif status == "MISSING": print(f" MISSING {dep['archive']}: archive not found in {ARCHIVES_DIR}") errors += 1 elif status.startswith("CORRUPT:"): actual = status.split(":", 1)[1] print(f" CORRUPT {dep['archive']}") print(f" expected: {dep['sha256']}") print(f" actual: {actual}") errors += 1 print() print(f"Checked {len(entries)} dependencies: {ok_count} ok, {pending_count} pending, {errors} errors") if errors: print("FAIL: archive verification failed") return 1 print("PASS: all concrete hashes verified") return 0 def cmd_fetch(args: argparse.Namespace) -> int: """Download pending or selected dependencies.""" entries = parse_inventory() if not entries: print("No dependencies found in versions.cmake") return 0 # Filter by requested names if any if args.deps: name_map = {inventory_to_dep(e)["name"]: e for e in entries} cmake_name_map = {e["cmake_name"].lower(): e for e in entries} selected: list[dict[str, str]] = [] for name in args.deps: dep_name = name.lower().replace("_", "-") if dep_name in name_map: selected.append(name_map[dep_name]) elif name.upper() in {e["cmake_name"] for e in entries}: selected.append(cmake_name_map[name.upper()]) else: print(f" unknown dependency: {name}") return 1 entries = selected dry_run = args.dry_run force = getattr(args, "force", False) computed_hashes: dict[str, str] = {} for entry in entries: dep = inventory_to_dep(entry) print(f"[{dep['name']}]") result = download_dep(dep, force=force, dry_run=dry_run) if result is not None: computed_hashes[entry["cmake_name"]] = result if computed_hashes and not dry_run: print() print(f"Fetched {len(computed_hashes)} archives with new hashes.") print("Run 'fetch_deps.py --update-hashes' to commit hashes to versions.cmake") return 0 def cmd_update_hashes(args: argparse.Namespace) -> int: """Replace PENDING_T12 entries in versions.cmake with computed hashes.""" entries = parse_inventory() if not entries: print("No dependencies found in versions.cmake") return 0 dry_run = args.dry_run replacements: dict[str, str] = {} for entry in entries: dep = inventory_to_dep(entry) if entry.get("SHA256") != PENDING_TOKEN: continue archive_path = ARCHIVES_DIR / dep["archive"] if not archive_path.exists(): print(f" skip {dep['name']}: archive not found at {archive_path}") continue actual_hash = sha256_file(archive_path) replacements[entry["cmake_name"]] = actual_hash if not replacements: print("No PENDING_T12 entries with local archives to update.") return 0 count = update_versions_cmake(replacements, dry_run=dry_run) if dry_run: print(f"\nWould update {count} hashes in {VERSIONS_CMAKE}") else: print(f"\nUpdated {count} hashes in {VERSIONS_CMAKE}") return 0 # --------------------------------------------------------------------------- # CLI parsing # --------------------------------------------------------------------------- def parse_args(argv: list[str]) -> argparse.Namespace: parser = argparse.ArgumentParser( description="Pinned third-party dependency acquisition, verification, and metadata tool.", ) parser.add_argument( "--list", action="store_true", help="show dependency inventory status (offline, no mutation)", ) parser.add_argument( "--check", action="store_true", help="verify all local archives against known SHA256 (offline)", ) parser.add_argument( "--fetch", action="store_true", help="download pending or selected dependencies", ) parser.add_argument( "--update-hashes", action="store_true", help="replace PENDING_T12 in versions.cmake with computed hashes", ) parser.add_argument( "--verify-fuzztest-baseline", action="store_true", help="verify the pinned FuzzTest archive SHA256", ) parser.add_argument( "--verify-fuzztest-patch", action="store_true", help="apply FuzzTest patches and scan no-network contract", ) parser.add_argument( "--print-metadata", choices=["fuzztest"], help="print pinned dependency metadata as JSON", ) parser.add_argument( "deps", nargs="*", help="specific dependency names to operate on (for --fetch)", ) parser.add_argument( "--dry-run", action="store_true", help="show what would be done without executing (for --fetch, --update-hashes)", ) parser.add_argument( "--force", action="store_true", help="re-download even if archive exists (for --fetch)", ) return parser.parse_args(argv) def main(argv: list[str]) -> int: args = parse_args(argv) did_work = False rc = 0 # Legacy FuzzTest actions run first in deterministic order if args.verify_fuzztest_baseline: verify_fuzztest_baseline() did_work = True if args.verify_fuzztest_patch: verify_fuzztest_patch() did_work = True if args.print_metadata: print_metadata(args.print_metadata) did_work = True # Inventory mode actions if args.list: rc = cmd_list(args) did_work = True if args.check: rc = cmd_check(args) did_work = True if args.fetch: rc = cmd_fetch(args) did_work = True if args.update_hashes: rc = cmd_update_hashes(args) did_work = True if not did_work: raise SystemExit("no action requested; use --help") return rc if __name__ == "__main__": sys.exit(main(sys.argv[1:]))