
# Query Engine Module

## Overview

The Query Engine (`src/query.ts`) is the heart of Kode's AI conversation system. It orchestrates the conversation flow end to end: preparing context, streaming model responses, executing requested tools, and feeding results back until the model produces a final answer.

## Core Functionality

### Main Query Function

```typescript
export async function* query(
  queryOptions: QueryOptions
): AsyncGenerator<QueryStreamEvent> {
  // 1. Context preparation
  // 2. Model selection
  // 3. API request with streaming
  // 4. Tool use detection and execution
  // 5. Result integration
  // 6. Recursive continuation if needed
}
```
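Step 6 is what turns a single API call into a multi-turn agent loop: when the model requests tools, their results are appended to the conversation and the engine queries again. A minimal sketch of that continuation, using the hypothetical helpers `runTools` and `toToolResultMessages` (not the actual Kode internals):

```typescript
async function* continueWithToolResults(
  options: QueryOptions,
  toolUses: ToolUse[]
): AsyncGenerator<QueryStreamEvent> {
  // Execute the requested tools (hypothetical helper)
  const results = await runTools(toolUses, options)

  // Feed the results back as new messages and recurse;
  // the loop ends when the model stops requesting tools
  yield* query({
    ...options,
    messages: [...options.messages, ...toToolResultMessages(results)],
  })
}
```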

### Query Options

```typescript
interface QueryOptions {
  prompt: string
  model: ModelInfo
  messages: Message[]
  tools: Tool[]
  hasPermissionsToUseTool: PermissionChecker
  onUpdateUsage?: (usage: Usage) => void
  abortSignal?: AbortSignal
  enableThinking?: boolean
  safeMode?: boolean
}
```
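For orientation, here is how a caller might drive the engine end to end. The option values (`currentModel`, `conversationSoFar`, `availableTools`, `permissionChecker`) are illustrative placeholders, not the exact objects Kode constructs:

```typescript
const controller = new AbortController()

for await (const event of query({
  prompt: 'Summarize the failing tests',
  model: currentModel,          // a ModelInfo from the model manager
  messages: conversationSoFar,  // prior Message[] history
  tools: availableTools,
  hasPermissionsToUseTool: permissionChecker,
  abortSignal: controller.signal,
  safeMode: true,
})) {
  switch (event.type) {
    case 'text_delta':
      process.stdout.write(event.text)
      break
    case 'tool_request':
      console.log(`tool requested: ${event.tool.name}`)
      break
    case 'error':
      console.error(event.error)
      break
  }
}
```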

## Conversation Flow

### 1. Message Preparation

```typescript
function prepareMessages(
  messages: Message[],
  systemPrompt: string,
  context: Context
): AnthropicMessage[] {
  // Build the system context block
  const systemMessage = buildSystemMessage(systemPrompt, context)
  
  // Convert internal messages to API format
  const apiMessages = messages.map(convertToAPIFormat)
  
  // Prepend the system message, then optimize token usage
  return compactMessages([systemMessage, ...apiMessages])
}
```
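`convertToAPIFormat` is not shown in the source; a plausible sketch, assuming the internal `Message` type carries a `role` plus either a plain string or structured content blocks:

```typescript
function convertToAPIFormat(message: Message): AnthropicMessage {
  return {
    role: message.role,
    // Normalize plain strings into block form; structured blocks
    // (text, tool_use, tool_result) pass through unchanged
    content:
      typeof message.content === 'string'
        ? [{ type: 'text', text: message.content }]
        : message.content,
  }
}
```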

### 2. Streaming Response Handler

```typescript
async function* handleStreamingResponse(
  stream: AsyncIterable<StreamEvent>
): AsyncGenerator<QueryStreamEvent> {
  for await (const event of stream) {
    switch (event.type) {
      // Incremental text arrives in delta events, not block starts
      case 'content_block_delta':
        yield { type: 'text_delta', text: event.content }
        break
        
      case 'tool_use':
        yield { type: 'tool_request', tool: event.tool }
        break
        
      case 'message_stop':
        yield { type: 'complete' }
        break
    }
  }
}
```

### 3. Tool Execution Pipeline

```typescript
async function* executeTools(
  toolUses: ToolUse[],
  context: ToolContext
): AsyncGenerator<ToolExecutionEvent> {
  // Determine execution strategy
  const strategy = context.safeMode ? 'serial' : 'concurrent'
  
  if (strategy === 'concurrent') {
    // Execute tools in parallel
    yield* executeConcurrent(toolUses, context)
  } else {
    // Execute tools serially for safety
    yield* executeSerial(toolUses, context)
  }
}
```

## Tool Execution

### Serial Execution (Safe Mode)

```typescript
async function* executeSerial(
  toolUses: ToolUse[],
  context: ToolContext
): AsyncGenerator<ToolResult> {
  for (const toolUse of toolUses) {
    // Request permission before every call
    const permission = await requestPermission(toolUse)
    
    if (!permission.granted) {
      yield {
        type: 'error',
        toolUseId: toolUse.id,
        error: 'Permission denied'
      }
      continue
    }
    
    // Execute tool
    const tool = findTool(toolUse.name)
    yield* tool.call(toolUse.input, context)
  }
}
```

### Concurrent Execution (Permissive Mode)

```typescript
async function* executeConcurrent(
  toolUses: ToolUse[],
  context: ToolContext
): AsyncGenerator<ToolResult> {
  // Group tools by permission requirements
  const { safe, needsPermission } = groupByPermission(toolUses)
  
  // Execute safe tools immediately
  const safePromises = safe.map(tool => 
    executeToolAsync(tool, context)
  )
  
  // Request permissions for others
  const permissions = await requestBatchPermissions(needsPermission)
  
  // Execute approved tools
  const approvedPromises = permissions
    .filter(p => p.granted)
    .map(p => executeToolAsync(p.tool, context))
  
  // Yield results as they complete
  for await (const result of mergeAsyncIterables([
    ...safePromises,
    ...approvedPromises
  ])) {
    yield result
  }
}
```
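`mergeAsyncIterables` is referenced but not defined here. One way to implement it over an array of promises, yielding results in completion order rather than submission order (a sketch, not the actual Kode helper):

```typescript
async function* mergeAsyncIterables<T>(
  promises: Promise<T>[]
): AsyncGenerator<T> {
  // Tag each promise with its index so settled ones can be removed
  const pending = new Map(
    promises.map((p, i) => [i, p.then(value => ({ i, value }))])
  )
  
  while (pending.size > 0) {
    // Race the remaining promises and yield whichever settles first
    const { i, value } = await Promise.race(pending.values())
    pending.delete(i)
    yield value
  }
}
```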

## Context Management

### Token Counting

```typescript
class TokenManager {
  private usage: TokenUsage = {
    input: 0,
    output: 0,
    total: 0
  }
  
  count(messages: Message[]): number {
    // Use tiktoken for accurate counting
    return messages.reduce((total, msg) => 
      total + countTokens(msg.content), 0
    )
  }
  
  shouldCompact(usage: TokenUsage, limit: number): boolean {
    // Compact once usage crosses 80% of the context limit
    return usage.total > limit * 0.8
  }
  
  compact(messages: Message[]): Message[] {
    // Remove old messages while preserving context
    return intelligentCompaction(messages)
  }
}
```
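`intelligentCompaction` is not shown. The usual approach is to keep the system prompt and the most recent turns verbatim and collapse the middle of the history into a summary; a sketch under that assumption, with a hypothetical `summarizeMessages` helper and cutoff:

```typescript
function intelligentCompaction(messages: Message[]): Message[] {
  const KEEP_RECENT = 10 // hypothetical cutoff
  
  if (messages.length <= KEEP_RECENT + 1) return messages
  
  // Assumes the first message carries the system context
  const [system, ...rest] = messages
  const older = rest.slice(0, rest.length - KEEP_RECENT)
  const recent = rest.slice(rest.length - KEEP_RECENT)
  
  // Collapse the older turns into a single summary message
  const summary: Message = {
    role: 'user',
    content:
      `[Summary of ${older.length} earlier messages]\n` +
      summarizeMessages(older), // hypothetical helper
  }
  
  return [system, summary, ...recent]
}
```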

### Context Injection

```typescript
function injectContext(
  messages: Message[],
  context: ProjectContext
): Message[] {
  // Add project context
  const contextMessage = {
    role: 'system',
    content: formatContext(context)
  }
  
  // Add recent file reads
  const fileContext = getRecentFileReads()
  
  // Add git status
  const gitContext = getGitStatus()
  
  return [
    contextMessage,
    ...fileContext,
    ...gitContext,
    ...messages
  ]
}
```

## Error Handling

### Error Recovery

```typescript
async function* handleError(
  error: Error,
  context: QueryContext
): AsyncGenerator<QueryStreamEvent> {
  if (error.name === 'AbortError') {
    yield { type: 'cancelled' }
    return
  }
  
  if (error.name === 'RateLimitError') {
    // Switch to backup model
    const backupModel = getBackupModel()
    yield* retryWithModel(backupModel, context)
    return
  }
  
  if (error.name === 'ContextLengthError') {
    // Compact and retry
    const compacted = compactMessages(context.messages)
    yield* retryWithMessages(compacted, context)
    return
  }
  
  // Unrecoverable error
  yield {
    type: 'error',
    error: formatError(error)
  }
}
```

### Graceful Degradation

```typescript
// Kode's domain errors carry a machine-readable `type` code,
// which plain `Error` objects lack
function selectFallbackStrategy(error: Error & { type?: string }): Strategy {
  switch (error.type) {
    case 'MODEL_UNAVAILABLE':
      return useAlternativeModel()
      
    case 'TOOL_FAILURE':
      return continueWithoutTool()
      
    case 'PERMISSION_DENIED':
      return requestAlternativeApproach()
      
    default:
      return reportErrorToUser()
  }
}
```

## Streaming Architecture

### Event Types

```typescript
type QueryStreamEvent = 
  | { type: 'text_delta', text: string }
  | { type: 'tool_request', tool: ToolUse }
  | { type: 'tool_result', result: ToolResult }
  | { type: 'thinking', content: string }
  | { type: 'error', error: Error }
  | { type: 'cancelled' }
  | { type: 'retry', attempt: number, delay: number }
  | { type: 'complete' }
  | { type: 'usage', usage: TokenUsage }
```

### Stream Management

```typescript
class StreamManager {
  private decoder = new TextDecoder()
  
  async *process(
    stream: ReadableStream<Uint8Array>
  ): AsyncGenerator<QueryStreamEvent> {
    const reader = stream.getReader()
    
    try {
      while (true) {
        const { done, value } = await reader.read()
        
        if (done) break
        
        // Decode the raw bytes, then parse the SSE events they contain
        const events = parseSSE(this.decoder.decode(value, { stream: true }))
        
        for (const event of events) {
          yield* processEvent(event)
        }
      }
    } finally {
      reader.releaseLock()
    }
  }
}
```
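`parseSSE` is assumed above. A minimal parser for the `data:` framing used by server-sent events, where events are separated by blank lines and payloads are JSON (this sketch ignores events that span chunk boundaries):

```typescript
function parseSSE(text: string): StreamEvent[] {
  const events: StreamEvent[] = []
  
  // SSE events are separated by a blank line
  for (const block of text.split('\n\n')) {
    const dataLines = block
      .split('\n')
      .filter(line => line.startsWith('data:'))
      .map(line => line.slice('data:'.length).trim())
    
    if (dataLines.length === 0) continue
    
    // Each data payload is a JSON-encoded stream event
    events.push(JSON.parse(dataLines.join('\n')) as StreamEvent)
  }
  
  return events
}
```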

## Performance Optimizations

### Caching

```typescript
import { LRUCache } from 'lru-cache'

class ResponseCache {
  private cache = new LRUCache<string, CachedResponse>({
    max: 100,
    ttl: 1000 * 60 * 5 // 5 minutes
  })
  
  getCacheKey(messages: Message[], tools: Tool[]): string {
    // Tool names (rather than full definitions) are enough to key on
    return hash({ messages, tools: tools.map(t => t.name) })
  }
  
  get(key: string): CachedResponse | undefined {
    return this.cache.get(key)
  }
  
  set(key: string, response: CachedResponse): void {
    this.cache.set(key, response)
  }
}
```
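The `hash` helper is assumed; a straightforward version over Node's built-in crypto module (stable only as long as the serialized key order is stable):

```typescript
import { createHash } from 'node:crypto'

function hash(value: unknown): string {
  // JSON serialization is adequate for plain data inputs;
  // a canonical serializer would be more robust
  return createHash('sha256')
    .update(JSON.stringify(value))
    .digest('hex')
}
```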

### Parallel Processing

```typescript
async function processToolsInBatches(
  tools: ToolUse[],
  batchSize: number = 5
): Promise<ToolResult[]> {
  const results: ToolResult[] = []
  
  for (let i = 0; i < tools.length; i += batchSize) {
    const batch = tools.slice(i, i + batchSize)
    const batchResults = await Promise.all(
      batch.map(tool => executeTool(tool))
    )
    results.push(...batchResults)
  }
  
  return results
}
```
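Note that each batch waits for its slowest member before the next batch starts. A sliding-window pool that keeps `batchSize` calls in flight at all times would use the concurrency budget better, at the cost of more bookkeeping.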

## Advanced Features

### Thinking Tokens

```typescript
function processThinkingTokens(
  response: APIResponse
): ProcessedResponse {
  const thinkingBlocks = extractThinkingBlocks(response)
  
  if (shouldShowThinking()) {
    return {
      ...response,
      thinking: thinkingBlocks
    }
  } else {
    // Hide thinking from user
    return {
      ...response,
      content: removeThinkingBlocks(response.content)
    }
  }
}
```
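The extraction helpers are assumed. A sketch that treats thinking content as `<thinking>`-tagged spans in the response text; the delimiter is an assumption about the wire format, not a documented contract:

```typescript
// Assumption: thinking content arrives wrapped in <thinking> tags
const THINKING_RE = /<thinking>([\s\S]*?)<\/thinking>/g

function extractThinkingBlocks(response: APIResponse): string[] {
  return [...response.content.matchAll(THINKING_RE)].map(m => m[1])
}

function removeThinkingBlocks(content: string): string {
  return content.replace(THINKING_RE, '').trim()
}
```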

### Binary Feedback

```typescript
class BinaryFeedback {
  async collectFeedback(
    response1: Response,
    response2: Response
  ): Promise<Feedback> {
    // Show both responses
    displayComparison(response1, response2)
    
    // Collect user preference
    const preference = await getUserPreference()
    
    // Log for model improvement
    logFeedback({
      responses: [response1, response2],
      preference,
      context: getCurrentContext()
    })
    
    return preference
  }
}
```

### Model Switching

```typescript
class ModelSwitcher {
  async switchModel(
    reason: SwitchReason,
    currentModel: Model
  ): Promise<Model> {
    switch (reason) {
      case 'CONTEXT_TOO_LARGE':
        return this.getLargerContextModel()
        
      case 'RATE_LIMITED':
        return this.getBackupModel()
        
      case 'SPECIALIZED_TASK':
        return this.getSpecializedModel()
        
      default:
        return this.getDefaultModel()
    }
  }
}
```

## Monitoring and Metrics

### Performance Tracking

```typescript
interface QueryMetrics {
  startTime: number
  endTime: number
  tokensUsed: TokenUsage
  toolsExecuted: number
  errorsEncountered: number
  modelUsed: string
  cacheHit: boolean
}

function trackQuery(metrics: QueryMetrics): void {
  // Log to analytics
  analytics.track('query_completed', metrics)
  
  // Update cost tracking
  updateCostTracking(metrics.tokensUsed, metrics.modelUsed)
  
  // Flag queries that took longer than 30 seconds
  if (metrics.endTime - metrics.startTime > 30000) {
    logSlowQuery(metrics)
  }
}
```

## Error Recovery Strategies

### Retry Logic

```typescript
async function* retryWithBackoff(
  operation: () => AsyncGenerator<QueryStreamEvent>,
  maxRetries: number = 3
): AsyncGenerator<QueryStreamEvent> {
  let retries = 0
  let delay = 1000
  
  while (retries < maxRetries) {
    try {
      yield* operation()
      return
    } catch (error) {
      if (!isRetryable(error)) throw error
      
      retries++
      yield {
        type: 'retry',
        attempt: retries,
        delay
      }
      
      await sleep(delay)
      delay *= 2 // Exponential backoff
    }
  }
  
  throw new Error('Max retries exceeded')
}
```
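`isRetryable` and `sleep` are assumed; plausible definitions are below (only `RateLimitError` appears elsewhere in this module, the other error names are guesses). Note also that if `operation()` fails after yielding some events, the retry replays the stream from the start, so consumers should treat pre-failure events as provisional:

```typescript
function isRetryable(error: Error): boolean {
  // Transient failures are worth retrying; permanent ones propagate
  return ['RateLimitError', 'TimeoutError', 'OverloadedError']
    .includes(error.name)
}

function sleep(ms: number): Promise<void> {
  return new Promise(resolve => setTimeout(resolve, ms))
}
```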

The Query Engine provides robust, efficient, and extensible AI conversation orchestration, with comprehensive error handling, performance optimizations, and permission-aware tool execution.