Files

623 lines
20 KiB
Python
Raw Permalink Normal View History

2026-05-18 09:41:16 +08:00
#!/usr/bin/env python3
# pyright: reportUnusedCallResult=false, reportAny=false
"""Pinned third-party dependency acquisition, verification, and metadata tool.
Modes:
--list Show inventory status (offline, no mutation).
--check Verify all local archives match known SHA256 (offline).
--fetch [DEPS...] Download pending or named dependencies.
--update-hashes Replace PENDING_T12 in versions.cmake with computed hashes.
--dry-run Show what fetch/update would do without executing.
Legacy commands (preserved):
--verify-fuzztest-baseline Verify the pinned FuzzTest archive SHA256.
--verify-fuzztest-patch Apply FuzzTest patches and scan no-network contract.
--print-metadata fuzztest Print pinned dependency metadata as JSON.
"""
from __future__ import annotations
import argparse
import hashlib
import json
import os
import re
import shutil
import subprocess
import sys
import tarfile
import tempfile
import urllib.request
from pathlib import Path
from typing import cast
# Base directory for every path below: the parent of the directory that
# contains this script (presumably the repository root -- confirm layout).
ROOT = Path(__file__).resolve().parents[1]
# CMake file that holds the pinned dependency inventory parsed by this tool.
VERSIONS_CMAKE = ROOT / "cmake" / "deps" / "versions.cmake"
# Directory in which dependency archives are stored and looked up.
ARCHIVES_DIR = ROOT / "3rd" / "archives"
# Placeholder SHA256 value marking a dependency whose hash is not pinned yet.
PENDING_TOKEN = "PENDING_T12"
# Timeout passed to urllib when opening a download URL.
DOWNLOAD_TIMEOUT = 120  # seconds
# Read size used when hashing files and when streaming downloads.
CHUNK_SIZE = 1024 * 1024  # 1 MiB
# Pinned metadata for the FuzzTest dependency.  The legacy FuzzTest commands
# read "archive", "url", "sha256" and "patch_dir"; the whole dict is emitted
# as JSON by --print-metadata.
# NOTE(review): "required_targets", "required_variables" and
# "forbidden_targets" are not read by the code in this file beyond being
# printed -- presumably consumed by the build system; confirm.
FUZZTEST = {
    "name": "fuzztest",
    "version": "2026-02-19",
    "release": "2026-02-19",
    "commit": "b73724d4866c22d9b64c152a2d7ac22c7ca94168",
    "archive": "fuzztest-2026-02-19.tar.gz",
    "url": "https://github.com/google/fuzztest/releases/download/2026-02-19/fuzztest-2026-02-19.tar.gz",
    "sha256": "1c6e04065eb988e2c99613369db8294aa58429d392bf479740b237f1255204ef",
    "patch_dir": "3rd/patches/fuzztest/2026-02-19",
    "required_targets": [
        "absl::flat_hash_map",
        "absl::random_bit_gen_ref",
        "absl::status",
        "absl::statusor",
        "absl::strings",
        "absl::string_view",
        "absl::span",
        "absl::flags",
        "absl::flags_parse",
        "re2::re2",
        "GTest::gtest",
        "GTest::gmock",
        "antlr4_static",
    ],
    "required_variables": ["ANTLR4_RUNTIME_INCLUDE_DIR"],
    "forbidden_targets": [
        "protobuf::libprotobuf",
        "nlohmann_json::nlohmann_json",
        "flatbuffers",
    ],
}
# Substrings that must not appear in the patched FuzzTest build files; any hit
# fails the no-network contract scan.
FORBIDDEN_PATCHED_TOKENS = [
    "FetchContent_Declare",
    "FetchContent_MakeAvailable",
    "FetchContent_Populate",
    "ExternalProject_Add",
    "CPMAddPackage",
    "GIT_REPOSITORY",
    "git clone",
]
# Files, relative to the extracted and patched source root, that the
# no-network contract scan reads; each one must exist.
PATCHED_DEPENDENCY_FILES = [
    "cmake/BuildDependencies.cmake",
    "CMakeLists.txt",
    "grammar_codegen/generated_antlr_parser/CMakeLists.txt",
]
# ---------------------------------------------------------------------------
# Inventory parsing from versions.cmake
# ---------------------------------------------------------------------------
def parse_inventory(cmake_path: Path = VERSIONS_CMAKE) -> list[dict[str, str]]:
    """Parse dependency entries from versions.cmake into a list of dicts.

    A dependency is declared by a ``set(CPP_TEMPLATE_<NAME>_VERSION "...")``
    line.  Every ``set(CPP_TEMPLATE_<NAME>_<KEY> ...)`` variable in the file is
    then attached to the declared dependency whose name is the longest prefix
    of the variable, so multi-word keys (``CXX_STANDARD``) and dependency
    names that themselves contain underscores are both resolved correctly.

    Args:
        cmake_path: CMake file to read (decoded as UTF-8).

    Returns:
        One dict per dependency, in order of first declaration.  Each dict
        holds ``"cmake_name"`` plus one item per variable key (``"VERSION"``,
        ``"URL"``, ...).  Quoted values are returned without their quotes;
        unquoted values are stripped.  When a variable is set more than once
        the last occurrence wins.
    """
    text = cmake_path.read_text(encoding="utf-8")
    version_pattern = re.compile(
        r'set\(CPP_TEMPLATE_(\w+)_VERSION\s+"([^"]+)"\)'
    )
    # Capture the full variable suffix; it is split into <NAME>/<KEY> below
    # against the known dependency names instead of guessing with a regex.
    var_pattern = re.compile(
        r'set\(CPP_TEMPLATE_(\w+)\s+(?:"([^"]*)"|(.+?))\s*\)',
        re.MULTILINE,
    )

    # A dict keeps first-declaration order and drops repeated VERSION lines.
    entries: dict[str, dict[str, str]] = {}
    for match in version_pattern.finditer(text):
        entries.setdefault(match.group(1), {"cmake_name": match.group(1)})

    # Longest names first so FOO_BAR_URL goes to FOO_BAR rather than to FOO.
    names_longest_first = sorted(entries, key=len, reverse=True)

    for match in var_pattern.finditer(text):
        variable = match.group(1)
        quoted = match.group(2)
        value = quoted if quoted is not None else match.group(3).strip()
        for name in names_longest_first:
            prefix = name + "_"
            if variable.startswith(prefix) and len(variable) > len(prefix):
                entries[name][variable[len(prefix):]] = value
                break

    return [entry for entry in entries.values() if "VERSION" in entry]
