Files
cpp-project-template/scripts/patch_fuzztest.py
T

380 lines
12 KiB
Python
Raw Normal View History

2026-05-18 09:41:16 +08:00
#!/usr/bin/env python3
"""Deterministic FuzzTest patch materializer.
Extracts the pristine FuzzTest archive into a build-tree source directory,
applies patch files, verifies the no-network contract, and writes a manifest
for idempotent reuse. Designed to be called from CMake configure and from CLI.
Usage:
    python3 scripts/patch_fuzztest.py --help
    python3 scripts/patch_fuzztest.py --materialize <dir> [--force]
        [--archive <path>] [--patch-dir <dir>]
"""
from __future__ import annotations
import argparse
import hashlib
import json
import shutil
import subprocess
import sys
import tarfile
from pathlib import Path
from typing import TypedDict, cast
# Two levels above this file (parents[1] of the resolved script path); the
# archive and patch locations below are built relative to it.
ROOT = Path(__file__).resolve().parents[1]
# Directory holding the pristine third-party archives.
ARCHIVES_DIR = ROOT / "3rd" / "archives"
# Name of the manifest file written into the materialized source tree.
MANIFEST_FILENAME = ".patch-manifest.json"
# Recorded in every manifest; manifest_matches() rejects a manifest whose
# schema_version differs from this value.
MANIFEST_SCHEMA_VERSION = 1
CHUNK_SIZE = 1024 * 1024  # 1 MiB; read size used by sha256_file()
# Pinned FuzzTest metadata — must match cmake/deps/fuzztest.cmake and fetch_deps.py.
FUZZTEST_VERSION = "2026-02-19"
FUZZTEST_COMMIT = "b73724d4866c22d9b64c152a2d7ac22c7ca94168"
FUZZTEST_ARCHIVE = "fuzztest-2026-02-19.tar.gz"
# Expected digest of the archive; materialize() verifies it before extracting.
FUZZTEST_SHA256 = "1c6e04065eb988e2c99613369db8294aa58429d392bf479740b237f1255204ef"
# Default directory searched for *.patch files (applied in sorted order).
FUZZTEST_PATCH_DIR = ROOT / "3rd" / "patches" / "fuzztest" / "2026-02-19"
# Forbidden tokens in patched dependency files (no-network contract).
# Matching is a plain substring test, see scan_no_network_contract().
FORBIDDEN_TOKENS = [
    "FetchContent_Declare",
    "FetchContent_MakeAvailable",
    "FetchContent_Populate",
    "ExternalProject_Add",
    "CPMAddPackage",
    "GIT_REPOSITORY",
    "git clone",
]
# Paths, relative to the extracted source root, that are scanned for the
# forbidden tokens. Each of them must exist after patching.
PATCHED_DEPENDENCY_FILES = [
    "cmake/BuildDependencies.cmake",
    "CMakeLists.txt",
    "grammar_codegen/generated_antlr_parser/CMakeLists.txt",
]
def sha256_file(path: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is opened in binary mode and consumed in ``CHUNK_SIZE`` pieces,
    so the whole file is never held in memory at once.
    """
    hasher = hashlib.sha256()
    with path.open("rb") as stream:
        # An empty read marks end-of-file and ends the loop.
        while block := stream.read(CHUNK_SIZE):
            hasher.update(block)
    return hasher.hexdigest()
def verify_archive(archive_path: Path, expected_sha256: str) -> None:
    """Ensure the archive is present and has the expected SHA-256 digest.

    Args:
        archive_path: Location of the archive to check.
        expected_sha256: Hex digest the archive must hash to.

    Raises:
        SystemExit: If the archive does not exist or its digest differs.
    """
    if not archive_path.exists():
        missing_msg = (
            f"archive not found: {archive_path}\n"
            f"Expected SHA256: {expected_sha256}"
        )
        raise SystemExit(missing_msg)

    computed = sha256_file(archive_path)
    if computed == expected_sha256:
        return

    mismatch_lines = [
        f"archive SHA256 mismatch: {archive_path}",
        f" expected: {expected_sha256}",
        f" actual: {computed}",
        "The pristine archive is corrupt or has been modified.",
    ]
    raise SystemExit("\n".join(mismatch_lines))
def extract_archive(archive_path: Path, destination: Path) -> Path:
"""Extract tar.gz archive to destination, return the single root directory."""
with tarfile.open(archive_path, "r:gz") as tar:
tar.extractall(destination, filter="data")
roots = sorted(p for p in destination.iterdir() if p.is_dir())
if len(roots) != 1:
raise SystemExit(
(f"expected one archive root in {destination}, found {len(roots)}: "
f"{[r.name for r in roots]}")
)
return roots[0]
def apply_patch(source_root: Path, patch_file: Path) -> None:
    """Apply one patch file to *source_root* with the ``patch`` utility.

    The patch is applied with ``-p1`` from inside *source_root*.

    Args:
        source_root: Root of the extracted source tree to modify.
        patch_file: Patch to apply. It may be relative to the current working
            directory of this process.

    Raises:
        SystemExit: If ``patch`` is not on PATH or exits with a non-zero
            status (its combined output is included in the message).
    """
    patch_bin = shutil.which("patch")
    if patch_bin is None:
        raise SystemExit("'patch' executable not found; required for patch application")

    # patch runs with cwd=source_root, so a relative patch path would be
    # looked up inside the source tree instead of where the caller meant.
    # Resolve it against the current directory before changing cwd.
    absolute_patch = patch_file.resolve()

    result = subprocess.run(
        [patch_bin, "-p1", "-i", str(absolute_patch)],
        cwd=source_root,
        check=False,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # fold stderr into stdout for one report
    )
    if result.returncode != 0:
        raise SystemExit(
            f"failed to apply patch {patch_file.name}:\n"
            f" patch path: {patch_file}\n"
            f" return code: {result.returncode}\n"
            f" output:\n{result.stdout}\n"
            f"Hint: The patch may need rebasing onto a newer archive version."
        )
def scan_no_network_contract(source_root: Path) -> None:
    """Check the patched tree for remote dependency acquisition commands.

    Every path in ``PATCHED_DEPENDENCY_FILES`` is read as UTF-8 and searched
    for each entry of ``FORBIDDEN_TOKENS`` using a substring test.

    Raises:
        SystemExit: As soon as one of the listed files is missing, or after
            all files were scanned if any forbidden token was found.
    """
    violations: list[str] = []
    for rel_name in PATCHED_DEPENDENCY_FILES:
        candidate = source_root / rel_name
        if not candidate.exists():
            raise SystemExit(
                f"patched source file missing: {rel_name}\n"
                f" expected at: {candidate}"
            )
        contents = candidate.read_text(encoding="utf-8")
        violations.extend(
            f"{rel_name}: forbidden token '{tok}'"
            for tok in FORBIDDEN_TOKENS
            if tok in contents
        )

    if not violations:
        return

    report = "\n".join(f" {line}" for line in violations)
    raise SystemExit("no-network contract violation in patched source:\n" + report)
class PatchEntry(TypedDict):
    """Manifest record describing one applied patch file."""

    # File name of the patch (no directory component).
    filename: str
    # Hex SHA-256 digest of the patch file contents.
    sha256: str
class Manifest(TypedDict):
    """Shape of the JSON manifest stored next to the patched sources."""

    # Layout version of the manifest itself.
    schema_version: int
    # Name of the script that produced the manifest.
    generator: str
    # Pinned FuzzTest version and upstream commit.
    fuzztest_version: str
    fuzztest_commit: str
    # Archive the tree was extracted from, and its hex SHA-256 digest.
    archive_path: str
    archive_sha256: str
    # One record per applied patch.
    patches: list[PatchEntry]
    # Resolved path of the output directory the tree was generated for.
    generated_path: str
def build_manifest(
    *,
    archive_path: Path,
    archive_sha256: str,
    patch_dir: Path,
    output_dir: Path,
) -> Manifest:
    """Assemble the manifest describing a materialized source tree.

    Args:
        archive_path: Archive the sources were extracted from (stored as given).
        archive_sha256: Hex digest recorded for that archive.
        patch_dir: Directory whose ``*.patch`` files are listed, in sorted
            order, together with their SHA-256 digests.
        output_dir: Final output directory; stored in resolved form.

    Returns:
        The manifest as a plain dictionary ready for JSON serialization.
    """
    patch_entries: list[PatchEntry] = [
        {"filename": patch.name, "sha256": sha256_file(patch)}
        for patch in sorted(patch_dir.glob("*.patch"))
    ]
    manifest: Manifest = {
        "schema_version": MANIFEST_SCHEMA_VERSION,
        "generator": "patch_fuzztest.py",
        "fuzztest_version": FUZZTEST_VERSION,
        "fuzztest_commit": FUZZTEST_COMMIT,
        "archive_path": str(archive_path),
        "archive_sha256": archive_sha256,
        "patches": patch_entries,
        "generated_path": str(output_dir.resolve()),
    }
    return manifest
def manifest_matches(
    manifest: Manifest,
    *,
    archive_sha256: str,
    patch_dir: Path,
) -> bool:
    """Report whether a stored manifest still describes the current inputs.

    The manifest matches when its schema version equals
    ``MANIFEST_SCHEMA_VERSION``, its archive digest equals *archive_sha256*,
    and its patch list names exactly the ``*.patch`` files currently in
    *patch_dir* with identical SHA-256 digests.

    The manifest normally comes straight from ``json.loads``, so its shape is
    not guaranteed. A manifest that is not a JSON object, a ``patches`` value
    that is not a list, or a patch record that is not an object with a string
    file name is treated as "does not match" instead of raising
    ``AttributeError``/``TypeError``.

    Raises:
        KeyError: If a patch record lacks ``filename`` or ``sha256``.
    """
    if not isinstance(manifest, dict):
        return False
    if manifest.get("schema_version") != MANIFEST_SCHEMA_VERSION:
        return False
    if manifest.get("archive_sha256") != archive_sha256:
        return False

    recorded_entries = manifest.get("patches", [])
    if not isinstance(recorded_entries, list):
        return False

    recorded: dict[str, str] = {}
    for entry in recorded_entries:
        if not isinstance(entry, dict):
            return False
        name = entry["filename"]
        if not isinstance(name, str):
            return False
        recorded[name] = entry["sha256"]

    current_patches = sorted(patch_dir.glob("*.patch"))
    # Equal counts plus "every current patch is recorded" implies the two
    # sets of names are identical.
    if len(current_patches) != len(recorded):
        return False
    return all(
        patch.name in recorded and recorded[patch.name] == sha256_file(patch)
        for patch in current_patches
    )
def materialize(
    output_dir: Path,
    *,
    archive_path: Path | None = None,
    patch_dir: Path | None = None,
    force: bool = False,
) -> int:
    """Materialize the patched FuzzTest source tree into *output_dir*.

    Steps: verify the archive digest, reuse an existing tree when its manifest
    still matches, otherwise extract into a staging directory, apply all
    patches in sorted order, scan for forbidden tokens, write the manifest and
    move the result into place.

    Args:
        output_dir: Directory that will hold the patched sources.
        archive_path: Archive to extract; defaults to the pinned archive under
            ``ARCHIVES_DIR``.
        patch_dir: Directory with ``*.patch`` files; defaults to
            ``FUZZTEST_PATCH_DIR``.
        force: Regenerate even when a manifest already exists.

    Returns:
        0 on success or reuse; 1 when an existing manifest is stale or
        unreadable and *force* is not set.

    Raises:
        SystemExit: From the helper steps (bad archive, no patches, patch
            failure, contract violation).
    """
    if archive_path is None:
        archive_path = ARCHIVES_DIR / FUZZTEST_ARCHIVE
    if patch_dir is None:
        patch_dir = FUZZTEST_PATCH_DIR
    expected_sha256 = FUZZTEST_SHA256

    # Step 1: Verify pristine archive integrity.
    print(f"verifying archive: {archive_path}")
    verify_archive(archive_path, expected_sha256)
    print(f" SHA256: {expected_sha256}")

    # Step 2: Check for idempotent reuse.
    manifest_path = output_dir / MANIFEST_FILENAME
    if output_dir.exists() and manifest_path.exists() and not force:
        try:
            existing = cast(Manifest, json.loads(manifest_path.read_text(encoding="utf-8")))
            up_to_date = manifest_matches(
                existing,
                archive_sha256=expected_sha256,
                patch_dir=patch_dir,
            )
        except (ValueError, KeyError, TypeError, AttributeError) as exc:
            # ValueError covers invalid JSON and undecodable bytes; the others
            # cover JSON that parses but does not have the manifest's shape.
            print(f"corrupt manifest: {manifest_path}: {exc}", file=sys.stderr)
            print(" use --force to regenerate", file=sys.stderr)
            return 1
        if up_to_date:
            print(f"reusing existing patched source: {output_dir}")
            print(f" manifest: {manifest_path}")
            return 0
        print(f"manifest stale or mismatched: {manifest_path}", file=sys.stderr)
        print(" use --force to regenerate", file=sys.stderr)
        return 1

    # Step 3: Discover patches.
    patches = sorted(patch_dir.glob("*.patch"))
    if not patches:
        raise SystemExit(f"no patch files found in {patch_dir}")
    print(f"patches: {len(patches)} file(s) from {patch_dir}")

    # Step 4: Build the tree in a sibling staging directory so a failure in
    # any step below leaves the current output untouched.
    output_parent = output_dir.parent
    output_parent.mkdir(parents=True, exist_ok=True)
    staging_dir = output_parent / f"{output_dir.name}.staging.tmp"
    if staging_dir.exists():
        # Leftover from an interrupted earlier run.
        shutil.rmtree(staging_dir)
    try:
        print(f"extracting archive to staging: {staging_dir}")
        source_root = extract_archive(archive_path, staging_dir)
        print(f" archive root: {source_root.name}")

        # Apply patches in sorted order.
        for patch_file in patches:
            print(f" applying {patch_file.name}")
            apply_patch(source_root, patch_file)
        print(f" all {len(patches)} patch(es) applied")

        # Verify no-network contract.
        scan_no_network_contract(source_root)
        print(" no-network contract: passed")

        # Write manifest inside the source root so it travels with the tree.
        manifest = build_manifest(
            archive_path=archive_path,
            archive_sha256=expected_sha256,
            patch_dir=patch_dir,
            output_dir=output_dir,
        )
        manifest_path_in_staging = source_root / MANIFEST_FILENAME
        _ = manifest_path_in_staging.write_text(
            json.dumps(manifest, indent=2) + "\n",
            encoding="utf-8",
        )
        print(f" manifest: {manifest_path_in_staging}")

        # Swap the new tree into place. The previous output is moved aside
        # rather than deleted first, so a failed rename cannot lose the last
        # good tree: it is put back and the error is re-raised.
        backup_dir = output_parent / f"{output_dir.name}.previous.tmp"
        if backup_dir.exists():
            shutil.rmtree(backup_dir)
        had_previous = output_dir.exists()
        if had_previous:
            _ = output_dir.rename(backup_dir)
        try:
            _ = source_root.rename(output_dir)
        except OSError:
            if had_previous:
                _ = backup_dir.rename(output_dir)
            raise
        if had_previous:
            shutil.rmtree(backup_dir)
        print(f"patched source materialized: {output_dir}")
    finally:
        # Clean up staging parent directory (also on failure).
        if staging_dir.exists():
            shutil.rmtree(staging_dir)
    return 0
class CliArgs:
"""Typed wrapper for parsed CLI arguments."""
materialize: Path | None
force: bool
archive: Path | None
patch_dir: Path | None
def __init__(self, namespace: argparse.Namespace) -> None:
self.materialize = cast(Path | None, getattr(namespace, "materialize", None))
self.force = cast(bool, getattr(namespace, "force", False))
self.archive = cast(Path | None, getattr(namespace, "archive", None))
self.patch_dir = cast(Path | None, getattr(namespace, "patch_dir", None))
def parse_args(argv: list[str]) -> CliArgs:
    """Parse command-line arguments into a typed ``CliArgs`` object.

    Args:
        argv: Argument strings without the program name.
    """
    description = (
        "Deterministic FuzzTest patch materializer. "
        "Extracts pristine archive, applies patches, writes manifest."
    )
    parser = argparse.ArgumentParser(description=description)

    # Not marked required here; main() reports the error when it is absent.
    _ = parser.add_argument(
        "--materialize",
        metavar="DIR",
        type=Path,
        help="Materialize patched FuzzTest source tree into DIR.",
    )
    _ = parser.add_argument(
        "--force",
        action="store_true",
        help="Force regeneration even if manifest matches.",
    )
    # Optional overrides of the pinned archive and patch locations.
    _ = parser.add_argument(
        "--archive",
        metavar="PATH",
        type=Path,
        help=f"Override archive path (default: 3rd/archives/{FUZZTEST_ARCHIVE}).",
    )
    _ = parser.add_argument(
        "--patch-dir",
        metavar="DIR",
        type=Path,
        help=f"Override patch directory (default: {FUZZTEST_PATCH_DIR}).",
    )

    namespace = parser.parse_args(argv)
    return CliArgs(namespace)
def main(argv: list[str]) -> int:
    """Run the command-line interface and return the process exit code.

    Args:
        argv: Argument strings without the program name.

    Returns:
        1 when ``--materialize`` was not given, otherwise the result of
        ``materialize``.
    """
    cli = parse_args(argv)
    target = cli.materialize
    if target is not None:
        return materialize(
            target,
            archive_path=cli.archive,
            patch_dir=cli.patch_dir,
            force=cli.force,
        )
    print("error: --materialize is required", file=sys.stderr)
    return 1
# Script entry point: hand the CLI arguments (without the program name) to
# main() and use its return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))