#!/usr/bin/env python3 """Deterministic FuzzTest patch materializer. Extracts the pristine FuzzTest archive into a build-tree source directory, applies patch files, verifies the no-network contract, and writes a manifest for idempotent reuse. Designed to be called from CMake configure and from CLI. Usage: python3 scripts/patch_fuzztest.py --help python3 scripts/patch_fuzztest.py --materialize [--force] """ from __future__ import annotations import argparse import hashlib import json import shutil import subprocess import sys import tarfile from pathlib import Path from typing import TypedDict, cast ROOT = Path(__file__).resolve().parents[1] ARCHIVES_DIR = ROOT / "3rd" / "archives" MANIFEST_FILENAME = ".patch-manifest.json" MANIFEST_SCHEMA_VERSION = 1 CHUNK_SIZE = 1024 * 1024 # 1 MiB # Pinned FuzzTest metadata — must match cmake/deps/fuzztest.cmake and fetch_deps.py. FUZZTEST_VERSION = "2026-02-19" FUZZTEST_COMMIT = "b73724d4866c22d9b64c152a2d7ac22c7ca94168" FUZZTEST_ARCHIVE = "fuzztest-2026-02-19.tar.gz" FUZZTEST_SHA256 = "1c6e04065eb988e2c99613369db8294aa58429d392bf479740b237f1255204ef" FUZZTEST_PATCH_DIR = ROOT / "3rd" / "patches" / "fuzztest" / "2026-02-19" # Forbidden tokens in patched dependency files (no-network contract). FORBIDDEN_TOKENS = [ "FetchContent_Declare", "FetchContent_MakeAvailable", "FetchContent_Populate", "ExternalProject_Add", "CPMAddPackage", "GIT_REPOSITORY", "git clone", ] PATCHED_DEPENDENCY_FILES = [ "cmake/BuildDependencies.cmake", "CMakeLists.txt", "grammar_codegen/generated_antlr_parser/CMakeLists.txt", ] def sha256_file(path: Path) -> str: """Compute SHA256 hex digest of a file.""" digest = hashlib.sha256() with path.open("rb") as f: for chunk in iter(lambda: f.read(CHUNK_SIZE), b""): digest.update(chunk) return digest.hexdigest() def verify_archive(archive_path: Path, expected_sha256: str) -> None: """Verify archive exists and SHA256 matches expected value.""" if not archive_path.exists(): raise SystemExit( (f"archive not found: {archive_path}\n" f"Expected SHA256: {expected_sha256}") ) actual = sha256_file(archive_path) if actual != expected_sha256: raise SystemExit( (f"archive SHA256 mismatch: {archive_path}\n" f" expected: {expected_sha256}\n" f" actual: {actual}\n" f"The pristine archive is corrupt or has been modified.") ) def extract_archive(archive_path: Path, destination: Path) -> Path: """Extract tar.gz archive to destination, return the single root directory.""" with tarfile.open(archive_path, "r:gz") as tar: tar.extractall(destination, filter="data") roots = sorted(p for p in destination.iterdir() if p.is_dir()) if len(roots) != 1: raise SystemExit( (f"expected one archive root in {destination}, found {len(roots)}: " f"{[r.name for r in roots]}") ) return roots[0] def apply_patch(source_root: Path, patch_file: Path) -> None: """Apply a single patch file to source_root using the patch(1) utility.""" patch_bin = shutil.which("patch") if patch_bin is None: raise SystemExit("'patch' executable not found; required for patch application") result = subprocess.run( [patch_bin, "-p1", "-i", str(patch_file)], cwd=source_root, check=False, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) if result.returncode != 0: raise SystemExit( (f"failed to apply patch {patch_file.name}:\n" f" patch path: {patch_file}\n" f" return code: {result.returncode}\n" f" output:\n{result.stdout}\n" f"Hint: The patch may need rebasing onto a newer archive version.") ) def scan_no_network_contract(source_root: Path) -> None: """Verify patched source has no remote dependency acquisition commands.""" findings: list[str] = [] for relative in PATCHED_DEPENDENCY_FILES: path = source_root / relative if not path.exists(): raise SystemExit( (f"patched source file missing: {relative}\n" f" expected at: {path}") ) text = path.read_text(encoding="utf-8") for token in FORBIDDEN_TOKENS: if token in text: findings.append(f"{relative}: forbidden token '{token}'") if findings: raise SystemExit( ("no-network contract violation in patched source:\n" + "\n".join(f" {f}" for f in findings)) ) class PatchEntry(TypedDict): filename: str sha256: str class Manifest(TypedDict): schema_version: int generator: str fuzztest_version: str fuzztest_commit: str archive_path: str archive_sha256: str patches: list[PatchEntry] generated_path: str def build_manifest( *, archive_path: Path, archive_sha256: str, patch_dir: Path, output_dir: Path, ) -> Manifest: """Build manifest data structure.""" patches = sorted(patch_dir.glob("*.patch")) patch_entries: list[PatchEntry] = [] for p in patches: patch_entries.append({ "filename": p.name, "sha256": sha256_file(p), }) return { "schema_version": MANIFEST_SCHEMA_VERSION, "generator": "patch_fuzztest.py", "fuzztest_version": FUZZTEST_VERSION, "fuzztest_commit": FUZZTEST_COMMIT, "archive_path": str(archive_path), "archive_sha256": archive_sha256, "patches": patch_entries, "generated_path": str(output_dir.resolve()), } def manifest_matches( manifest: Manifest, *, archive_sha256: str, patch_dir: Path, ) -> bool: """Check if existing manifest matches current archive+patches+schema.""" if manifest.get("schema_version") != MANIFEST_SCHEMA_VERSION: return False if manifest.get("archive_sha256") != archive_sha256: return False existing_patches: dict[str, str] = { p["filename"]: p["sha256"] for p in manifest.get("patches", []) } current_patches = sorted(patch_dir.glob("*.patch")) if len(current_patches) != len(existing_patches): return False for p in current_patches: if p.name not in existing_patches: return False if existing_patches[p.name] != sha256_file(p): return False return True def materialize( output_dir: Path, *, archive_path: Path | None = None, patch_dir: Path | None = None, force: bool = False, ) -> int: """Materialize patched FuzzTest source tree. Returns 0 on success, non-zero on failure. """ if archive_path is None: archive_path = ARCHIVES_DIR / FUZZTEST_ARCHIVE if patch_dir is None: patch_dir = FUZZTEST_PATCH_DIR expected_sha256 = FUZZTEST_SHA256 # Step 1: Verify pristine archive integrity. print(f"verifying archive: {archive_path}") verify_archive(archive_path, expected_sha256) print(f" SHA256: {expected_sha256}") # Step 2: Check for idempotent reuse. manifest_path = output_dir / MANIFEST_FILENAME if output_dir.exists() and manifest_path.exists() and not force: try: existing = cast(Manifest, json.loads(manifest_path.read_text(encoding="utf-8"))) if manifest_matches( existing, archive_sha256=expected_sha256, patch_dir=patch_dir, ): print(f"reusing existing patched source: {output_dir}") print(f" manifest: {manifest_path}") return 0 else: print(f"manifest stale or mismatched: {manifest_path}", file=sys.stderr) print(" use --force to regenerate", file=sys.stderr) return 1 except (json.JSONDecodeError, KeyError) as exc: print(f"corrupt manifest: {manifest_path}: {exc}", file=sys.stderr) print(" use --force to regenerate", file=sys.stderr) return 1 # Step 3: Discover patches. patches = sorted(patch_dir.glob("*.patch")) if not patches: raise SystemExit(f"no patch files found in {patch_dir}") print(f"patches: {len(patches)} file(s) from {patch_dir}") # Step 4: Materialize via staging directory for atomic replacement. output_parent = output_dir.parent output_parent.mkdir(parents=True, exist_ok=True) staging_dir = output_parent / f"{output_dir.name}.staging.tmp" if staging_dir.exists(): shutil.rmtree(staging_dir) try: print(f"extracting archive to staging: {staging_dir}") source_root = extract_archive(archive_path, staging_dir) print(f" archive root: {source_root.name}") # Apply patches in sorted order. for patch_file in patches: print(f" applying {patch_file.name}") apply_patch(source_root, patch_file) print(f" all {len(patches)} patch(es) applied") # Verify no-network contract. scan_no_network_contract(source_root) print(" no-network contract: passed") # Write manifest inside the source root. manifest = build_manifest( archive_path=archive_path, archive_sha256=expected_sha256, patch_dir=patch_dir, output_dir=output_dir, ) manifest_path_in_staging = source_root / MANIFEST_FILENAME _ = manifest_path_in_staging.write_text( json.dumps(manifest, indent=2) + "\n", encoding="utf-8", ) print(f" manifest: {manifest_path_in_staging}") # Atomic replace: remove old output, rename staging root to output. if output_dir.exists(): shutil.rmtree(output_dir) _ = source_root.rename(output_dir) print(f"patched source materialized: {output_dir}") finally: # Clean up staging parent directory. if staging_dir.exists(): shutil.rmtree(staging_dir) return 0 class CliArgs: """Typed wrapper for parsed CLI arguments.""" materialize: Path | None force: bool archive: Path | None patch_dir: Path | None def __init__(self, namespace: argparse.Namespace) -> None: self.materialize = cast(Path | None, getattr(namespace, "materialize", None)) self.force = cast(bool, getattr(namespace, "force", False)) self.archive = cast(Path | None, getattr(namespace, "archive", None)) self.patch_dir = cast(Path | None, getattr(namespace, "patch_dir", None)) def parse_args(argv: list[str]) -> CliArgs: parser = argparse.ArgumentParser( description=("Deterministic FuzzTest patch materializer. " "Extracts pristine archive, applies patches, writes manifest."), ) _ = parser.add_argument( "--materialize", type=Path, metavar="DIR", help="Materialize patched FuzzTest source tree into DIR.", ) _ = parser.add_argument( "--force", action="store_true", help="Force regeneration even if manifest matches.", ) _ = parser.add_argument( "--archive", type=Path, metavar="PATH", help=f"Override archive path (default: 3rd/archives/{FUZZTEST_ARCHIVE}).", ) _ = parser.add_argument( "--patch-dir", type=Path, metavar="DIR", help=f"Override patch directory (default: {FUZZTEST_PATCH_DIR}).", ) return CliArgs(parser.parse_args(argv)) def main(argv: list[str]) -> int: args = parse_args(argv) if args.materialize is None: print("error: --materialize is required", file=sys.stderr) return 1 return materialize( args.materialize, archive_path=args.archive, patch_dir=args.patch_dir, force=args.force, ) if __name__ == "__main__": sys.exit(main(sys.argv[1:]))