def inventory_to_dep(entry: dict[str, str]) -> dict[str, str]:
    """Convert a raw inventory entry into the normalized dependency dict.

    The result carries a lowercase, hyphenated ``"name"`` derived from the
    CMake name, the original ``"cmake_name"``, and one lowercase-keyed item
    per known CMake variable; variables absent from ``entry`` become ``""``.
    """
    cmake_name = entry["cmake_name"]
    dep: dict[str, str] = {
        "name": cmake_name.lower().replace("_", "-"),
        "cmake_name": cmake_name,
    }
    cmake_keys = (
        "VERSION",
        "URL",
        "ARCHIVE",
        "SHA256",
        "CXX_STANDARD",
        "TARGETS",
        "PRESETS",
    )
    for cmake_key in cmake_keys:
        dep[cmake_key.lower()] = entry.get(cmake_key, "")
    return dep
# ---------------------------------------------------------------------------
# SHA256 helpers
# ---------------------------------------------------------------------------
def sha256_file(path: Path) -> str:
    """Return the SHA256 hex digest of the file at ``path``.

    The file is read in ``CHUNK_SIZE`` pieces so it is never held in memory
    as a whole.
    """
    hasher = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            block = stream.read(CHUNK_SIZE)
            if not block:
                break
            hasher.update(block)
    return hasher.hexdigest()
