diff --git a/app/tool/terminal.py b/app/tool/terminal.py new file mode 100644 index 0000000..df5996e --- /dev/null +++ b/app/tool/terminal.py @@ -0,0 +1,182 @@ +import asyncio +import os +import shlex +from typing import Optional + +from app.tool.base import BaseTool, CLIResult + + +class Terminal(BaseTool): + name: str = "execute_command" + description: str = """Request to execute a CLI command on the system. +Use this when you need to perform system operations or run specific commands to accomplish any step in the user's task. +You must tailor your command to the user's system and provide a clear explanation of what the command does. +Prefer to execute complex CLI commands over creating executable scripts, as they are more flexible and easier to run. +Commands will be executed in the current working directory. +Note: You MUST append a `sleep 0.05` to the end of the command for commands that will complete in under 50ms, as this will circumvent a known issue with the terminal tool where it will sometimes not return the output when the command completes too quickly. +""" + parameters: dict = { + "type": "object", + "properties": { + "command": { + "type": "string", + "description": "(required) The CLI command to execute. This should be valid for the current operating system. Ensure the command is properly formatted and does not contain any harmful instructions.", + } + }, + "required": ["command"], + } + process: Optional[asyncio.subprocess.Process] = None + current_path: str = os.getcwd() + lock: asyncio.Lock = asyncio.Lock() + + async def execute(self, command: str) -> CLIResult: + """ + Execute a terminal command asynchronously with persistent context. + + Args: + command (str): The terminal command to execute. + + Returns: + str: The output, and error of the command execution. + """ + # Split the command by & to handle multiple commands + commands = [cmd.strip() for cmd in command.split('&') if cmd.strip()] + final_output = CLIResult(output="", error="") + + for cmd in commands: + sanitized_command = self._sanitize_command(cmd) + + # Handle 'cd' command internally + if sanitized_command.lstrip().startswith("cd "): + result = await self._handle_cd_command(sanitized_command) + else: + async with self.lock: + try: + self.process = await asyncio.create_subprocess_shell( + sanitized_command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + cwd=self.current_path, + ) + stdout, stderr = await self.process.communicate() + result = CLIResult( + output=stdout.decode().strip(), + error=stderr.decode().strip() + ) + except Exception as e: + result = CLIResult(output="", error=str(e)) + finally: + self.process = None + + # Combine outputs + if result.output: + final_output.output += (result.output + "\n") if final_output.output else result.output + if result.error: + final_output.error += (result.error + "\n") if final_output.error else result.error + + # Remove trailing newlines + final_output.output = final_output.output.rstrip() + final_output.error = final_output.error.rstrip() + return final_output + + async def execute_in_env(self, env_name: str, command: str) -> CLIResult: + """ + Execute a terminal command asynchronously within a specified Conda environment. + + Args: + env_name (str): The name of the Conda environment. + command (str): The terminal command to execute within the environment. + + Returns: + str: The output, and error of the command execution. + """ + sanitized_command = self._sanitize_command(command) + + # Construct the command to run within the Conda environment + # Using 'conda run -n env_name command' to execute without activating + conda_command = f"conda run -n {shlex.quote(env_name)} {sanitized_command}" + + return await self.execute(conda_command) + + async def _handle_cd_command(self, command: str) -> CLIResult: + """ + Handle 'cd' commands to change the current path. + + Args: + command (str): The 'cd' command to process. + + Returns: + TerminalOutput: The result of the 'cd' command. + """ + try: + parts = shlex.split(command) + if len(parts) < 2: + new_path = os.path.expanduser("~") + else: + new_path = os.path.expanduser(parts[1]) + + # Handle relative paths + if not os.path.isabs(new_path): + new_path = os.path.join(self.current_path, new_path) + + new_path = os.path.abspath(new_path) + + if os.path.isdir(new_path): + self.current_path = new_path + return CLIResult( + output=f"Changed directory to {self.current_path}", + error="" + ) + else: + return CLIResult( + output="", + error=f"No such directory: {new_path}" + ) + except Exception as e: + return CLIResult(output="", error=str(e)) + + @staticmethod + def _sanitize_command(command: str) -> str: + """ + Sanitize the command for safe execution. + + Args: + command (str): The command to sanitize. + + Returns: + str: The sanitized command. + """ + # Example sanitization: restrict certain dangerous commands + dangerous_commands = ["rm", "sudo", "shutdown", "reboot"] + try: + parts = shlex.split(command) + if any(cmd in dangerous_commands for cmd in parts): + raise ValueError("Use of dangerous commands is restricted.") + except Exception as e: + # If shlex.split fails, try basic string comparison + if any(cmd in command for cmd in dangerous_commands): + raise ValueError("Use of dangerous commands is restricted.") + + # Additional sanitization logic can be added here + return command + + async def close(self): + """Close the persistent shell process if it exists.""" + async with self.lock: + if self.process: + self.process.terminate() + try: + await asyncio.wait_for(self.process.wait(), timeout=5) + except asyncio.TimeoutError: + self.process.kill() + await self.process.wait() + finally: + self.process = None + + async def __aenter__(self): + """Enter the asynchronous context manager.""" + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Exit the asynchronous context manager and close the process.""" + await self.close()