212 lines
6.0 KiB
Python
212 lines
6.0 KiB
Python
"""Helpers for no-network CMake verification.
|
|
|
|
The helpers deliberately avoid firewall or privilege assumptions. They make implicit
|
|
network access fail by routing common proxy-aware clients to an unroutable local
|
|
proxy while using clean build/cache directories so local committed inputs must be
|
|
sufficient.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import dataclasses
|
|
import hashlib
|
|
import os
|
|
import pathlib
|
|
import shutil
|
|
import subprocess
|
|
from collections.abc import Iterable, Mapping, Sequence
|
|
|
|
INVALID_PROXY_URL = "http://127.0.0.1:9"
|
|
NO_PROXY_VALUE = "localhost,127.0.0.1,::1"
|
|
|
|
|
|
class NetworkGuardError(RuntimeError):
|
|
"""Base error raised by no-network verification helpers."""
|
|
|
|
|
|
class LocalInputError(NetworkGuardError):
|
|
"""Raised when a required local archive/source input is absent or invalid."""
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
|
|
class CommandResult:
|
|
"""Completed command with captured output."""
|
|
|
|
command: Sequence[str]
|
|
cwd: pathlib.Path
|
|
returncode: int
|
|
stdout: str
|
|
stderr: str
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
|
|
class ArchiveSpec:
|
|
"""Required local archive with optional sha256 validation."""
|
|
|
|
name: str
|
|
path: pathlib.Path
|
|
sha256: str | None = None
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
|
|
class CMakeNoNetworkConfig:
|
|
"""Inputs for an isolated CMake no-network run."""
|
|
|
|
source_dir: pathlib.Path
|
|
build_dir: pathlib.Path
|
|
cpm_cache_dir: pathlib.Path
|
|
build_type: str = "Debug"
|
|
extra_cmake_args: Sequence[str] = dataclasses.field(default_factory=tuple)
|
|
|
|
|
|
def invalid_proxy_environment(base: Mapping[str, str] | None = None) -> dict[str, str]:
|
|
"""Return an environment that makes proxy-aware network access fail locally."""
|
|
|
|
env = dict(os.environ if base is None else base)
|
|
for key in ("HTTP_PROXY", "HTTPS_PROXY", "ALL_PROXY", "http_proxy", "https_proxy", "all_proxy"):
|
|
env[key] = INVALID_PROXY_URL
|
|
env["NO_PROXY"] = NO_PROXY_VALUE
|
|
env["no_proxy"] = NO_PROXY_VALUE
|
|
return env
|
|
|
|
|
|
def reset_directory(path: pathlib.Path) -> None:
|
|
"""Remove and recreate a directory for clean, isolated verification state."""
|
|
|
|
if path.exists():
|
|
if not path.is_dir():
|
|
raise NetworkGuardError(f"Refusing to replace non-directory path: {path}")
|
|
shutil.rmtree(path)
|
|
path.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def prepare_cmake_run(config: CMakeNoNetworkConfig) -> None:
|
|
"""Create clean build and CPM cache directories for an isolated run."""
|
|
|
|
reset_directory(config.build_dir)
|
|
reset_directory(config.cpm_cache_dir)
|
|
|
|
|
|
def format_command(command: Sequence[str]) -> str:
|
|
"""Format a command for evidence logs without shell interpretation."""
|
|
|
|
return " ".join(_quote_arg(part) for part in command)
|
|
|
|
|
|
def cmake_configure_command(config: CMakeNoNetworkConfig) -> list[str]:
|
|
"""Build the CMake configure command for a no-network run."""
|
|
|
|
command = [
|
|
"cmake",
|
|
"-S",
|
|
str(config.source_dir),
|
|
"-B",
|
|
str(config.build_dir),
|
|
f"-DCMAKE_BUILD_TYPE={config.build_type}",
|
|
f"-DCPM_SOURCE_CACHE={config.cpm_cache_dir}",
|
|
]
|
|
command.extend(config.extra_cmake_args)
|
|
return command
|
|
|
|
|
|
def cmake_build_command(config: CMakeNoNetworkConfig, targets: Sequence[str] = ()) -> list[str]:
|
|
"""Build the CMake build command for a no-network run."""
|
|
|
|
command = ["cmake", "--build", str(config.build_dir)]
|
|
for target in targets:
|
|
command.extend(["--target", target])
|
|
return command
|
|
|
|
|
|
def run_logged_command(
|
|
command: Sequence[str],
|
|
cwd: pathlib.Path,
|
|
env: Mapping[str, str],
|
|
log_path: pathlib.Path | None = None,
|
|
) -> CommandResult:
|
|
"""Run a command, capture output, and optionally write an evidence log."""
|
|
|
|
completed = subprocess.run(
|
|
list(command),
|
|
cwd=str(cwd),
|
|
env=dict(env),
|
|
text=True,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
check=False,
|
|
)
|
|
result = CommandResult(
|
|
command=tuple(command),
|
|
cwd=cwd,
|
|
returncode=completed.returncode,
|
|
stdout=completed.stdout,
|
|
stderr=completed.stderr,
|
|
)
|
|
if log_path is not None:
|
|
write_command_log(result, log_path)
|
|
return result
|
|
|
|
|
|
def write_command_log(result: CommandResult, log_path: pathlib.Path) -> None:
|
|
"""Write a deterministic command log for audit evidence."""
|
|
|
|
log_path.parent.mkdir(parents=True, exist_ok=True)
|
|
_ = log_path.write_text(
|
|
"\n".join(
|
|
[
|
|
f"cwd: {result.cwd}",
|
|
f"command: {format_command(result.command)}",
|
|
f"returncode: {result.returncode}",
|
|
"",
|
|
"stdout:",
|
|
result.stdout.rstrip(),
|
|
"",
|
|
"stderr:",
|
|
result.stderr.rstrip(),
|
|
"",
|
|
]
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
|
|
def verify_archive(spec: ArchiveSpec) -> None:
|
|
"""Verify a required local archive exists and matches its expected hash."""
|
|
|
|
if not spec.path.is_file():
|
|
raise LocalInputError(f"Missing required local archive for {spec.name}: {spec.path}")
|
|
if spec.sha256 is None:
|
|
return
|
|
|
|
actual = sha256_file(spec.path)
|
|
if actual.lower() != spec.sha256.lower():
|
|
raise LocalInputError(
|
|
f"SHA256 mismatch for {spec.name}: expected {spec.sha256.lower()}, got {actual.lower()} ({spec.path})"
|
|
)
|
|
|
|
|
|
def sha256_file(path: pathlib.Path) -> str:
|
|
"""Compute a file sha256 digest with bounded memory usage."""
|
|
|
|
digest = hashlib.sha256()
|
|
with path.open("rb") as handle:
|
|
for chunk in iter(lambda: handle.read(1024 * 1024), b""):
|
|
digest.update(chunk)
|
|
return digest.hexdigest()
|
|
|
|
|
|
def verify_archives(specs: Iterable[ArchiveSpec]) -> None:
|
|
"""Verify all required local archives."""
|
|
|
|
for spec in specs:
|
|
verify_archive(spec)
|
|
|
|
|
|
def _quote_arg(value: str) -> str:
|
|
if not value:
|
|
return "''"
|
|
safe_chars = set("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_+-=./:")
|
|
if all(char in safe_chars for char in value):
|
|
return value
|
|
return "'" + value.replace("'", "'\\''") + "'"
|