def sha256_streaming(url: str, dest: Path) -> str:
    """Download ``url`` to ``dest`` in chunks and return the SHA256 hex digest.

    The payload is written to a temporary file created in ``dest``'s
    directory and moved onto ``dest`` with ``os.replace`` only after the whole
    body has been read, so ``dest`` never holds a partial download.

    Args:
        url: Location to fetch; passed to ``urllib.request.urlopen``.
        dest: Final path of the downloaded file.  Its parent directory must
            already exist.

    Returns:
        Hex digest of the bytes written to ``dest``.

    Raises:
        Any exception raised while opening the URL, reading, writing, or
        replacing ``dest``.  The temporary file is removed before the
        exception propagates.
    """
    tmp_fd, tmp_path = tempfile.mkstemp(
        dir=str(dest.parent),
        prefix=f".{dest.name}.tmp",
    )
    tmp_file = Path(tmp_path)
    try:
        digest = hashlib.sha256()
        # os.fdopen takes ownership of the descriptor returned by mkstemp, and
        # the response is a context manager, so both are closed on every path.
        with os.fdopen(tmp_fd, "wb") as out:
            with urllib.request.urlopen(url, timeout=DOWNLOAD_TIMEOUT) as response:
                while chunk := response.read(CHUNK_SIZE):
                    out.write(chunk)
                    digest.update(chunk)
        os.replace(str(tmp_file), str(dest))
        return digest.hexdigest()
    except BaseException:
        # Also covers KeyboardInterrupt: never leave a stray temp file behind.
        tmp_file.unlink(missing_ok=True)
        raise
# ---------------------------------------------------------------------------
# Download / verify
# ---------------------------------------------------------------------------
def download_dep(
    dep: dict[str, str],
    *,
    force: bool = False,
    dry_run: bool = False,
) -> str | None:
    """Download and verify one dependency archive.

    Decision order:

    1. If the archive already exists and its hash equals the pinned hash,
       nothing is done.
    2. If it exists with a different hash and ``force`` is false: a pending
       entry just reports the local hash, a pinned entry is a failure.
    3. Otherwise the archive is (re-)downloaded, unless ``dry_run`` is set,
       in which case the intended download is only printed.

    Args:
        dep: Normalized dependency dict; reads ``"archive"``, ``"url"``,
            ``"sha256"`` and, for diagnostics, ``"name"``.
        force: Re-download even when a non-matching archive is present.
        dry_run: Print the download that would happen instead of doing it.

    Returns:
        The computed SHA256 when an archive was hashed as new or pending;
        ``None`` when the existing archive already matches or on a dry run.

    Raises:
        SystemExit: The entry has no archive name or URL, an existing archive
            does not match its pinned hash (without ``force``), or the
            downloaded archive does not match its pinned hash.
    """
    # An empty archive name would make the path below resolve to the archives
    # directory itself, which cannot be hashed or replaced.
    if not dep["archive"]:
        raise SystemExit(f" FAIL {dep.get('name', '?')}: inventory entry has no ARCHIVE name")

    target = ARCHIVES_DIR / dep["archive"]
    expected_hash = dep["sha256"]
    is_pending = expected_hash == PENDING_TOKEN

    if target.exists():
        actual_hash = sha256_file(target)
        if actual_hash == expected_hash:
            print(f" ok {dep['archive']} (hash matches)")
            return None
        if not force:
            if is_pending:
                # Archive exists but hash is pending: compute and report.
                print(f" ok {dep['archive']} (SHA256={actual_hash}, pending verification)")
                return actual_hash
            raise SystemExit(f" FAIL {dep['archive']}: hash mismatch\n expected: {expected_hash}\n actual: {actual_hash}\n use --force to re-download")

    if not dep["url"]:
        raise SystemExit(f" FAIL {dep['archive']}: inventory entry has no URL")

    if dry_run:
        print(f" would download {dep['url']} -> {target}")
        return None

    print(f" fetching {dep['url']}")
    ARCHIVES_DIR.mkdir(parents=True, exist_ok=True)
    actual_hash = sha256_streaming(dep["url"], target)
    if not is_pending and actual_hash != expected_hash:
        # Do not keep an archive that failed verification.
        target.unlink(missing_ok=True)
        raise SystemExit(f" FAIL {dep['archive']}: download hash mismatch\n expected: {expected_hash}\n actual: {actual_hash}")
    status = "pending" if is_pending else "verified"
    print(f" ok {dep['archive']} (SHA256={actual_hash}, {status})")
    return actual_hash
