Merge pull request #62 from bing-zhub/fix_custom_claude

fix: claude streaming tool use and add cache control;
Xinlu Lai 2025-08-29 01:05:08 +08:00 committed by GitHub
commit 9e21c5f48f
2 changed files with 177 additions and 55 deletions


@@ -206,10 +206,10 @@ export async function* query(
typeof lastUserMessage.message.content === 'string'
? reminders + lastUserMessage.message.content
: [
{ type: 'text', text: reminders },
...(Array.isArray(lastUserMessage.message.content)
? lastUserMessage.message.content
: []),
{ type: 'text', text: reminders },
],
},
}
@@ -567,8 +567,16 @@ async function* checkPermissionsAndCallTool(
// (surprisingly, the model is not great at generating valid input)
const isValidInput = tool.inputSchema.safeParse(input)
if (!isValidInput.success) {
// Create a more helpful error message for common cases
let errorMessage = `InputValidationError: ${isValidInput.error.message}`
// Special handling for the "View" tool (FileReadTool) being called with empty parameters
if (tool.name === 'View' && Object.keys(input).length === 0) {
errorMessage = `Error: The View tool requires a 'file_path' parameter to specify which file to read. Please provide the absolute path to the file you want to view. For example: {"file_path": "/path/to/file.txt"}`
}
logEvent('tengu_tool_use_error', {
error: `InputValidationError: ${isValidInput.error.message}`,
error: errorMessage,
messageID: assistantMessage.message.id,
toolName: tool.name,
toolInput: JSON.stringify(input).slice(0, 200),
@@ -576,7 +584,7 @@ async function* checkPermissionsAndCallTool(
yield createUserMessage([
{
type: 'tool_result',
content: `InputValidationError: ${isValidInput.error.message}`,
content: errorMessage,
is_error: true,
tool_use_id: toolUseID,
},
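
The special case above exists because the schema's default failure message for an empty object is verbose rather than actionable. A minimal sketch of the validation path, assuming the View tool's inputSchema is a zod object requiring file_path (a hypothetical schema, shown only to illustrate why {} fails):

import { z } from 'zod'

// Hypothetical stand-in for the View (FileReadTool) input schema.
const viewInputSchema = z.object({ file_path: z.string() })

const parsed = viewInputSchema.safeParse({})
if (!parsed.success) {
  // parsed.error.message is a multi-issue zod report; the change above
  // substitutes a single actionable hint when the input object is empty.
  console.log(parsed.error.message)
}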


@@ -962,6 +962,84 @@ export function resetAnthropicClient(): void {
* 4. Fallback region (us-east5)
*/
/**
* Manage cache control to ensure it doesn't exceed Claude's 4 cache block limit
* Priority:
* 1. System prompts (high priority)
* 2. Long documents or reference materials (high priority)
* 3. Reusable context (medium priority)
* 4. Short messages or one-time content (no caching)
*/
function applyCacheControlWithLimits(
systemBlocks: TextBlockParam[],
messageParams: MessageParam[]
): { systemBlocks: TextBlockParam[]; messageParams: MessageParam[] } {
if (!PROMPT_CACHING_ENABLED) {
return { systemBlocks, messageParams }
}
const maxCacheBlocks = 4
let usedCacheBlocks = 0
// 1. Prioritize adding cache to system prompts (highest priority)
const processedSystemBlocks = systemBlocks.map((block, index) => {
if (usedCacheBlocks < maxCacheBlocks && block.text.length > 1000) {
usedCacheBlocks++
return {
...block,
cache_control: { type: 'ephemeral' as const }
}
}
const { cache_control, ...blockWithoutCache } = block
return blockWithoutCache
})
// 2. Add cache to message content based on priority
const processedMessageParams = messageParams.map((message, messageIndex) => {
if (Array.isArray(message.content)) {
const processedContent = message.content.map((contentBlock, blockIndex) => {
// Determine whether this content block should be cached
const shouldCache =
usedCacheBlocks < maxCacheBlocks &&
contentBlock.type === 'text' &&
typeof contentBlock.text === 'string' &&
(
// Long documents (over 2000 characters)
contentBlock.text.length > 2000 ||
// Last content block of the last message (may be important context)
(messageIndex === messageParams.length - 1 &&
blockIndex === message.content.length - 1 &&
contentBlock.text.length > 500)
)
if (shouldCache) {
usedCacheBlocks++
return {
...contentBlock,
cache_control: { type: 'ephemeral' as const }
}
}
// Remove existing cache_control
const { cache_control, ...blockWithoutCache } = contentBlock as any
return blockWithoutCache
})
return {
...message,
content: processedContent
}
}
return message
})
return {
systemBlocks: processedSystemBlocks,
messageParams: processedMessageParams
}
}
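
A minimal usage sketch for the helper above, with illustrative data only (TextBlockParam and MessageParam are the Anthropic SDK types this file already uses): however many blocks qualify, at most four receive cache_control.

// Illustrative inputs: one long system prompt and two user messages.
const exampleSystem: TextBlockParam[] = [
  { type: 'text', text: 'You are a coding assistant. '.repeat(50) }, // > 1000 chars, cached
]
const exampleMessages: MessageParam[] = [
  { role: 'user', content: [{ type: 'text', text: 'reference document '.repeat(150) }] }, // > 2000 chars, cached
  { role: 'user', content: [{ type: 'text', text: 'final question '.repeat(40) }] }, // last block of last message, > 500 chars, cached
]

const { systemBlocks: cachedSystem, messageParams: cachedMessages } =
  applyCacheControlWithLimits(exampleSystem, exampleMessages)
// Three of the four available cache blocks are used here; once the limit is
// hit, any further qualifying blocks are left uncached.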
export function userMessageToMessageParam(
message: UserMessage,
addCache = false,
@@ -974,23 +1052,13 @@ export function userMessageToMessageParam(
{
type: 'text',
text: message.message.content,
...(PROMPT_CACHING_ENABLED
? { cache_control: { type: 'ephemeral' } }
: {}),
},
],
}
} else {
return {
role: 'user',
content: message.message.content.map((_, i) => ({
..._,
...(i === message.message.content.length - 1
? PROMPT_CACHING_ENABLED
? { cache_control: { type: 'ephemeral' } }
: {}
: {}),
})),
content: message.message.content.map((_) => ({ ..._ })),
}
}
}
@@ -1012,25 +1080,13 @@ export function assistantMessageToMessageParam(
{
type: 'text',
text: message.message.content,
...(PROMPT_CACHING_ENABLED
? { cache_control: { type: 'ephemeral' } }
: {}),
},
],
}
} else {
return {
role: 'assistant',
content: message.message.content.map((_, i) => ({
..._,
...(i === message.message.content.length - 1 &&
_.type !== 'thinking' &&
_.type !== 'redacted_thinking'
? PROMPT_CACHING_ENABLED
? { cache_control: { type: 'ephemeral' } }
: {}
: {}),
})),
content: message.message.content.map((_) => ({ ..._ })),
}
}
}
@@ -1383,9 +1439,6 @@ async function queryAnthropicNative(
const system: TextBlockParam[] = splitSysPromptPrefix(systemPrompt).map(
_ => ({
...(PROMPT_CACHING_ENABLED
? { cache_control: { type: 'ephemeral' } }
: {}),
text: _,
type: 'text',
}),
@@ -1404,6 +1457,10 @@ async function queryAnthropicNative(
)
const anthropicMessages = addCacheBreakpoints(messages)
// apply cache control
const { systemBlocks: processedSystem, messageParams: processedMessages } =
applyCacheControlWithLimits(system, anthropicMessages)
const startIncludingRetries = Date.now()
// Log the system prompt construction process
@@ -1426,8 +1483,8 @@ async function queryAnthropicNative(
const params: Anthropic.Beta.Messages.MessageCreateParams = {
model,
max_tokens: getMaxTokensFromProfile(modelProfile),
messages: anthropicMessages,
system,
messages: processedMessages,
system: processedSystem,
tools: toolSchemas.length > 0 ? toolSchemas : undefined,
tool_choice: toolSchemas.length > 0 ? { type: 'auto' } : undefined,
}
@@ -1450,6 +1507,7 @@ async function queryAnthropicNative(
: null,
maxTokens: params.max_tokens,
temperature: MAIN_QUERY_TEMPERATURE,
params: params,
messageCount: params.messages?.length || 0,
streamMode: true,
toolsCount: toolSchemas.length,
@@ -1471,6 +1529,7 @@ async function queryAnthropicNative(
let finalResponse: any | null = null
let messageStartEvent: any = null
const contentBlocks: any[] = []
const inputJSONBuffers = new Map<number, string>()
let usage: any = null
let stopReason: string | null = null
let stopSequence: string | null = null
@@ -1484,30 +1543,81 @@ async function queryAnthropicNative(
})
throw new Error('Request was cancelled')
}
if (event.type === 'message_start') {
messageStartEvent = event
finalResponse = {
...event.message,
content: [], // Will be populated from content blocks
}
} else if (event.type === 'content_block_start') {
contentBlocks[event.index] = { ...event.content_block }
} else if (event.type === 'content_block_delta') {
if (!contentBlocks[event.index]) {
contentBlocks[event.index] = {
type: event.delta.type === 'text_delta' ? 'text' : 'unknown',
text: '',
switch (event.type) {
case 'message_start':
messageStartEvent = event
finalResponse = {
...event.message,
content: [], // Will be populated from content blocks
}
}
if (event.delta.type === 'text_delta') {
contentBlocks[event.index].text += event.delta.text
}
} else if (event.type === 'message_delta') {
if (event.delta.stop_reason) stopReason = event.delta.stop_reason
if (event.delta.stop_sequence)
stopSequence = event.delta.stop_sequence
if (event.usage) usage = { ...usage, ...event.usage }
} else if (event.type === 'message_stop') {
break
case 'content_block_start':
contentBlocks[event.index] = { ...event.content_block }
// Initialize JSON buffer for tool_use blocks
if (event.content_block.type === 'tool_use') {
inputJSONBuffers.set(event.index, '')
}
break
case 'content_block_delta':
const blockIndex = event.index
// Ensure content block exists
if (!contentBlocks[blockIndex]) {
contentBlocks[blockIndex] = {
type: event.delta.type === 'text_delta' ? 'text' : 'tool_use',
text: event.delta.type === 'text_delta' ? '' : undefined,
}
if (event.delta.type === 'input_json_delta') {
inputJSONBuffers.set(blockIndex, '')
}
}
if (event.delta.type === 'text_delta') {
contentBlocks[blockIndex].text += event.delta.text
} else if (event.delta.type === 'input_json_delta') {
const currentBuffer = inputJSONBuffers.get(blockIndex) || ''
inputJSONBuffers.set(blockIndex, currentBuffer + event.delta.partial_json)
}
break
case 'message_delta':
if (event.delta.stop_reason) stopReason = event.delta.stop_reason
if (event.delta.stop_sequence) stopSequence = event.delta.stop_sequence
if (event.usage) usage = { ...usage, ...event.usage }
break
case 'content_block_stop':
const stopIndex = event.index
const block = contentBlocks[stopIndex]
if (block?.type === 'tool_use' && inputJSONBuffers.has(stopIndex)) {
const jsonStr = inputJSONBuffers.get(stopIndex)
if (jsonStr) {
try {
block.input = JSON.parse(jsonStr)
} catch (error) {
debugLogger.error('JSON_PARSE_ERROR', {
blockIndex: stopIndex,
jsonStr,
error: error instanceof Error ? error.message : String(error)
})
block.input = {}
}
inputJSONBuffers.delete(stopIndex)
}
}
break
case 'message_stop':
// Clear any remaining buffers
inputJSONBuffers.clear()
break
}
if (event.type === 'message_stop') {
break
}
}
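
The heart of the streaming fix above: tool_use arguments arrive as input_json_delta fragments, so they must be buffered per block index and parsed only once content_block_stop arrives. A stripped-down sketch of that accumulation, using hypothetical fragment data:

// Hypothetical partial_json fragments for a tool_use block at index 1.
const fragments = ['{"file_', 'path": "/tmp', '/notes.txt"}']

const buffers = new Map<number, string>()
buffers.set(1, '') // content_block_start for the tool_use block

for (const partial_json of fragments) {
  // content_block_delta with delta.type === 'input_json_delta'
  buffers.set(1, (buffers.get(1) ?? '') + partial_json)
}

// content_block_stop: the concatenated buffer is now complete JSON.
const toolInput = JSON.parse(buffers.get(1) ?? '{}') // { file_path: '/tmp/notes.txt' }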
@@ -1557,6 +1667,10 @@ async function queryAnthropicNative(
}
}, { signal })
debugLogger.api('ANTHROPIC_API_CALL_SUCCESS', {
content: response.content
})
const ttftMs = Date.now() - start
const durationMs = Date.now() - startIncludingRetries