Files
2026-05-18 09:41:16 +08:00

212 lines
6.0 KiB
Python

"""Helpers for no-network CMake verification.
The helpers deliberately avoid firewall or privilege assumptions. They make implicit
network access fail by pointing common proxy-aware clients at a loopback proxy
address where nothing is expected to listen, and they use clean build/cache
directories so that locally committed inputs must be sufficient.
"""
from __future__ import annotations
import dataclasses
import hashlib
import os
import pathlib
import shutil
import subprocess
from collections.abc import Iterable, Mapping, Sequence
# Proxy URL that invalid_proxy_environment() writes into every proxy variable:
# the loopback address on port 9. The module relies on this proxy being unusable
# so proxy-aware clients fail locally (presumably nothing listens on that port;
# confirm for the hosts this runs on).
INVALID_PROXY_URL = "http://127.0.0.1:9"
# Value used for NO_PROXY/no_proxy: the loopback names and addresses.
NO_PROXY_VALUE = "localhost,127.0.0.1,::1"
class NetworkGuardError(RuntimeError):
    """Common base class for errors raised by the no-network verification helpers."""
class LocalInputError(NetworkGuardError):
    """Signals that a required local archive/source input is missing or invalid."""
@dataclasses.dataclass(frozen=True)
class CommandResult:
    """Immutable record of a finished command together with its captured output."""

    # Argument vector that was executed.
    command: Sequence[str]
    # Working directory the command was started in.
    cwd: pathlib.Path
    # Exit status reported by the process.
    returncode: int
    # Captured standard output, decoded to text.
    stdout: str
    # Captured standard error, decoded to text.
    stderr: str
@dataclasses.dataclass(frozen=True)
class ArchiveSpec:
    """Describes a local archive that must exist, optionally pinned by sha256."""

    # Human-readable label used in error messages.
    name: str
    # Location of the archive on the local filesystem.
    path: pathlib.Path
    # Expected hex digest; None means the content hash is not checked.
    sha256: str | None = None
@dataclasses.dataclass(frozen=True)
class CMakeNoNetworkConfig:
    """Settings describing one isolated, no-network CMake run."""

    # Directory passed to CMake as the source tree (-S).
    source_dir: pathlib.Path
    # Directory passed to CMake as the build tree (-B and --build).
    build_dir: pathlib.Path
    # Directory handed to the configure step as CPM_SOURCE_CACHE.
    cpm_cache_dir: pathlib.Path
    # Value used for CMAKE_BUILD_TYPE.
    build_type: str = "Debug"
    # Additional arguments appended to the configure command.
    extra_cmake_args: Sequence[str] = dataclasses.field(default_factory=tuple)
def invalid_proxy_environment(base: Mapping[str, str] | None = None) -> dict[str, str]:
    """Build an environment whose proxy settings all point at the invalid proxy.

    Args:
        base: Environment to start from. When omitted, the current process
            environment is copied. The mapping passed in is not modified.

    Returns:
        A new dict in which the upper- and lower-case HTTP, HTTPS and ALL proxy
        variables hold ``INVALID_PROXY_URL`` and ``NO_PROXY``/``no_proxy`` hold
        ``NO_PROXY_VALUE``.
    """
    source = os.environ if base is None else base
    guarded = dict(source)
    proxy_variables = (
        "HTTP_PROXY",
        "HTTPS_PROXY",
        "ALL_PROXY",
        "http_proxy",
        "https_proxy",
        "all_proxy",
    )
    guarded.update(dict.fromkeys(proxy_variables, INVALID_PROXY_URL))
    guarded.update({"NO_PROXY": NO_PROXY_VALUE, "no_proxy": NO_PROXY_VALUE})
    return guarded
def reset_directory(path: pathlib.Path) -> None:
    """Leave ``path`` as a freshly created, empty directory.

    An existing directory is deleted with all of its contents and recreated;
    missing parent directories are created as needed.

    Raises:
        NetworkGuardError: If ``path`` exists but is not a directory.
    """
    if path.is_dir():
        # Wipe any previous state so the run starts from a clean tree.
        shutil.rmtree(path)
    elif path.exists():
        raise NetworkGuardError(f"Refusing to replace non-directory path: {path}")
    path.mkdir(parents=True, exist_ok=True)
def prepare_cmake_run(config: CMakeNoNetworkConfig) -> None:
    """Reset the build directory and then the CPM cache directory of ``config``.

    Both directories end up existing and empty, so the run cannot reuse
    artifacts left behind by an earlier invocation.
    """
    for directory in (config.build_dir, config.cpm_cache_dir):
        reset_directory(directory)
def format_command(command: Sequence[str]) -> str:
    """Render an argument vector as one space-separated line for evidence logs.

    Each argument is passed through ``_quote_arg`` so the logged line shows
    argument boundaries unambiguously; no shell is involved.
    """
    rendered = [_quote_arg(argument) for argument in command]
    return " ".join(rendered)