def verify_dep(dep: dict[str, str]) -> str:
    """Verify a single dependency archive offline and return a status string.

    Args:
        dep: Normalized dependency dict; reads ``"archive"`` and ``"sha256"``.

    Returns:
        ``"OK"``: the archive is present and matches the pinned hash.
        ``"PENDING"``: no archive file and the hash is still the pending token.
        ``"MISSING"``: no archive file although a hash is pinned.
        ``"LOCAL:<sha256>"``: the archive is present and the hash is pending.
        ``"CORRUPT:<sha256>"``: the archive is present but its hash differs.
    """
    local_archive = ARCHIVES_DIR / dep["archive"]
    expected_hash = dep["sha256"]
    is_pending = expected_hash == PENDING_TOKEN
    # is_file() rather than exists(): an entry without an archive name
    # resolves to the archives directory, which must count as absent instead
    # of being opened for hashing.
    if not local_archive.is_file():
        return "PENDING" if is_pending else "MISSING"
    actual_hash = sha256_file(local_archive)
    if is_pending:
        return f"LOCAL:{actual_hash}"
    if actual_hash == expected_hash:
        return "OK"
    return f"CORRUPT:{actual_hash}"
# ---------------------------------------------------------------------------
# versions.cmake hash update
# ---------------------------------------------------------------------------
def update_versions_cmake(
    replacements: dict[str, str],
    cmake_path: Path = VERSIONS_CMAKE,
    *,
    dry_run: bool = False,
) -> int:
    """Replace pending SHA256 placeholders in versions.cmake.

    For each CMake name in ``replacements`` the statement
    ``set(CPP_TEMPLATE_<NAME>_SHA256 "<PENDING_TOKEN>")`` is rewritten with
    the given hash.  Whitespace between the variable name and the value, and
    before the closing parenthesis, may vary and is preserved.

    Args:
        replacements: Maps CMake dependency name to its new SHA256 hex digest.
        cmake_path: File to update (read and written as UTF-8).
        dry_run: Print the updates that would happen without writing.

    Returns:
        Number of dependencies whose pending placeholder was found (and, when
        not a dry run, replaced).
    """
    text = cmake_path.read_text(encoding="utf-8")
    count = 0
    for cmake_name, new_hash in replacements.items():
        variable = f"CPP_TEMPLATE_{cmake_name}_SHA256"
        pattern = re.compile(
            r"(set\(" + re.escape(variable) + r'\s+")'
            + re.escape(PENDING_TOKEN)
            + r'("\s*\))'
        )
        if pattern.search(text) is None:
            continue
        if dry_run:
            print(f" would update CPP_TEMPLATE_{cmake_name}_SHA256 -> {new_hash}")
        else:
            # A callable replacement keeps the hash literal: no backslash or
            # group-reference processing is applied to it.
            text = pattern.sub(
                lambda found, digest=new_hash: found.group(1) + digest + found.group(2),
                text,
            )
            print(f" updated CPP_TEMPLATE_{cmake_name}_SHA256 = {new_hash}")
        count += 1
    if not dry_run and count > 0:
        cmake_path.write_text(text, encoding="utf-8")
    return count
# ---------------------------------------------------------------------------
# Legacy FuzzTest functions (preserved from T5)
# ---------------------------------------------------------------------------
def archive_path() -> Path:
    """Return the location of the pinned FuzzTest archive under ARCHIVES_DIR."""
    archive_name = cast(str, FUZZTEST["archive"])
    return ARCHIVES_DIR / archive_name
def patch_dir() -> Path:
    """Return the directory under ROOT that holds the FuzzTest patch files."""
    relative_dir = cast(str, FUZZTEST["patch_dir"])
    return ROOT / relative_dir
def verify_fuzztest_baseline() -> None:
    """Check that the local FuzzTest archive exists and matches its pinned hash.

    Prints a confirmation line on success.

    Raises:
        SystemExit: The archive is missing or its SHA256 differs from the
            pinned value.
    """
    path = archive_path()
    if not path.exists():
        raise SystemExit(
            f"missing {path}; acquire the pristine archive from {FUZZTEST['url']} and verify SHA256"
        )
    expected_sha256 = FUZZTEST["sha256"]
    actual_sha256 = sha256_file(path)
    if actual_sha256 == expected_sha256:
        print(f"verified {path} sha256={actual_sha256}")
        return
    raise SystemExit(
        f"sha256 mismatch for {path}: expected {expected_sha256}, got {actual_sha256}"
    )
