mirror of
https://github.com/openclaw/openclaw.git
synced 2026-02-19 00:27:31 +00:00
fix: add safety timeout to session.compact() to prevent lane deadlock (#16533)
Merged via /review-pr -> /prepare-pr -> /merge-pr.
Prepared head SHA: 21e4045add
Co-authored-by: BinHPdev <219093083+BinHPdev@users.noreply.github.com>
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Reviewed-by: @gumadeiras
This commit is contained in:
@@ -14,6 +14,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- Agents: add a safety timeout around embedded `session.compact()` to ensure stalled compaction runs settle and release blocked session lanes. (#16331) Thanks @BinHPdev.
|
||||
- Agents: keep unresolved mutating tool failures visible until the same action retry succeeds, scope mutation-error surfacing to mutating calls (including `session_status` model changes), and dedupe duplicate failure warnings in outbound replies. (#16131) Thanks @Swader.
|
||||
- Agents: classify external timeout aborts during compaction the same as internal timeouts, preventing unnecessary auth-profile rotation and preserving compaction-timeout snapshot fallback behavior. (#9855) Thanks @mverrilli.
|
||||
- Sessions/Agents: harden transcript path resolution for mismatched agent context by preserving explicit store roots and adding safe absolute-path fallback to the correct agent sessions directory. (#16288) Thanks @robbyczgw-cla.
|
||||
|
||||
@@ -0,0 +1,45 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
compactWithSafetyTimeout,
|
||||
EMBEDDED_COMPACTION_TIMEOUT_MS,
|
||||
} from "./pi-embedded-runner/compaction-safety-timeout.js";
|
||||
|
||||
describe("compactWithSafetyTimeout", () => {
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("rejects with timeout when compaction never settles", async () => {
|
||||
vi.useFakeTimers();
|
||||
const compactPromise = compactWithSafetyTimeout(() => new Promise<never>(() => {}));
|
||||
const timeoutAssertion = expect(compactPromise).rejects.toThrow("Compaction timed out");
|
||||
|
||||
await vi.advanceTimersByTimeAsync(EMBEDDED_COMPACTION_TIMEOUT_MS);
|
||||
await timeoutAssertion;
|
||||
expect(vi.getTimerCount()).toBe(0);
|
||||
});
|
||||
|
||||
it("returns result and clears timer when compaction settles first", async () => {
|
||||
vi.useFakeTimers();
|
||||
const compactPromise = compactWithSafetyTimeout(
|
||||
() => new Promise<string>((resolve) => setTimeout(() => resolve("ok"), 10)),
|
||||
30,
|
||||
);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(10);
|
||||
await expect(compactPromise).resolves.toBe("ok");
|
||||
expect(vi.getTimerCount()).toBe(0);
|
||||
});
|
||||
|
||||
it("preserves compaction errors and clears timer", async () => {
|
||||
vi.useFakeTimers();
|
||||
const error = new Error("provider exploded");
|
||||
|
||||
await expect(
|
||||
compactWithSafetyTimeout(async () => {
|
||||
throw error;
|
||||
}, 30),
|
||||
).rejects.toBe(error);
|
||||
expect(vi.getTimerCount()).toBe(0);
|
||||
});
|
||||
});
|
||||
@@ -57,6 +57,7 @@ import {
|
||||
type SkillSnapshot,
|
||||
} from "../skills.js";
|
||||
import { resolveTranscriptPolicy } from "../transcript-policy.js";
|
||||
import { compactWithSafetyTimeout } from "./compaction-safety-timeout.js";
|
||||
import { buildEmbeddedExtensionPaths } from "./extensions.js";
|
||||
import {
|
||||
logToolSchemasForGoogle,
|
||||
@@ -107,7 +108,7 @@ export type CompactEmbeddedPiSessionParams = {
|
||||
reasoningLevel?: ReasoningLevel;
|
||||
bashElevated?: ExecElevatedDefaults;
|
||||
customInstructions?: string;
|
||||
trigger?: "overflow" | "manual" | "cache_ttl" | "safeguard";
|
||||
trigger?: "overflow" | "manual";
|
||||
diagId?: string;
|
||||
attempt?: number;
|
||||
maxAttempts?: number;
|
||||
@@ -632,7 +633,9 @@ export async function compactEmbeddedPiSessionDirect(
|
||||
}
|
||||
|
||||
const compactStartedAt = Date.now();
|
||||
const result = await session.compact(params.customInstructions);
|
||||
const result = await compactWithSafetyTimeout(() =>
|
||||
session.compact(params.customInstructions),
|
||||
);
|
||||
// Estimate tokens after compaction by summing token estimates for remaining messages
|
||||
let tokensAfter: number | undefined;
|
||||
try {
|
||||
|
||||
10
src/agents/pi-embedded-runner/compaction-safety-timeout.ts
Normal file
10
src/agents/pi-embedded-runner/compaction-safety-timeout.ts
Normal file
@@ -0,0 +1,10 @@
|
||||
import { withTimeout } from "../../node-host/with-timeout.js";
|
||||
|
||||
export const EMBEDDED_COMPACTION_TIMEOUT_MS = 300_000;
|
||||
|
||||
export async function compactWithSafetyTimeout<T>(
|
||||
compact: () => Promise<T>,
|
||||
timeoutMs: number = EMBEDDED_COMPACTION_TIMEOUT_MS,
|
||||
): Promise<T> {
|
||||
return await withTimeout(() => compact(), timeoutMs, "Compaction");
|
||||
}
|
||||
240
src/agents/pi-embedded-runner/run.overflow-compaction.test.ts
Normal file
240
src/agents/pi-embedded-runner/run.overflow-compaction.test.ts
Normal file
@@ -0,0 +1,240 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
vi.mock("./run/attempt.js", () => ({
|
||||
runEmbeddedAttempt: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("./compact.js", () => ({
|
||||
compactEmbeddedPiSessionDirect: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("./model.js", () => ({
|
||||
resolveModel: vi.fn(() => ({
|
||||
model: {
|
||||
id: "test-model",
|
||||
provider: "anthropic",
|
||||
contextWindow: 200000,
|
||||
api: "messages",
|
||||
},
|
||||
error: null,
|
||||
authStorage: {
|
||||
setRuntimeApiKey: vi.fn(),
|
||||
},
|
||||
modelRegistry: {},
|
||||
})),
|
||||
}));
|
||||
|
||||
vi.mock("../model-auth.js", () => ({
|
||||
ensureAuthProfileStore: vi.fn(() => ({})),
|
||||
getApiKeyForModel: vi.fn(async () => ({
|
||||
apiKey: "test-key",
|
||||
profileId: "test-profile",
|
||||
source: "test",
|
||||
})),
|
||||
resolveAuthProfileOrder: vi.fn(() => []),
|
||||
}));
|
||||
|
||||
vi.mock("../models-config.js", () => ({
|
||||
ensureOpenClawModelsJson: vi.fn(async () => {}),
|
||||
}));
|
||||
|
||||
vi.mock("../context-window-guard.js", () => ({
|
||||
CONTEXT_WINDOW_HARD_MIN_TOKENS: 1000,
|
||||
CONTEXT_WINDOW_WARN_BELOW_TOKENS: 5000,
|
||||
evaluateContextWindowGuard: vi.fn(() => ({
|
||||
shouldWarn: false,
|
||||
shouldBlock: false,
|
||||
tokens: 200000,
|
||||
source: "model",
|
||||
})),
|
||||
resolveContextWindowInfo: vi.fn(() => ({
|
||||
tokens: 200000,
|
||||
source: "model",
|
||||
})),
|
||||
}));
|
||||
|
||||
vi.mock("../../process/command-queue.js", () => ({
|
||||
enqueueCommandInLane: vi.fn((_lane: string, task: () => unknown) => task()),
|
||||
}));
|
||||
|
||||
vi.mock("../../utils/message-channel.js", () => ({
|
||||
isMarkdownCapableMessageChannel: vi.fn(() => true),
|
||||
}));
|
||||
|
||||
vi.mock("../agent-paths.js", () => ({
|
||||
resolveOpenClawAgentDir: vi.fn(() => "/tmp/agent-dir"),
|
||||
}));
|
||||
|
||||
vi.mock("../auth-profiles.js", () => ({
|
||||
isProfileInCooldown: vi.fn(() => false),
|
||||
markAuthProfileFailure: vi.fn(async () => {}),
|
||||
markAuthProfileGood: vi.fn(async () => {}),
|
||||
markAuthProfileUsed: vi.fn(async () => {}),
|
||||
}));
|
||||
|
||||
vi.mock("../defaults.js", () => ({
|
||||
DEFAULT_CONTEXT_TOKENS: 200000,
|
||||
DEFAULT_MODEL: "test-model",
|
||||
DEFAULT_PROVIDER: "anthropic",
|
||||
}));
|
||||
|
||||
vi.mock("../failover-error.js", () => ({
|
||||
FailoverError: class extends Error {},
|
||||
resolveFailoverStatus: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("../usage.js", () => ({
|
||||
normalizeUsage: vi.fn((usage?: unknown) =>
|
||||
usage && typeof usage === "object" ? usage : undefined,
|
||||
),
|
||||
derivePromptTokens: vi.fn(
|
||||
(usage?: { input?: number; cacheRead?: number; cacheWrite?: number }) => {
|
||||
if (!usage) {
|
||||
return undefined;
|
||||
}
|
||||
const input = usage.input ?? 0;
|
||||
const cacheRead = usage.cacheRead ?? 0;
|
||||
const cacheWrite = usage.cacheWrite ?? 0;
|
||||
const sum = input + cacheRead + cacheWrite;
|
||||
return sum > 0 ? sum : undefined;
|
||||
},
|
||||
),
|
||||
}));
|
||||
|
||||
vi.mock("../workspace-run.js", () => ({
|
||||
resolveRunWorkspaceDir: vi.fn((params: { workspaceDir: string }) => ({
|
||||
workspaceDir: params.workspaceDir,
|
||||
usedFallback: false,
|
||||
fallbackReason: undefined,
|
||||
agentId: "main",
|
||||
})),
|
||||
redactRunIdentifier: vi.fn((value?: string) => value ?? ""),
|
||||
}));
|
||||
|
||||
vi.mock("./lanes.js", () => ({
|
||||
resolveSessionLane: vi.fn(() => "session-lane"),
|
||||
resolveGlobalLane: vi.fn(() => "global-lane"),
|
||||
}));
|
||||
|
||||
vi.mock("./logger.js", () => ({
|
||||
log: {
|
||||
debug: vi.fn(),
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
isEnabled: vi.fn(() => false),
|
||||
},
|
||||
}));
|
||||
|
||||
vi.mock("./run/payloads.js", () => ({
|
||||
buildEmbeddedRunPayloads: vi.fn(() => []),
|
||||
}));
|
||||
|
||||
vi.mock("./tool-result-truncation.js", () => ({
|
||||
truncateOversizedToolResultsInSession: vi.fn(async () => ({
|
||||
truncated: false,
|
||||
truncatedCount: 0,
|
||||
reason: "no oversized tool results",
|
||||
})),
|
||||
sessionLikelyHasOversizedToolResults: vi.fn(() => false),
|
||||
}));
|
||||
|
||||
vi.mock("./utils.js", () => ({
|
||||
describeUnknownError: vi.fn((err: unknown) => {
|
||||
if (err instanceof Error) {
|
||||
return err.message;
|
||||
}
|
||||
return String(err);
|
||||
}),
|
||||
}));
|
||||
|
||||
vi.mock("../pi-embedded-helpers.js", () => ({
|
||||
formatBillingErrorMessage: vi.fn(() => ""),
|
||||
classifyFailoverReason: vi.fn(() => null),
|
||||
formatAssistantErrorText: vi.fn(() => ""),
|
||||
isAuthAssistantError: vi.fn(() => false),
|
||||
isBillingAssistantError: vi.fn(() => false),
|
||||
isCompactionFailureError: vi.fn(() => false),
|
||||
isLikelyContextOverflowError: vi.fn((msg?: string) => {
|
||||
const lower = (msg ?? "").toLowerCase();
|
||||
return lower.includes("request_too_large") || lower.includes("context window exceeded");
|
||||
}),
|
||||
isFailoverAssistantError: vi.fn(() => false),
|
||||
isFailoverErrorMessage: vi.fn(() => false),
|
||||
parseImageSizeError: vi.fn(() => null),
|
||||
parseImageDimensionError: vi.fn(() => null),
|
||||
isRateLimitAssistantError: vi.fn(() => false),
|
||||
isTimeoutErrorMessage: vi.fn(() => false),
|
||||
pickFallbackThinkingLevel: vi.fn(() => null),
|
||||
}));
|
||||
|
||||
import type { EmbeddedRunAttemptResult } from "./run/types.js";
|
||||
import { compactEmbeddedPiSessionDirect } from "./compact.js";
|
||||
import { runEmbeddedPiAgent } from "./run.js";
|
||||
import { runEmbeddedAttempt } from "./run/attempt.js";
|
||||
|
||||
const mockedRunEmbeddedAttempt = vi.mocked(runEmbeddedAttempt);
|
||||
const mockedCompactDirect = vi.mocked(compactEmbeddedPiSessionDirect);
|
||||
|
||||
function makeAttemptResult(
|
||||
overrides: Partial<EmbeddedRunAttemptResult> = {},
|
||||
): EmbeddedRunAttemptResult {
|
||||
return {
|
||||
aborted: false,
|
||||
timedOut: false,
|
||||
timedOutDuringCompaction: false,
|
||||
promptError: null,
|
||||
sessionIdUsed: "test-session",
|
||||
assistantTexts: ["Hello!"],
|
||||
toolMetas: [],
|
||||
lastAssistant: undefined,
|
||||
messagesSnapshot: [],
|
||||
didSendViaMessagingTool: false,
|
||||
messagingToolSentTexts: [],
|
||||
messagingToolSentTargets: [],
|
||||
cloudCodeAssistFormatError: false,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
describe("runEmbeddedPiAgent overflow compaction trigger routing", () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
it("passes trigger=overflow when retrying compaction after context overflow", async () => {
|
||||
const overflowError = new Error("request_too_large: Request size exceeds model context window");
|
||||
|
||||
mockedRunEmbeddedAttempt
|
||||
.mockResolvedValueOnce(makeAttemptResult({ promptError: overflowError }))
|
||||
.mockResolvedValueOnce(makeAttemptResult({ promptError: null }));
|
||||
|
||||
mockedCompactDirect.mockResolvedValueOnce({
|
||||
ok: true,
|
||||
compacted: true,
|
||||
result: {
|
||||
summary: "Compacted session",
|
||||
firstKeptEntryId: "entry-5",
|
||||
tokensBefore: 150000,
|
||||
},
|
||||
});
|
||||
|
||||
await runEmbeddedPiAgent({
|
||||
sessionId: "test-session",
|
||||
sessionKey: "test-key",
|
||||
sessionFile: "/tmp/session.json",
|
||||
workspaceDir: "/tmp/workspace",
|
||||
prompt: "hello",
|
||||
timeoutMs: 30000,
|
||||
runId: "run-1",
|
||||
});
|
||||
|
||||
expect(mockedCompactDirect).toHaveBeenCalledTimes(1);
|
||||
expect(mockedCompactDirect).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
trigger: "overflow",
|
||||
authProfileId: "test-profile",
|
||||
}),
|
||||
);
|
||||
});
|
||||
});
|
||||
114
src/auto-reply/reply/commands-compact.test.ts
Normal file
114
src/auto-reply/reply/commands-compact.test.ts
Normal file
@@ -0,0 +1,114 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import { compactEmbeddedPiSession } from "../../agents/pi-embedded.js";
|
||||
import { handleCompactCommand } from "./commands-compact.js";
|
||||
import { buildCommandTestParams } from "./commands.test-harness.js";
|
||||
|
||||
vi.mock("../../agents/pi-embedded.js", () => ({
|
||||
abortEmbeddedPiRun: vi.fn(),
|
||||
compactEmbeddedPiSession: vi.fn(),
|
||||
isEmbeddedPiRunActive: vi.fn().mockReturnValue(false),
|
||||
waitForEmbeddedPiRunEnd: vi.fn().mockResolvedValue(undefined),
|
||||
}));
|
||||
|
||||
vi.mock("../../infra/system-events.js", () => ({
|
||||
enqueueSystemEvent: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("./session-updates.js", () => ({
|
||||
incrementCompactionCount: vi.fn(),
|
||||
}));
|
||||
|
||||
describe("/compact command", () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
it("returns null when command is not /compact", async () => {
|
||||
const cfg = {
|
||||
commands: { text: true },
|
||||
channels: { whatsapp: { allowFrom: ["*"] } },
|
||||
} as OpenClawConfig;
|
||||
const params = buildCommandTestParams("/status", cfg);
|
||||
|
||||
const result = await handleCompactCommand(
|
||||
{
|
||||
...params,
|
||||
},
|
||||
true,
|
||||
);
|
||||
|
||||
expect(result).toBeNull();
|
||||
expect(vi.mocked(compactEmbeddedPiSession)).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("rejects unauthorized /compact commands", async () => {
|
||||
const cfg = {
|
||||
commands: { text: true },
|
||||
channels: { whatsapp: { allowFrom: ["*"] } },
|
||||
} as OpenClawConfig;
|
||||
const params = buildCommandTestParams("/compact", cfg);
|
||||
|
||||
const result = await handleCompactCommand(
|
||||
{
|
||||
...params,
|
||||
command: {
|
||||
...params.command,
|
||||
isAuthorizedSender: false,
|
||||
senderId: "unauthorized",
|
||||
},
|
||||
},
|
||||
true,
|
||||
);
|
||||
|
||||
expect(result).toEqual({ shouldContinue: false });
|
||||
expect(vi.mocked(compactEmbeddedPiSession)).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("routes manual compaction with explicit trigger and context metadata", async () => {
|
||||
const cfg = {
|
||||
commands: { text: true },
|
||||
channels: { whatsapp: { allowFrom: ["*"] } },
|
||||
session: { store: "/tmp/openclaw-session-store.json" },
|
||||
} as OpenClawConfig;
|
||||
const params = buildCommandTestParams("/compact: focus on decisions", cfg, {
|
||||
From: "+15550001",
|
||||
To: "+15550002",
|
||||
});
|
||||
vi.mocked(compactEmbeddedPiSession).mockResolvedValueOnce({
|
||||
ok: true,
|
||||
compacted: false,
|
||||
});
|
||||
|
||||
const result = await handleCompactCommand(
|
||||
{
|
||||
...params,
|
||||
sessionEntry: {
|
||||
sessionId: "session-1",
|
||||
groupId: "group-1",
|
||||
groupChannel: "#general",
|
||||
space: "workspace-1",
|
||||
spawnedBy: "agent:main:parent",
|
||||
totalTokens: 12345,
|
||||
},
|
||||
},
|
||||
true,
|
||||
);
|
||||
|
||||
expect(result?.shouldContinue).toBe(false);
|
||||
expect(vi.mocked(compactEmbeddedPiSession)).toHaveBeenCalledOnce();
|
||||
expect(vi.mocked(compactEmbeddedPiSession)).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
sessionId: "session-1",
|
||||
sessionKey: "agent:main:main",
|
||||
trigger: "manual",
|
||||
customInstructions: "focus on decisions",
|
||||
messageChannel: "whatsapp",
|
||||
groupId: "group-1",
|
||||
groupChannel: "#general",
|
||||
groupSpace: "workspace-1",
|
||||
spawnedBy: "agent:main:parent",
|
||||
}),
|
||||
);
|
||||
});
|
||||
});
|
||||
@@ -103,6 +103,7 @@ export const handleCompactCommand: CommandHandler = async (params) => {
|
||||
defaultLevel: "off",
|
||||
},
|
||||
customInstructions,
|
||||
trigger: "manual",
|
||||
senderIsOwner: params.command.senderIsOwner,
|
||||
ownerNumbers: params.command.ownerList.length > 0 ? params.command.ownerList : undefined,
|
||||
});
|
||||
|
||||
@@ -14,6 +14,7 @@ export async function withTimeout<T>(
|
||||
const abortCtrl = new AbortController();
|
||||
const timeoutError = new Error(`${label ?? "request"} timed out`);
|
||||
const timer = setTimeout(() => abortCtrl.abort(timeoutError), resolved);
|
||||
timer.unref?.();
|
||||
|
||||
let abortListener: (() => void) | undefined;
|
||||
const abortPromise: Promise<never> = abortCtrl.signal.aborted
|
||||
|
||||
Reference in New Issue
Block a user