def cmake_configure_command(config: CMakeNoNetworkConfig) -> list[str]:
    """Assemble the ``cmake`` configure invocation for a no-network run.

    Returns:
        A new argument list: source and build directories, the build type and
        the CPM source cache definition, followed by ``config.extra_cmake_args``
        in their given order.
    """
    locations = ["-S", str(config.source_dir), "-B", str(config.build_dir)]
    definitions = [
        f"-DCMAKE_BUILD_TYPE={config.build_type}",
        f"-DCPM_SOURCE_CACHE={config.cpm_cache_dir}",
    ]
    return ["cmake", *locations, *definitions, *config.extra_cmake_args]
def cmake_build_command(config: CMakeNoNetworkConfig, targets: Sequence[str] = ()) -> list[str]:
    """Assemble the ``cmake --build`` invocation for a no-network run.

    Args:
        config: Supplies the build directory to build in.
        targets: Target names; each one contributes its own ``--target NAME``
            pair, in order. With no targets, no ``--target`` option is added.

    Returns:
        A new argument list.
    """
    target_options = [part for name in targets for part in ("--target", name)]
    return ["cmake", "--build", str(config.build_dir), *target_options]
def run_logged_command(
    command: Sequence[str],
    cwd: pathlib.Path,
    env: Mapping[str, str],
    log_path: pathlib.Path | None = None,
) -> CommandResult:
    """Run a command, capture its output, and optionally write an evidence log.

    The command is executed directly from its argument list (no shell). A
    non-zero exit status is not raised; it is reported through
    ``CommandResult.returncode``.

    Args:
        command: Argument vector to execute.
        cwd: Working directory for the child process.
        env: Complete environment for the child process; it is copied.
        log_path: When given, the result is also written there with
            ``write_command_log``.

    Returns:
        The captured result of the finished command.

    Raises:
        OSError: Propagated unchanged from ``subprocess.run``, for example
            when the executable cannot be started.
    """
    completed = subprocess.run(
        list(command),
        cwd=str(cwd),
        env=dict(env),
        text=True,
        # Tool output is not guaranteed to be valid in the locale encoding.
        # Substitute undecodable bytes rather than raising UnicodeDecodeError
        # after the command has already run, which would discard the evidence.
        errors="replace",
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )
    result = CommandResult(
        command=tuple(command),
        cwd=cwd,
        returncode=completed.returncode,
        stdout=completed.stdout,
        stderr=completed.stderr,
    )
    if log_path is not None:
        write_command_log(result, log_path)
    return result
def write_command_log(result: CommandResult, log_path: pathlib.Path) -> None:
    """Write a command result to ``log_path`` as a UTF-8 text evidence log.

    Missing parent directories of ``log_path`` are created. The log lists the
    working directory, the formatted command and the return code, followed by
    a ``stdout:`` and a ``stderr:`` section, each holding the captured text
    with trailing whitespace removed.
    """
    log_path.parent.mkdir(parents=True, exist_ok=True)
    header = [
        f"cwd: {result.cwd}",
        f"command: {format_command(result.command)}",
        f"returncode: {result.returncode}",
    ]
    sections = []
    for label, captured in (("stdout:", result.stdout), ("stderr:", result.stderr)):
        # A blank line separates each section from what precedes it.
        sections += ["", label, captured.rstrip()]
    # The trailing empty entry makes the file end with a newline.
    report = "\n".join([*header, *sections, ""])
    log_path.write_text(report, encoding="utf-8")
def verify_archive(spec: ArchiveSpec) -> None:
    """Check that a required local archive is present and, if pinned, intact.

    The digest comparison ignores letter case. When ``spec.sha256`` is None
    only the presence of the file is checked.

    Raises:
        LocalInputError: If ``spec.path`` is not an existing file, or its
            sha256 digest differs from ``spec.sha256``.
    """
    archive = spec.path
    if not archive.is_file():
        raise LocalInputError(f"Missing required local archive for {spec.name}: {archive}")
    if spec.sha256 is None:
        return
    expected = spec.sha256.lower()
    actual = sha256_file(archive).lower()
    if actual == expected:
        return
    raise LocalInputError(
        f"SHA256 mismatch for {spec.name}: expected {expected}, got {actual} ({archive})"
    )
def sha256_file(path: pathlib.Path) -> str:
    """Return the hex sha256 digest of the file at ``path``.

    The file is read in 1 MiB blocks so memory use does not grow with the
    size of the file.
    """
    block_size = 1024 * 1024
    hasher = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            block = stream.read(block_size)
            if not block:
                # An empty read marks end of file.
                break
            hasher.update(block)
    return hasher.hexdigest()
def verify_archives(specs: Iterable[ArchiveSpec]) -> None:
    """Run ``verify_archive`` on every spec, in order.

    Checking stops at the first spec that fails, because the error raised by
    ``verify_archive`` propagates to the caller.
    """
    for archive_spec in specs:
        verify_archive(archive_spec)
def _quote_arg(value: str) -> str:
if not value:
return "''"
safe_chars = set("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_+-=./:")
if all(char in safe_chars for char in value):
return value
return "'" + value.replace("'", "'\\''") + "'"