def extract_archive(destination: Path) -> Path:
    """Unpack the FuzzTest archive into ``destination`` and return its root.

    Extraction uses tarfile's ``"data"`` filter.  After extraction
    ``destination`` must contain exactly one directory, which is returned.

    Raises:
        SystemExit: ``destination`` does not contain exactly one directory.
    """
    with tarfile.open(archive_path(), "r:gz") as tarball:
        tarball.extractall(destination, filter="data")
    top_level_dirs = list(filter(Path.is_dir, destination.iterdir()))
    if len(top_level_dirs) == 1:
        return top_level_dirs[0]
    raise SystemExit(f"expected one FuzzTest archive root in {destination}, found {len(top_level_dirs)}")
def apply_patch(source_root: Path, patch_file: Path) -> None:
    """Apply ``patch_file`` with ``patch -p1`` inside ``source_root``.

    Raises:
        SystemExit: No ``patch`` executable is on PATH, or ``patch`` exits
            with a non-zero status (its combined output is included).
    """
    patch_binary = shutil.which("patch")
    if patch_binary is None:
        raise SystemExit("patch executable is required for --verify-fuzztest-patch")
    command = [patch_binary, "-p1", "-i", str(patch_file)]
    # stderr is folded into stdout so one string carries the full diagnostics.
    completed = subprocess.run(
        command,
        cwd=source_root,
        check=False,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    if completed.returncode == 0:
        return
    raise SystemExit(f"failed to apply {patch_file}:\n{completed.stdout}")
def scan_no_network_contract(source_root: Path) -> None:
    """Fail if a patched build file still contains a forbidden token.

    Every path in PATCHED_DEPENDENCY_FILES is read relative to
    ``source_root`` and searched for each entry of FORBIDDEN_PATCHED_TOKENS.

    Raises:
        SystemExit: A listed file is missing (raised as soon as it is
            encountered), or at least one forbidden token was found (all
            findings are listed).
    """
    violations: list[str] = []
    for relative in PATCHED_DEPENDENCY_FILES:
        candidate = source_root / relative
        if not candidate.exists():
            raise SystemExit(f"patched verification file missing: {relative}")
        contents = candidate.read_text(encoding="utf-8")
        violations.extend(
            f"{relative}: forbidden token {token}"
            for token in FORBIDDEN_PATCHED_TOKENS
            if token in contents
        )
    if violations:
        raise SystemExit("patched FuzzTest no-network contract failed:\n" + "\n".join(violations))
def verify_fuzztest_patch() -> None:
    """Verify that the FuzzTest patches apply and satisfy the no-network scan.

    Steps: verify the baseline archive, extract it into a temporary
    directory, apply every ``*.patch`` file from the patch directory in
    sorted order, then scan the patched tree.

    Raises:
        SystemExit: No patch files exist, or any step above fails.
    """
    verify_fuzztest_baseline()
    patches_root = patch_dir()
    patches = sorted(patches_root.glob("*.patch"))
    if not patches:
        raise SystemExit(f"no patch files found in {patches_root}")
    with tempfile.TemporaryDirectory(prefix="fuzztest-patch-") as scratch:
        source_root = extract_archive(Path(scratch))
        for patch_file in patches:
            apply_patch(source_root, patch_file)
        # Must run while the temporary tree still exists.
        scan_no_network_contract(source_root)
    print(f"verified {len(patches)} FuzzTest patch file(s) from {patches_root}")
def print_metadata(name: str) -> None:
    """Print the pinned metadata for ``name`` as sorted, indented JSON.

    Raises:
        SystemExit: ``name`` is anything other than ``"fuzztest"``.
    """
    if name == "fuzztest":
        print(json.dumps(FUZZTEST, indent=2, sort_keys=True))
        return
    raise SystemExit(f"unknown dependency metadata: {name}")
# ---------------------------------------------------------------------------
# CLI mode implementations
# ---------------------------------------------------------------------------
def cmd_list(_args: argparse.Namespace) -> int:
    """Print a status table of every dependency in the inventory.

    Returns:
        1 if any archive is reported MISSING or CORRUPT, otherwise 0 (also
        when the inventory is empty).
    """
    inventory = parse_inventory()
    if not inventory:
        print("No dependencies found in versions.cmake")
        return 0

    print(f"{'Dependency':<30} {'Version':<16} {'Archive':<32} {'Status'}")
    print("-" * 100)
    failed = False
    for item in inventory:
        dep = inventory_to_dep(item)
        status = verify_dep(dep)
        if status == "MISSING" or status.startswith("CORRUPT"):
            failed = True
        print(f"{dep['name']:<30} {dep['version']:<16} {dep['archive']:<32} {status}")
    print()

    total = len(inventory)
    pending = sum(1 for item in inventory if item.get("SHA256", "") == PENDING_TOKEN)
    concrete = total - pending
    print(f"Total: {total} dependencies, {concrete} with concrete SHA256, {pending} pending")
    return 1 if failed else 0
def cmd_check(_args: argparse.Namespace) -> int:
    """Verify every local archive against its pinned SHA256 (offline).

    Missing and corrupt archives are reported individually; entries whose
    hash is still pending are counted but never treated as errors.

    Returns:
        1 if any archive is missing or corrupt, otherwise 0.
    """
    inventory = parse_inventory()
    if not inventory:
        print("No dependencies found in versions.cmake")
        return 0

    ok_count = 0
    pending_count = 0
    errors = 0
    for item in inventory:
        dep = inventory_to_dep(item)
        status = verify_dep(dep)
        if status == "OK":
            ok_count += 1
            continue
        if status == "PENDING" or status.startswith("LOCAL:"):
            pending_count += 1
            continue
        if status == "MISSING":
            print(f" MISSING {dep['archive']}: archive not found in {ARCHIVES_DIR}")
            errors += 1
            continue
        if status.startswith("CORRUPT:"):
            # The status carries the actual digest after the first colon.
            _, _, actual = status.partition(":")
            print(f" CORRUPT {dep['archive']}")
            print(f" expected: {dep['sha256']}")
            print(f" actual: {actual}")
            errors += 1

    print()
    print(f"Checked {len(inventory)} dependencies: {ok_count} ok, {pending_count} pending, {errors} errors")
    if not errors:
        print("PASS: all concrete hashes verified")
        return 0
    print("FAIL: archive verification failed")
    return 1
def cmd_fetch(args: argparse.Namespace) -> int:
    """Download pending or selected dependencies.

    With no names in ``args.deps`` every inventory entry is processed.
    Names are matched case-insensitively, with ``_`` and ``-`` treated as
    equivalent, so both ``google-test`` and ``GOOGLE_TEST`` select the same
    entry.  A name given more than once is processed only once.

    Args:
        args: Parsed CLI namespace; reads ``deps``, ``dry_run`` and, if
            present, ``force``.

    Returns:
        1 if a requested name is not in the inventory, otherwise 0.

    Raises:
        SystemExit: Propagated from ``download_dep`` on a failed download or
            verification.
    """
    entries = parse_inventory()
    if not entries:
        print("No dependencies found in versions.cmake")
        return 0

    # Filter by requested names if any.
    if args.deps:
        # The normalized name is derived from the CMake name, so this single
        # map covers every spelling of a dependency.
        by_name = {inventory_to_dep(e)["name"]: e for e in entries}
        selected: list[dict[str, str]] = []
        seen: set[str] = set()
        for requested in args.deps:
            key = requested.lower().replace("_", "-")
            if key not in by_name:
                print(f" unknown dependency: {requested}")
                return 1
            if key not in seen:
                seen.add(key)
                selected.append(by_name[key])
        entries = selected

    dry_run = args.dry_run
    force = getattr(args, "force", False)
    computed_hashes: dict[str, str] = {}
    for entry in entries:
        dep = inventory_to_dep(entry)
        print(f"[{dep['name']}]")
        result = download_dep(dep, force=force, dry_run=dry_run)
        if result is not None:
            computed_hashes[entry["cmake_name"]] = result

    if computed_hashes and not dry_run:
        print()
        print(f"Fetched {len(computed_hashes)} archives with new hashes.")
        print("Run 'fetch_deps.py --update-hashes' to commit hashes to versions.cmake")
    return 0
def cmd_update_hashes(args: argparse.Namespace) -> int:
    """Write hashes of local archives over pending entries in versions.cmake.

    Only entries whose SHA256 is the pending token are considered; those
    without a local archive are skipped with a message.  With
    ``args.dry_run`` the file is left untouched.

    Returns:
        Always 0.
    """
    inventory = parse_inventory()
    if not inventory:
        print("No dependencies found in versions.cmake")
        return 0

    dry_run = args.dry_run
    replacements: dict[str, str] = {}
    for item in inventory:
        if item.get("SHA256") != PENDING_TOKEN:
            continue
        dep = inventory_to_dep(item)
        local_archive = ARCHIVES_DIR / dep["archive"]
        if local_archive.exists():
            replacements[item["cmake_name"]] = sha256_file(local_archive)
        else:
            print(f" skip {dep['name']}: archive not found at {local_archive}")

    if not replacements:
        print("No PENDING_T12 entries with local archives to update.")
        return 0

    count = update_versions_cmake(replacements, dry_run=dry_run)
    verb = "Would update" if dry_run else "Updated"
    print(f"\n{verb} {count} hashes in {VERSIONS_CMAKE}")
    return 0
# ---------------------------------------------------------------------------
# CLI parsing
# ---------------------------------------------------------------------------
def parse_args(argv: list[str]) -> argparse.Namespace:
    """Build the command-line parser and parse ``argv``.

    All options except ``--print-metadata`` are boolean flags; ``deps`` is an
    optional list of positional dependency names.  Arguments are registered
    in a fixed order so the generated help text is stable.
    """
    parser = argparse.ArgumentParser(
        description="Pinned third-party dependency acquisition, verification, and metadata tool.",
    )

    def add_flags(flags: list[tuple[str, str]]) -> None:
        # Register each (option, help) pair as a store_true switch.
        for option, help_text in flags:
            parser.add_argument(option, action="store_true", help=help_text)

    add_flags([
        ("--list", "show dependency inventory status (offline, no mutation)"),
        ("--check", "verify all local archives against known SHA256 (offline)"),
        ("--fetch", "download pending or selected dependencies"),
        ("--update-hashes", "replace PENDING_T12 in versions.cmake with computed hashes"),
        ("--verify-fuzztest-baseline", "verify the pinned FuzzTest archive SHA256"),
        ("--verify-fuzztest-patch", "apply FuzzTest patches and scan no-network contract"),
    ])
    parser.add_argument(
        "--print-metadata",
        choices=["fuzztest"],
        help="print pinned dependency metadata as JSON",
    )
    parser.add_argument(
        "deps",
        nargs="*",
        help="specific dependency names to operate on (for --fetch)",
    )
    add_flags([
        ("--dry-run", "show what would be done without executing (for --fetch, --update-hashes)"),
        ("--force", "re-download even if archive exists (for --fetch)"),
    ])
    return parser.parse_args(argv)
def main(argv: list[str]) -> int:
    """Run every action requested on the command line and return an exit code.

    Legacy FuzzTest actions run first, in a fixed order, and abort via
    ``SystemExit`` on failure.  The inventory commands then run in the order
    list, check, fetch, update-hashes.

    Args:
        argv: Command-line arguments without the program name.

    Returns:
        The highest return code produced by the inventory commands that ran,
        so a failure in one command is not hidden by a later success.

    Raises:
        SystemExit: No action was requested, or an action failed fatally.
    """
    args = parse_args(argv)
    did_work = False
    rc = 0

    # Legacy FuzzTest actions run first in deterministic order.
    if args.verify_fuzztest_baseline:
        verify_fuzztest_baseline()
        did_work = True
    if args.verify_fuzztest_patch:
        verify_fuzztest_patch()
        did_work = True
    if args.print_metadata:
        print_metadata(args.print_metadata)
        did_work = True

    # Inventory mode actions.
    inventory_commands = (
        (args.list, cmd_list),
        (args.check, cmd_check),
        (args.fetch, cmd_fetch),
        (args.update_hashes, cmd_update_hashes),
    )
    for requested, command in inventory_commands:
        if not requested:
            continue
        # Keep the worst result instead of overwriting it with the latest.
        rc = max(rc, command(args))
        did_work = True

    if not did_work:
        raise SystemExit("no action requested; use --help")
    return rc
# Script entry point: pass the arguments after the program name to main() and
# use its return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))