feat: modular non-intrusive Amazon Bedrock support

Array 2025-03-20 12:15:34 +08:00
parent d4358ef537
commit 4e10b42b30
3 changed files with 280 additions and 0 deletions

app/bedrock.py Normal file

@@ -0,0 +1,275 @@
from typing import Dict, List, Literal, Optional, Union
import boto3
import json
import time
import uuid
from datetime import datetime
import sys
# Global variable to track the current tool use ID across function calls
# (temporary workaround until tool-call state is passed through explicitly)
CURRENT_TOOLUSE_ID = None
# Class to handle OpenAI-style response formatting
class OpenAIResponse:
def __init__(self, data):
# Recursively convert nested dicts and lists to OpenAIResponse objects
for key, value in data.items():
if isinstance(value, dict):
value = OpenAIResponse(value)
elif isinstance(value, list):
value = [OpenAIResponse(item) if isinstance(item, dict) else item for item in value]
setattr(self, key, value)
    def model_dump(self, *args, **kwargs):
        # Convert the object to a dict (a shallow copy, so repeated calls
        # do not mutate the instance) and add a timestamp
        data = self.__dict__.copy()
        data['created_at'] = datetime.now().isoformat()
        return data
# Main client class for interacting with Amazon Bedrock
class BedrockClient:
def __init__(self):
        # Initialize the Bedrock runtime client; AWS credentials and region
        # must be configured in the environment beforehand
try:
self.client = boto3.client('bedrock-runtime')
self.chat = Chat(self.client)
except Exception as e:
print(f"Error initializing Bedrock client: {e}")
sys.exit(1)
# Chat interface class
class Chat:
def __init__(self, client):
self.completions = ChatCompletions(client)
# Core class handling chat completions functionality
class ChatCompletions:
def __init__(self, client):
self.client = client
def _convert_openai_tools_to_bedrock_format(self, tools):
# Convert OpenAI function calling format to Bedrock tool format
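        # For example (illustrative), an OpenAI-style tool definition
        #   {"type": "function", "function": {"name": "get_weather",
        #    "description": ..., "parameters": {"properties": ..., "required": ...}}}
        # maps onto a Bedrock toolSpec
        #   {"toolSpec": {"name": "get_weather", "description": ...,
        #    "inputSchema": {"json": {"type": "object", "properties": ..., "required": ...}}}}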
bedrock_tools = []
for tool in tools:
if tool.get('type') == 'function':
function = tool.get('function', {})
bedrock_tool = {
"toolSpec": {
"name": function.get('name', ''),
"description": function.get('description', ''),
"inputSchema": {
"json": {
"type": "object",
"properties": function.get('parameters', {}).get('properties', {}),
"required": function.get('parameters', {}).get('required', [])
}
}
}
}
bedrock_tools.append(bedrock_tool)
return bedrock_tools
def _convert_openai_messages_to_bedrock_format(self, messages):
# Convert OpenAI message format to Bedrock message format
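        # For example (illustrative), an OpenAI tool-result message
        #   {"role": "tool", "content": "sunny"}
        # becomes a Bedrock user message carrying a toolResult block
        #   {"role": "user", "content": [{"toolResult":
        #    {"toolUseId": ..., "content": [{"text": "sunny"}]}}]}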
bedrock_messages = []
system_prompt = []
for message in messages:
if message.get('role') == 'system':
system_prompt = [{"text": message.get('content')}]
elif message.get('role') == 'user':
bedrock_message = {
"role": message.get('role', 'user'),
"content": [{"text": message.get('content')}]
}
bedrock_messages.append(bedrock_message)
elif message.get('role') == 'assistant':
bedrock_message = {
"role": "assistant",
"content": [{"text": message.get('content')}]
}
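                # Note: only the first tool call is converted below;
                # parallel tool calls are not handled by this adapter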
openai_tool_calls = message.get('tool_calls', [])
if openai_tool_calls:
bedrock_tool_use = {
"toolUseId": openai_tool_calls[0]['id'],
"name": openai_tool_calls[0]['function']['name'],
"input": json.loads(openai_tool_calls[0]['function']['arguments'])
}
bedrock_message['content'].append({"toolUse": bedrock_tool_use})
global CURRENT_TOOLUSE_ID
CURRENT_TOOLUSE_ID = openai_tool_calls[0]['id']
bedrock_messages.append(bedrock_message)
elif message.get('role') == 'tool':
bedrock_message = {
"role": "user",
"content": [
{
"toolResult": {
"toolUseId": CURRENT_TOOLUSE_ID,
"content": [{"text":message.get('content')}]
}
}
]
}
bedrock_messages.append(bedrock_message)
else:
raise ValueError(f"Invalid role: {message.get('role')}")
return system_prompt, bedrock_messages
def _convert_bedrock_response_to_openai_format(self, bedrock_response):
# Convert Bedrock response format to OpenAI format
content = ""
if bedrock_response.get('output', {}).get('message', {}).get('content'):
content_array = bedrock_response['output']['message']['content']
content = "".join(item.get('text', '') for item in content_array)
        # Use a placeholder when the model returned no text content,
        # so the message content is never an empty string
        if content == "":
            content = "."
# Handle tool calls in response
openai_tool_calls = []
if bedrock_response.get('output', {}).get('message', {}).get('content'):
for content_item in bedrock_response['output']['message']['content']:
if content_item.get('toolUse'):
bedrock_tool_use = content_item['toolUse']
global CURRENT_TOOLUSE_ID
CURRENT_TOOLUSE_ID = bedrock_tool_use['toolUseId']
openai_tool_call = {
'id': CURRENT_TOOLUSE_ID,
'type': 'function',
'function': {
'name': bedrock_tool_use['name'],
'arguments': json.dumps(bedrock_tool_use['input'])
}
}
openai_tool_calls.append(openai_tool_call)
# Construct final OpenAI format response
openai_format = {
"id": f"chatcmpl-{uuid.uuid4()}",
"created": int(time.time()),
"object": "chat.completion",
"system_fingerprint": None,
"choices": [
{
"finish_reason": bedrock_response.get('stopReason', 'end_turn'),
"index": 0,
"message": {
"content": content,
"role": bedrock_response.get('output', {}).get('message', {}).get('role', 'assistant'),
"tool_calls": openai_tool_calls if openai_tool_calls != [] else None,
"function_call": None
}
}
],
"usage": {
"completion_tokens": bedrock_response.get('usage', {}).get('outputTokens', 0),
"prompt_tokens": bedrock_response.get('usage', {}).get('inputTokens', 0),
"total_tokens": bedrock_response.get('usage', {}).get('totalTokens', 0)
}
}
return OpenAIResponse(openai_format)
async def _invoke_bedrock(
self,
model: str,
messages: List[Dict[str, str]],
max_tokens: int,
temperature: float,
tools: Optional[List[dict]] = None,
tool_choice: Literal["none", "auto", "required"] = "auto",
**kwargs
) -> OpenAIResponse:
# Non-streaming invocation of Bedrock model
system_prompt, bedrock_messages = self._convert_openai_messages_to_bedrock_format(messages)
        request_kwargs = {
            "modelId": model,
            "system": system_prompt,
            "messages": bedrock_messages,
            "inferenceConfig": {"temperature": temperature, "maxTokens": max_tokens},
        }
        # boto3 rejects an explicit None for optional parameters, so only
        # include toolConfig when tools are actually provided
        if tools:
            request_kwargs["toolConfig"] = {"tools": tools}
        response = self.client.converse(**request_kwargs)
openai_response = self._convert_bedrock_response_to_openai_format(response)
return openai_response
async def _invoke_bedrock_stream(
self,
model: str,
messages: List[Dict[str, str]],
max_tokens: int,
temperature: float,
tools: Optional[List[dict]] = None,
tool_choice: Literal["none", "auto", "required"] = "auto",
**kwargs
) -> OpenAIResponse:
# Streaming invocation of Bedrock model
system_prompt, bedrock_messages = self._convert_openai_messages_to_bedrock_format(messages)
        request_kwargs = {
            "modelId": model,
            "system": system_prompt,
            "messages": bedrock_messages,
            "inferenceConfig": {"temperature": temperature, "maxTokens": max_tokens},
        }
        # boto3 rejects an explicit None for optional parameters, so only
        # include toolConfig when tools are actually provided
        if tools:
            request_kwargs["toolConfig"] = {"tools": tools}
        response = self.client.converse_stream(**request_kwargs)
# Initialize response structure
bedrock_response = {
'output': {
'message': {
'role': '',
'content': []
}
},
'stopReason': '',
'usage': {},
'metrics': {}
}
bedrock_response_text = ""
bedrock_response_tool_input = ""
# Process streaming response
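        # Assumes at most two content blocks: index 0 is the text block
        # and index 1 (if present) is a single tool-use block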
stream = response.get('stream')
if stream:
for event in stream:
if event.get('messageStart', {}).get('role'):
bedrock_response['output']['message']['role'] = event['messageStart']['role']
if event.get('contentBlockDelta', {}).get('delta', {}).get('text'):
bedrock_response_text += event['contentBlockDelta']['delta']['text']
print(event['contentBlockDelta']['delta']['text'], end='', flush=True)
if event.get('contentBlockStop', {}).get('contentBlockIndex') == 0:
bedrock_response['output']['message']['content'].append({"text": bedrock_response_text})
if event.get('contentBlockStart', {}).get('start', {}).get('toolUse'):
bedrock_tool_use = event['contentBlockStart']['start']['toolUse']
tool_use = {
"toolUseId": bedrock_tool_use['toolUseId'],
"name": bedrock_tool_use['name'],
}
bedrock_response['output']['message']['content'].append({"toolUse": tool_use})
global CURRENT_TOOLUSE_ID
CURRENT_TOOLUSE_ID = bedrock_tool_use['toolUseId']
if event.get('contentBlockDelta', {}).get('delta', {}).get('toolUse'):
bedrock_response_tool_input += event['contentBlockDelta']['delta']['toolUse']['input']
print(event['contentBlockDelta']['delta']['toolUse']['input'], end='', flush=True)
if event.get('contentBlockStop', {}).get('contentBlockIndex') == 1:
bedrock_response['output']['message']['content'][1]['toolUse']['input'] = json.loads(bedrock_response_tool_input)
print()
openai_response = self._convert_bedrock_response_to_openai_format(bedrock_response)
return openai_response
def create(
self,
model: str,
messages: List[Dict[str, str]],
max_tokens: int,
temperature: float,
stream: Optional[bool] = True,
tools: Optional[List[dict]] = None,
tool_choice: Literal["none", "auto", "required"] = "auto",
**kwargs
) -> OpenAIResponse:
# Main entry point for chat completion
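        # Note: this method is synchronous but returns the coroutine from
        # the async invocation helpers, so callers are expected to await it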
bedrock_tools = []
if tools is not None:
bedrock_tools = self._convert_openai_tools_to_bedrock_format(tools)
if stream:
return self._invoke_bedrock_stream(model, messages, max_tokens, temperature, bedrock_tools, tool_choice, **kwargs)
else:
return self._invoke_bedrock(model, messages, max_tokens, temperature, bedrock_tools, tool_choice, **kwargs)
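
A minimal usage sketch (not part of the commit; it assumes AWS credentials are already configured via boto3's default chain, and the model ID below is only an example): the adapter mirrors the OpenAI SDK surface, so the call site looks like an OpenAI client call.

import asyncio

from app.bedrock import BedrockClient


async def main():
    client = BedrockClient()
    # create() itself is synchronous but returns a coroutine from the
    # async invocation helpers, so it must be awaited
    response = await client.chat.completions.create(
        model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",  # example model ID
        messages=[{"role": "user", "content": "Hello!"}],
        max_tokens=512,
        temperature=0.0,
        stream=False,
    )
    print(response.choices[0].message.content)


asyncio.run(main())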

app/llm.py

@@ -28,6 +28,7 @@ from app.schema import (
    Message,
    ToolChoice,
)
from app.bedrock import BedrockClient
REASONING_MODELS = ["o1", "o3-mini"]
@@ -225,6 +226,8 @@ class LLM:
                api_key=self.api_key,
                api_version=self.api_version,
            )
        elif self.api_type == "aws":
            self.client = BedrockClient()
        else:
            self.client = AsyncOpenAI(api_key=self.api_key, base_url=self.base_url)
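
With this change, routing requests through Bedrock presumably only requires setting the LLM api_type to "aws" in the project configuration; BedrockClient takes no arguments and resolves credentials and region through boto3's default chain (environment variables, shared credentials file, or an instance/role profile).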

requirements.txt

@@ -31,3 +31,5 @@
pytest-asyncio~=0.25.3
mcp~=1.4.1
httpx>=0.27.0
tomli>=2.0.0
boto3~=1.37.16