diff --git a/CHANGELOG.md b/CHANGELOG.md index 675a9f9b5b..5015c07d07 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Agents: add a safety timeout around embedded `session.compact()` to ensure stalled compaction runs settle and release blocked session lanes. (#16331) Thanks @BinHPdev. - Agents: keep unresolved mutating tool failures visible until the same action retry succeeds, scope mutation-error surfacing to mutating calls (including `session_status` model changes), and dedupe duplicate failure warnings in outbound replies. (#16131) Thanks @Swader. - Agents: classify external timeout aborts during compaction the same as internal timeouts, preventing unnecessary auth-profile rotation and preserving compaction-timeout snapshot fallback behavior. (#9855) Thanks @mverrilli. - Sessions/Agents: harden transcript path resolution for mismatched agent context by preserving explicit store roots and adding safe absolute-path fallback to the correct agent sessions directory. (#16288) Thanks @robbyczgw-cla. diff --git a/src/agents/pi-embedded-runner.compaction-safety-timeout.test.ts b/src/agents/pi-embedded-runner.compaction-safety-timeout.test.ts new file mode 100644 index 0000000000..31906dd733 --- /dev/null +++ b/src/agents/pi-embedded-runner.compaction-safety-timeout.test.ts @@ -0,0 +1,45 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { + compactWithSafetyTimeout, + EMBEDDED_COMPACTION_TIMEOUT_MS, +} from "./pi-embedded-runner/compaction-safety-timeout.js"; + +describe("compactWithSafetyTimeout", () => { + afterEach(() => { + vi.useRealTimers(); + }); + + it("rejects with timeout when compaction never settles", async () => { + vi.useFakeTimers(); + const compactPromise = compactWithSafetyTimeout(() => new Promise(() => {})); + const timeoutAssertion = expect(compactPromise).rejects.toThrow("Compaction timed out"); + + await vi.advanceTimersByTimeAsync(EMBEDDED_COMPACTION_TIMEOUT_MS); + await timeoutAssertion; + expect(vi.getTimerCount()).toBe(0); + }); + + it("returns result and clears timer when compaction settles first", async () => { + vi.useFakeTimers(); + const compactPromise = compactWithSafetyTimeout( + () => new Promise((resolve) => setTimeout(() => resolve("ok"), 10)), + 30, + ); + + await vi.advanceTimersByTimeAsync(10); + await expect(compactPromise).resolves.toBe("ok"); + expect(vi.getTimerCount()).toBe(0); + }); + + it("preserves compaction errors and clears timer", async () => { + vi.useFakeTimers(); + const error = new Error("provider exploded"); + + await expect( + compactWithSafetyTimeout(async () => { + throw error; + }, 30), + ).rejects.toBe(error); + expect(vi.getTimerCount()).toBe(0); + }); +}); diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index 1ada5c626a..6a100bd84a 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -57,6 +57,7 @@ import { type SkillSnapshot, } from "../skills.js"; import { resolveTranscriptPolicy } from "../transcript-policy.js"; +import { compactWithSafetyTimeout } from "./compaction-safety-timeout.js"; import { buildEmbeddedExtensionPaths } from "./extensions.js"; import { logToolSchemasForGoogle, @@ -107,7 +108,7 @@ export type CompactEmbeddedPiSessionParams = { reasoningLevel?: ReasoningLevel; bashElevated?: ExecElevatedDefaults; customInstructions?: string; - trigger?: "overflow" | "manual" | "cache_ttl" | "safeguard"; + trigger?: "overflow" | "manual"; diagId?: string; attempt?: number; maxAttempts?: number; @@ -632,7 +633,9 @@ export async function compactEmbeddedPiSessionDirect( } const compactStartedAt = Date.now(); - const result = await session.compact(params.customInstructions); + const result = await compactWithSafetyTimeout(() => + session.compact(params.customInstructions), + ); // Estimate tokens after compaction by summing token estimates for remaining messages let tokensAfter: number | undefined; try { diff --git a/src/agents/pi-embedded-runner/compaction-safety-timeout.ts b/src/agents/pi-embedded-runner/compaction-safety-timeout.ts new file mode 100644 index 0000000000..689aa9a931 --- /dev/null +++ b/src/agents/pi-embedded-runner/compaction-safety-timeout.ts @@ -0,0 +1,10 @@ +import { withTimeout } from "../../node-host/with-timeout.js"; + +export const EMBEDDED_COMPACTION_TIMEOUT_MS = 300_000; + +export async function compactWithSafetyTimeout( + compact: () => Promise, + timeoutMs: number = EMBEDDED_COMPACTION_TIMEOUT_MS, +): Promise { + return await withTimeout(() => compact(), timeoutMs, "Compaction"); +} diff --git a/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts b/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts new file mode 100644 index 0000000000..1a2c297917 --- /dev/null +++ b/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts @@ -0,0 +1,240 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +vi.mock("./run/attempt.js", () => ({ + runEmbeddedAttempt: vi.fn(), +})); + +vi.mock("./compact.js", () => ({ + compactEmbeddedPiSessionDirect: vi.fn(), +})); + +vi.mock("./model.js", () => ({ + resolveModel: vi.fn(() => ({ + model: { + id: "test-model", + provider: "anthropic", + contextWindow: 200000, + api: "messages", + }, + error: null, + authStorage: { + setRuntimeApiKey: vi.fn(), + }, + modelRegistry: {}, + })), +})); + +vi.mock("../model-auth.js", () => ({ + ensureAuthProfileStore: vi.fn(() => ({})), + getApiKeyForModel: vi.fn(async () => ({ + apiKey: "test-key", + profileId: "test-profile", + source: "test", + })), + resolveAuthProfileOrder: vi.fn(() => []), +})); + +vi.mock("../models-config.js", () => ({ + ensureOpenClawModelsJson: vi.fn(async () => {}), +})); + +vi.mock("../context-window-guard.js", () => ({ + CONTEXT_WINDOW_HARD_MIN_TOKENS: 1000, + CONTEXT_WINDOW_WARN_BELOW_TOKENS: 5000, + evaluateContextWindowGuard: vi.fn(() => ({ + shouldWarn: false, + shouldBlock: false, + tokens: 200000, + source: "model", + })), + resolveContextWindowInfo: vi.fn(() => ({ + tokens: 200000, + source: "model", + })), +})); + +vi.mock("../../process/command-queue.js", () => ({ + enqueueCommandInLane: vi.fn((_lane: string, task: () => unknown) => task()), +})); + +vi.mock("../../utils/message-channel.js", () => ({ + isMarkdownCapableMessageChannel: vi.fn(() => true), +})); + +vi.mock("../agent-paths.js", () => ({ + resolveOpenClawAgentDir: vi.fn(() => "/tmp/agent-dir"), +})); + +vi.mock("../auth-profiles.js", () => ({ + isProfileInCooldown: vi.fn(() => false), + markAuthProfileFailure: vi.fn(async () => {}), + markAuthProfileGood: vi.fn(async () => {}), + markAuthProfileUsed: vi.fn(async () => {}), +})); + +vi.mock("../defaults.js", () => ({ + DEFAULT_CONTEXT_TOKENS: 200000, + DEFAULT_MODEL: "test-model", + DEFAULT_PROVIDER: "anthropic", +})); + +vi.mock("../failover-error.js", () => ({ + FailoverError: class extends Error {}, + resolveFailoverStatus: vi.fn(), +})); + +vi.mock("../usage.js", () => ({ + normalizeUsage: vi.fn((usage?: unknown) => + usage && typeof usage === "object" ? usage : undefined, + ), + derivePromptTokens: vi.fn( + (usage?: { input?: number; cacheRead?: number; cacheWrite?: number }) => { + if (!usage) { + return undefined; + } + const input = usage.input ?? 0; + const cacheRead = usage.cacheRead ?? 0; + const cacheWrite = usage.cacheWrite ?? 0; + const sum = input + cacheRead + cacheWrite; + return sum > 0 ? sum : undefined; + }, + ), +})); + +vi.mock("../workspace-run.js", () => ({ + resolveRunWorkspaceDir: vi.fn((params: { workspaceDir: string }) => ({ + workspaceDir: params.workspaceDir, + usedFallback: false, + fallbackReason: undefined, + agentId: "main", + })), + redactRunIdentifier: vi.fn((value?: string) => value ?? ""), +})); + +vi.mock("./lanes.js", () => ({ + resolveSessionLane: vi.fn(() => "session-lane"), + resolveGlobalLane: vi.fn(() => "global-lane"), +})); + +vi.mock("./logger.js", () => ({ + log: { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + isEnabled: vi.fn(() => false), + }, +})); + +vi.mock("./run/payloads.js", () => ({ + buildEmbeddedRunPayloads: vi.fn(() => []), +})); + +vi.mock("./tool-result-truncation.js", () => ({ + truncateOversizedToolResultsInSession: vi.fn(async () => ({ + truncated: false, + truncatedCount: 0, + reason: "no oversized tool results", + })), + sessionLikelyHasOversizedToolResults: vi.fn(() => false), +})); + +vi.mock("./utils.js", () => ({ + describeUnknownError: vi.fn((err: unknown) => { + if (err instanceof Error) { + return err.message; + } + return String(err); + }), +})); + +vi.mock("../pi-embedded-helpers.js", () => ({ + formatBillingErrorMessage: vi.fn(() => ""), + classifyFailoverReason: vi.fn(() => null), + formatAssistantErrorText: vi.fn(() => ""), + isAuthAssistantError: vi.fn(() => false), + isBillingAssistantError: vi.fn(() => false), + isCompactionFailureError: vi.fn(() => false), + isLikelyContextOverflowError: vi.fn((msg?: string) => { + const lower = (msg ?? "").toLowerCase(); + return lower.includes("request_too_large") || lower.includes("context window exceeded"); + }), + isFailoverAssistantError: vi.fn(() => false), + isFailoverErrorMessage: vi.fn(() => false), + parseImageSizeError: vi.fn(() => null), + parseImageDimensionError: vi.fn(() => null), + isRateLimitAssistantError: vi.fn(() => false), + isTimeoutErrorMessage: vi.fn(() => false), + pickFallbackThinkingLevel: vi.fn(() => null), +})); + +import type { EmbeddedRunAttemptResult } from "./run/types.js"; +import { compactEmbeddedPiSessionDirect } from "./compact.js"; +import { runEmbeddedPiAgent } from "./run.js"; +import { runEmbeddedAttempt } from "./run/attempt.js"; + +const mockedRunEmbeddedAttempt = vi.mocked(runEmbeddedAttempt); +const mockedCompactDirect = vi.mocked(compactEmbeddedPiSessionDirect); + +function makeAttemptResult( + overrides: Partial = {}, +): EmbeddedRunAttemptResult { + return { + aborted: false, + timedOut: false, + timedOutDuringCompaction: false, + promptError: null, + sessionIdUsed: "test-session", + assistantTexts: ["Hello!"], + toolMetas: [], + lastAssistant: undefined, + messagesSnapshot: [], + didSendViaMessagingTool: false, + messagingToolSentTexts: [], + messagingToolSentTargets: [], + cloudCodeAssistFormatError: false, + ...overrides, + }; +} + +describe("runEmbeddedPiAgent overflow compaction trigger routing", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("passes trigger=overflow when retrying compaction after context overflow", async () => { + const overflowError = new Error("request_too_large: Request size exceeds model context window"); + + mockedRunEmbeddedAttempt + .mockResolvedValueOnce(makeAttemptResult({ promptError: overflowError })) + .mockResolvedValueOnce(makeAttemptResult({ promptError: null })); + + mockedCompactDirect.mockResolvedValueOnce({ + ok: true, + compacted: true, + result: { + summary: "Compacted session", + firstKeptEntryId: "entry-5", + tokensBefore: 150000, + }, + }); + + await runEmbeddedPiAgent({ + sessionId: "test-session", + sessionKey: "test-key", + sessionFile: "/tmp/session.json", + workspaceDir: "/tmp/workspace", + prompt: "hello", + timeoutMs: 30000, + runId: "run-1", + }); + + expect(mockedCompactDirect).toHaveBeenCalledTimes(1); + expect(mockedCompactDirect).toHaveBeenCalledWith( + expect.objectContaining({ + trigger: "overflow", + authProfileId: "test-profile", + }), + ); + }); +}); diff --git a/src/auto-reply/reply/commands-compact.test.ts b/src/auto-reply/reply/commands-compact.test.ts new file mode 100644 index 0000000000..7c418ac239 --- /dev/null +++ b/src/auto-reply/reply/commands-compact.test.ts @@ -0,0 +1,114 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { OpenClawConfig } from "../../config/config.js"; +import { compactEmbeddedPiSession } from "../../agents/pi-embedded.js"; +import { handleCompactCommand } from "./commands-compact.js"; +import { buildCommandTestParams } from "./commands.test-harness.js"; + +vi.mock("../../agents/pi-embedded.js", () => ({ + abortEmbeddedPiRun: vi.fn(), + compactEmbeddedPiSession: vi.fn(), + isEmbeddedPiRunActive: vi.fn().mockReturnValue(false), + waitForEmbeddedPiRunEnd: vi.fn().mockResolvedValue(undefined), +})); + +vi.mock("../../infra/system-events.js", () => ({ + enqueueSystemEvent: vi.fn(), +})); + +vi.mock("./session-updates.js", () => ({ + incrementCompactionCount: vi.fn(), +})); + +describe("/compact command", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("returns null when command is not /compact", async () => { + const cfg = { + commands: { text: true }, + channels: { whatsapp: { allowFrom: ["*"] } }, + } as OpenClawConfig; + const params = buildCommandTestParams("/status", cfg); + + const result = await handleCompactCommand( + { + ...params, + }, + true, + ); + + expect(result).toBeNull(); + expect(vi.mocked(compactEmbeddedPiSession)).not.toHaveBeenCalled(); + }); + + it("rejects unauthorized /compact commands", async () => { + const cfg = { + commands: { text: true }, + channels: { whatsapp: { allowFrom: ["*"] } }, + } as OpenClawConfig; + const params = buildCommandTestParams("/compact", cfg); + + const result = await handleCompactCommand( + { + ...params, + command: { + ...params.command, + isAuthorizedSender: false, + senderId: "unauthorized", + }, + }, + true, + ); + + expect(result).toEqual({ shouldContinue: false }); + expect(vi.mocked(compactEmbeddedPiSession)).not.toHaveBeenCalled(); + }); + + it("routes manual compaction with explicit trigger and context metadata", async () => { + const cfg = { + commands: { text: true }, + channels: { whatsapp: { allowFrom: ["*"] } }, + session: { store: "/tmp/openclaw-session-store.json" }, + } as OpenClawConfig; + const params = buildCommandTestParams("/compact: focus on decisions", cfg, { + From: "+15550001", + To: "+15550002", + }); + vi.mocked(compactEmbeddedPiSession).mockResolvedValueOnce({ + ok: true, + compacted: false, + }); + + const result = await handleCompactCommand( + { + ...params, + sessionEntry: { + sessionId: "session-1", + groupId: "group-1", + groupChannel: "#general", + space: "workspace-1", + spawnedBy: "agent:main:parent", + totalTokens: 12345, + }, + }, + true, + ); + + expect(result?.shouldContinue).toBe(false); + expect(vi.mocked(compactEmbeddedPiSession)).toHaveBeenCalledOnce(); + expect(vi.mocked(compactEmbeddedPiSession)).toHaveBeenCalledWith( + expect.objectContaining({ + sessionId: "session-1", + sessionKey: "agent:main:main", + trigger: "manual", + customInstructions: "focus on decisions", + messageChannel: "whatsapp", + groupId: "group-1", + groupChannel: "#general", + groupSpace: "workspace-1", + spawnedBy: "agent:main:parent", + }), + ); + }); +}); diff --git a/src/auto-reply/reply/commands-compact.ts b/src/auto-reply/reply/commands-compact.ts index 00b00e7ede..3362950872 100644 --- a/src/auto-reply/reply/commands-compact.ts +++ b/src/auto-reply/reply/commands-compact.ts @@ -103,6 +103,7 @@ export const handleCompactCommand: CommandHandler = async (params) => { defaultLevel: "off", }, customInstructions, + trigger: "manual", senderIsOwner: params.command.senderIsOwner, ownerNumbers: params.command.ownerList.length > 0 ? params.command.ownerList : undefined, }); diff --git a/src/node-host/with-timeout.ts b/src/node-host/with-timeout.ts index 07ea141549..1acf525a79 100644 --- a/src/node-host/with-timeout.ts +++ b/src/node-host/with-timeout.ts @@ -14,6 +14,7 @@ export async function withTimeout( const abortCtrl = new AbortController(); const timeoutError = new Error(`${label ?? "request"} timed out`); const timer = setTimeout(() => abortCtrl.abort(timeoutError), resolved); + timer.unref?.(); let abortListener: (() => void) | undefined; const abortPromise: Promise = abortCtrl.signal.aborted