"""Helpers for no-network CMake verification. The helpers deliberately avoid firewall or privilege assumptions. They make implicit network access fail by routing common proxy-aware clients to an unroutable local proxy while using clean build/cache directories so local committed inputs must be sufficient. """ from __future__ import annotations import dataclasses import hashlib import os import pathlib import shutil import subprocess from collections.abc import Iterable, Mapping, Sequence INVALID_PROXY_URL = "http://127.0.0.1:9" NO_PROXY_VALUE = "localhost,127.0.0.1,::1" class NetworkGuardError(RuntimeError): """Base error raised by no-network verification helpers.""" class LocalInputError(NetworkGuardError): """Raised when a required local archive/source input is absent or invalid.""" @dataclasses.dataclass(frozen=True) class CommandResult: """Completed command with captured output.""" command: Sequence[str] cwd: pathlib.Path returncode: int stdout: str stderr: str @dataclasses.dataclass(frozen=True) class ArchiveSpec: """Required local archive with optional sha256 validation.""" name: str path: pathlib.Path sha256: str | None = None @dataclasses.dataclass(frozen=True) class CMakeNoNetworkConfig: """Inputs for an isolated CMake no-network run.""" source_dir: pathlib.Path build_dir: pathlib.Path cpm_cache_dir: pathlib.Path build_type: str = "Debug" extra_cmake_args: Sequence[str] = dataclasses.field(default_factory=tuple) def invalid_proxy_environment(base: Mapping[str, str] | None = None) -> dict[str, str]: """Return an environment that makes proxy-aware network access fail locally.""" env = dict(os.environ if base is None else base) for key in ("HTTP_PROXY", "HTTPS_PROXY", "ALL_PROXY", "http_proxy", "https_proxy", "all_proxy"): env[key] = INVALID_PROXY_URL env["NO_PROXY"] = NO_PROXY_VALUE env["no_proxy"] = NO_PROXY_VALUE return env def reset_directory(path: pathlib.Path) -> None: """Remove and recreate a directory for clean, isolated verification state.""" if path.exists(): if not path.is_dir(): raise NetworkGuardError(f"Refusing to replace non-directory path: {path}") shutil.rmtree(path) path.mkdir(parents=True, exist_ok=True) def prepare_cmake_run(config: CMakeNoNetworkConfig) -> None: """Create clean build and CPM cache directories for an isolated run.""" reset_directory(config.build_dir) reset_directory(config.cpm_cache_dir) def format_command(command: Sequence[str]) -> str: """Format a command for evidence logs without shell interpretation.""" return " ".join(_quote_arg(part) for part in command) def cmake_configure_command(config: CMakeNoNetworkConfig) -> list[str]: """Build the CMake configure command for a no-network run.""" command = [ "cmake", "-S", str(config.source_dir), "-B", str(config.build_dir), f"-DCMAKE_BUILD_TYPE={config.build_type}", f"-DCPM_SOURCE_CACHE={config.cpm_cache_dir}", ] command.extend(config.extra_cmake_args) return command def cmake_build_command(config: CMakeNoNetworkConfig, targets: Sequence[str] = ()) -> list[str]: """Build the CMake build command for a no-network run.""" command = ["cmake", "--build", str(config.build_dir)] for target in targets: command.extend(["--target", target]) return command def run_logged_command( command: Sequence[str], cwd: pathlib.Path, env: Mapping[str, str], log_path: pathlib.Path | None = None, ) -> CommandResult: """Run a command, capture output, and optionally write an evidence log.""" completed = subprocess.run( list(command), cwd=str(cwd), env=dict(env), text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False, ) result = CommandResult( command=tuple(command), cwd=cwd, returncode=completed.returncode, stdout=completed.stdout, stderr=completed.stderr, ) if log_path is not None: write_command_log(result, log_path) return result def write_command_log(result: CommandResult, log_path: pathlib.Path) -> None: """Write a deterministic command log for audit evidence.""" log_path.parent.mkdir(parents=True, exist_ok=True) _ = log_path.write_text( "\n".join( [ f"cwd: {result.cwd}", f"command: {format_command(result.command)}", f"returncode: {result.returncode}", "", "stdout:", result.stdout.rstrip(), "", "stderr:", result.stderr.rstrip(), "", ] ), encoding="utf-8", ) def verify_archive(spec: ArchiveSpec) -> None: """Verify a required local archive exists and matches its expected hash.""" if not spec.path.is_file(): raise LocalInputError(f"Missing required local archive for {spec.name}: {spec.path}") if spec.sha256 is None: return actual = sha256_file(spec.path) if actual.lower() != spec.sha256.lower(): raise LocalInputError( f"SHA256 mismatch for {spec.name}: expected {spec.sha256.lower()}, got {actual.lower()} ({spec.path})" ) def sha256_file(path: pathlib.Path) -> str: """Compute a file sha256 digest with bounded memory usage.""" digest = hashlib.sha256() with path.open("rb") as handle: for chunk in iter(lambda: handle.read(1024 * 1024), b""): digest.update(chunk) return digest.hexdigest() def verify_archives(specs: Iterable[ArchiveSpec]) -> None: """Verify all required local archives.""" for spec in specs: verify_archive(spec) def _quote_arg(value: str) -> str: if not value: return "''" safe_chars = set("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_+-=./:") if all(char in safe_chars for char in value): return value return "'" + value.replace("'", "'\\''") + "'"