| | import ast |
| | import enum |
| | import importlib |
| | import json |
| | import os |
| | import pickle |
| | import subprocess |
| | import tempfile |
| | import traceback |
| | import zipfile |
| | from typing import Any, ClassVar |
| | from urllib.parse import urljoin |
| |
|
| | import pandas as pd |
| | import requests |
| | import tqdm |
| | from langchain_core.callbacks import BaseCallbackHandler |
| | from langchain_core.messages.base import get_msg_title_repr |
| | from langchain_core.tools import StructuredTool |
| | from langchain_core.utils.interactive_env import is_interactive_env |
| | from pydantic import BaseModel, Field, ValidationError |
| |
|
| |
|
def run_bash_script(script: str) -> str:
    """Run a Bash script using subprocess.

    The script is written to a temporary executable file and run with a copy
    of the current environment in the current working directory. The
    temporary file is removed afterwards, even when execution fails.

    Args:
        script: Bash script to run

    Returns:
        Standard output of the script on success. On a non-zero exit code, or
        on any exception, an error message string is returned instead of
        raising.

    Example:
        ```
        # Example of a complex Bash script
        script = '''
        #!/bin/bash

        # Define variables
        DATA_DIR="/path/to/data"
        OUTPUT_FILE="results.txt"

        # Create output directory if it doesn't exist
        mkdir -p $(dirname $OUTPUT_FILE)

        # Loop through files
        for file in $DATA_DIR/*.txt; do
        echo "Processing $file..."
        # Count lines in each file
        line_count=$(wc -l < $file)
        echo "$file: $line_count lines" >> $OUTPUT_FILE
        done

        echo "Processing complete. Results saved to $OUTPUT_FILE"
        '''
        result = run_bash_script(script)
        print(result)
        ```

    """
    temp_file = None
    try:
        script = script.strip()

        if not script:
            return "Error: Empty script"

        # Keep the shebang as the very first line of the file: anything
        # written before it (such as "set -e") would stop the kernel from
        # honouring the requested interpreter.
        if script.startswith("#!/"):
            shebang, _, body = script.partition("\n")
        else:
            shebang, body = "#!/bin/bash", script

        with tempfile.NamedTemporaryFile(suffix=".sh", mode="w", delete=False) as f:
            temp_file = f.name
            f.write(shebang + "\n")
            # Abort on the first failing command unless the script already opts in.
            if "set -e" not in script:
                f.write("set -e\n")
            f.write(body)

        os.chmod(temp_file, 0o755)

        env = os.environ.copy()
        cwd = os.getcwd()

        # The file is executable and starts with a shebang, so it is executed
        # directly; no intermediate shell is needed for an argument list.
        result = subprocess.run(
            [temp_file],
            capture_output=True,
            text=True,
            check=False,
            env=env,
            cwd=cwd,
        )

        if result.returncode != 0:
            traceback.print_stack()
            print(result)
            return f"Error running Bash script (exit code {result.returncode}):\n{result.stderr}"
        return result.stdout
    except Exception as e:
        traceback.print_exc()
        return f"Error running Bash script: {str(e)}"
    finally:
        # Always clean up the temporary script, including on failure paths.
        if temp_file is not None:
            try:
                os.unlink(temp_file)
            except OSError:
                pass
| |
|
| |
|
| | |
def run_cli_command(command: str) -> str:
    """Execute one command line without a shell and return what it printed.

    The command string is tokenised with ``shlex.split`` and passed to
    ``subprocess.run`` as an argument list.

    Args:
        command: CLI command to run

    Returns:
        The command's stdout when it exits with code 0; otherwise an error
        message string containing stderr or the exception text. Nothing is
        raised to the caller.

    """
    import shlex

    try:
        command = command.strip()
        if not command:
            return "Error: Empty command"

        completed = subprocess.run(
            shlex.split(command),
            capture_output=True,
            text=True,
            check=False,
        )

        if completed.returncode == 0:
            return completed.stdout
        return f"Error running command '{command}':\n{completed.stderr}"
    except Exception as e:
        return f"Error running command '{command}': {str(e)}"
| |
|
| |
|
def run_with_timeout(func, args=None, kwargs=None, timeout=600):
    """Call ``func`` in a daemon thread and wait at most ``timeout`` seconds.

    A thread (rather than a separate process) is used so that state touched by
    ``func`` stays in this interpreter.

    Args:
        func: Callable to execute.
        args: Positional arguments for ``func`` (defaults to none).
        kwargs: Keyword arguments for ``func`` (defaults to none).
        timeout: Seconds to wait for the thread to finish.

    Returns:
        The value returned by ``func``; or an "Error in execution: ..." string
        if it raised an ``Exception``; or a timeout error string if it was
        still running after ``timeout`` seconds.
    """
    import ctypes
    import queue
    import threading

    call_args = [] if args is None else args
    call_kwargs = {} if kwargs is None else kwargs
    outcome = queue.Queue()

    def _worker():
        """Run the callable and post a (status, payload) pair to the queue."""
        try:
            outcome.put(("success", func(*call_args, **call_kwargs)))
        except Exception as exc:
            outcome.put(("error", str(exc)))

    worker = threading.Thread(target=_worker, daemon=True)
    worker.start()
    worker.join(timeout)

    if worker.is_alive():
        print(f"TIMEOUT: Code execution timed out after {timeout} seconds")

        # Best effort: ask the interpreter to raise SystemExit inside the
        # still-running thread.
        try:
            tid = worker.ident
            if tid:
                affected = ctypes.pythonapi.PyThreadState_SetAsyncExc(
                    ctypes.c_long(tid), ctypes.py_object(SystemExit)
                )
                if affected > 1:
                    # More than one thread state was hit; undo the request.
                    ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), None)
        except Exception as e:
            print(f"Error trying to terminate thread: {e}")

        return f"ERROR: Code execution timed out after {timeout} seconds. Please try with simpler inputs or break your task into smaller steps."

    try:
        status, payload = outcome.get(block=False)
    except queue.Empty:
        return "Error: Execution completed but no result was returned"

    if status == "success":
        return payload
    return f"Error in execution: {payload}"
| |
|
| |
|
class api_schema(BaseModel):
    """api schema specification."""

    # Required but nullable field. function_to_api_schema() in this file feeds
    # the returned string to ast.literal_eval, so it is expected to hold a
    # Python dict literal.
    api_schema: str | None = Field(..., description="The api schema as a dictionary")
| |
|
| |
|
def function_to_api_schema(function_string, llm):
    """Ask an LLM to describe a function as an API schema dictionary.

    The model is bound to the ``api_schema`` structured-output type and
    prompted with the code snippet. The returned string is parsed with
    ``ast.literal_eval``. Up to seven attempts are made.

    Args:
        function_string: Source code of the function to describe.
        llm: Chat model exposing ``with_structured_output`` and ``invoke``.

    Returns:
        The parsed schema object on success, otherwise the string
        "Error: Could not parse the API schema" after all attempts failed.
    """
    prompt = """
Based on a code snippet and help me write an API docstring in the format like this:

{{'name': 'get_gene_set_enrichment',
'description': 'Given a list of genes, identify a pathway that is enriched for this gene set. Return a list of pathway name, p-value, z-scores.',
'required_parameters': [{{'name': 'genes',
'type': 'List[str]',
'description': 'List of g`ene symbols to analyze',
'default': None}}],
'optional_parameters': [{{'name': 'top_k',
'type': 'int',
'description': 'Top K pathways to return',
'default': 10}}, {{'name': 'database',
'type': 'str',
'description': 'Name of the database to use for enrichment analysis',
'default': "gene_ontology"}}]}}

Strictly follow the input from the function - don't create fake optional parameters.
For variable without default values, set them as None, not null.
For variable with boolean values, use capitalized True or False, not true or false.
Do not add any return type in the docstring.
Be as clear and succint as possible for the descriptions. Please do not make it overly verbose.
Here is the code snippet:
{code}
"""
    llm = llm.with_structured_output(api_schema)

    for _ in range(7):
        # Reset per attempt so the except branch can always report the value,
        # even when invoke() itself failed before anything was assigned.
        api = None
        try:
            api = llm.invoke(prompt.format(code=function_string)).dict()["api_schema"]
            return ast.literal_eval(api)
        except Exception as e:
            print("API string:", api)
            print("Error parsing the API string:", e)
            continue

    return "Error: Could not parse the API schema"
| | |
| |
|
| |
|
def get_all_functions_from_file(file_path):
    """Return the source of every public top-level function in a Python file.

    Only plain ``def`` statements directly in the module body are considered.
    Functions whose name starts with an underscore are skipped. Decorator
    lines are not part of the returned text because extraction starts at the
    ``def`` line reported by ``ast``.

    Args:
        file_path: Path to the Python source file.

    Returns:
        A list of strings, one per function, in file order.
    """
    # Python source is UTF-8 by default, independent of the platform locale.
    with open(file_path, encoding="utf-8") as file:
        file_content = file.read()

    tree = ast.parse(file_content)

    # Split once, and only on "\n": str.splitlines() also breaks on form feeds
    # and other separators that the parser does not count as new lines, which
    # would shift the slice relative to ast line numbers.
    source_lines = file_content.split("\n")

    return [
        "\n".join(source_lines[node.lineno - 1 : node.end_lineno])
        for node in tree.body
        if isinstance(node, ast.FunctionDef) and not node.name.startswith("_")
    ]
| |
|
| |
|
def write_python_code(request: str):
    """Ask a Claude chat model to write Python code for a request.

    Builds a prompt -> model -> string-parser chain and returns the contents
    of the first ```python fenced block found in the model's reply.

    Args:
        request: Natural-language description of what the code should do.

    Returns:
        The code found between the first ```python fence and the next fence.

    Raises:
        ValueError: If the reply contains no ```python fenced block.
    """
    from langchain_anthropic import ChatAnthropic
    from langchain_core.output_parsers import StrOutputParser
    from langchain_core.prompts import ChatPromptTemplate

    model = ChatAnthropic(model="claude-3-5-sonnet-20240620")
    template = """Write some python code to solve the user's problem.

Return only python code in Markdown format, e.g.:

```python
....
```"""
    prompt = ChatPromptTemplate.from_messages([("system", template), ("human", "{input}")])

    def _sanitize_output(text: str):
        """Extract the body of the first ```python block from the reply."""
        # partition() cuts at the first fence only, so replies containing
        # several fenced blocks no longer break tuple unpacking.
        _, fence, after = text.partition("```python")
        if not fence:
            raise ValueError("Model response did not contain a ```python code block")
        return after.split("```")[0]

    chain = prompt | model | StrOutputParser() | _sanitize_output
    return chain.invoke({"input": "write a code that " + request})
| |
|
| |
|
def execute_graphql_query(
    query: str,
    variables: dict,
    api_address: str = "https://api.genetics.opentargets.org/graphql",
) -> dict:
    """POST a GraphQL query and its variables, returning the decoded JSON reply.

    Args:
        query: GraphQL query text.
        variables: Variables to send alongside the query.
        api_address: GraphQL endpoint URL.

    Returns:
        The parsed JSON body when the server answers with status 200. For any
        other status the response text is printed and ``raise_for_status`` is
        called; if that does not raise, ``None`` is returned.
    """
    payload = {"query": query, "variables": variables}
    response = requests.post(
        api_address,
        json=payload,
        headers={"Content-Type": "application/json"},
    )

    if response.status_code != 200:
        print(response.text)
        response.raise_for_status()
        return None

    return response.json()
| |
|
| |
|
def get_tool_decorated_functions(relative_path):
    """Load a module by path and return its functions decorated with ``tool``.

    The file is parsed with ``ast`` to find function definitions carrying a
    ``@tool`` or ``@tool(...)`` decorator. The file is then executed as a
    module and the matching attributes are looked up on it.

    Args:
        relative_path: Path of the Python file, joined onto the directory
            that contains this module.

    Returns:
        A list with the module attribute for each matching function name.
    """
    import ast
    import importlib.util
    import os

    def _is_tool_decorator(dec):
        """True for a bare ``tool`` name or a call whose callee is ``tool``."""
        if isinstance(dec, ast.Name):
            return dec.id == "tool"
        return isinstance(dec, ast.Call) and isinstance(dec.func, ast.Name) and dec.func.id == "tool"

    base_dir = os.path.dirname(os.path.abspath(__file__))
    file_path = os.path.join(base_dir, relative_path)

    with open(file_path) as handle:
        tree = ast.parse(handle.read(), filename=file_path)

    # One entry per matching decorator, walking every function node in the tree.
    tool_function_names = []
    for node in ast.walk(tree):
        if not isinstance(node, ast.FunctionDef):
            continue
        tool_function_names.extend(node.name for dec in node.decorator_list if _is_tool_decorator(dec))

    # Derive a dotted module name from the path relative to this directory.
    rel_path = os.path.relpath(file_path, start=base_dir)
    module_name = rel_path.replace(os.path.sep, ".").rsplit(".", 1)[0]

    spec = importlib.util.spec_from_file_location(module_name, file_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)

    return [getattr(module, name) for name in tool_function_names]
| |
|
| |
|
def load_pickle(file):
    """Read one pickled object from the file at path ``file`` and return it.

    NOTE: unpickling can execute arbitrary code; only use on trusted files.
    """
    import pickle

    with open(file, "rb") as handle:
        loaded = pickle.load(handle)
    return loaded
| |
|
def pretty_print(message, printout=True):
    """Render a chat message as titled text, optionally printing it.

    Args:
        message: Either a tuple (returned unchanged, never printed) or a
            message object with ``type``, ``name`` and ``content`` attributes.
            ``content`` may be a string or a list of dict blocks with a
            ``"type"`` key of ``"text"`` or ``"tool_use"``.
        printout: When true, the rendered text is printed as well.

    Returns:
        The tuple itself, or the rendered string.
    """
    if isinstance(message, tuple):
        return message

    if isinstance(message.content, list):
        # Block-style content: upper-cased title followed by each block.
        pieces = [get_msg_title_repr(message.type.title().upper() + " Message", bold=is_interactive_env())]
        if message.name is not None:
            pieces.append(f"\nName: {message.name}")

        for block in message.content:
            kind = block["type"]
            if kind == "text":
                pieces.append(f"\n{block['text']}\n")
            elif kind == "tool_use":
                pieces.append(f"\nTool: {block['name']}")
                pieces.append(f"\nInput: {block['input']}")
    else:
        # Plain content: title, optional name, then the content itself.
        pieces = [get_msg_title_repr(message.type.title() + " Message", bold=is_interactive_env())]
        if message.name is not None:
            pieces.append(f"\nName: {message.name}")
        pieces.append(f"\n\n{message.content}")

    title = "".join(pieces)
    if printout:
        print(f"{title}")
    return title
| |
|
| |
|
class CustomBaseModel(BaseModel):
    """Pydantic base model that reports validation failures against an API schema.

    When an API schema has been attached with ``set_api_schema``, a failed
    ``model_validate`` call raises a ``ValidationError`` whose message lists
    the required parameters, the individual errors, the provided input and
    any missing required parameters.
    """

    # Schema dict shared by the class; expected to contain a
    # "required_parameters" list of dicts with "name", "type", "description".
    api_schema: ClassVar[dict] = None

    model_config = {"arbitrary_types_allowed": True}

    @classmethod
    def set_api_schema(cls, schema: dict):
        """Attach the API schema used to build validation error messages."""
        cls.api_schema = schema

    @classmethod
    def model_validate(cls, obj, **kwargs):
        """Validate ``obj``, enriching the error with schema details on failure.

        Args:
            obj: Input to validate (normally a dict of parameters).
            **kwargs: Forwarded unchanged to ``BaseModel.model_validate``.

        Returns:
            The validated model instance.

        Raises:
            ValidationError: With a descriptive message when a schema is set.
            AttributeError: Re-raised as-is when no schema is set.
        """
        try:
            return super().model_validate(obj, **kwargs)
        except (ValidationError, AttributeError) as e:
            if not cls.api_schema:
                raise

            required = cls.api_schema["required_parameters"]

            error_msg = "Required Parameters:\n"
            for param in required:
                error_msg += f"- {param['name']} ({param['type']}): {param['description']}\n"

            error_msg += "\nErrors:\n"
            if isinstance(e, ValidationError):
                for err in e.errors():
                    field = err["loc"][0] if err["loc"] else "input"
                    error_msg += f"- {field}: {err['msg']}\n"
            else:
                # AttributeError carries no .errors(); report its message.
                error_msg += f"- input: {e}\n"

            if not obj:
                error_msg += "\nNo input provided"
            elif isinstance(obj, dict):
                error_msg += "\nProvided Input:\n"
                for key, value in obj.items():
                    error_msg += f"- {key}: {value}\n"

                missing_params = {param["name"] for param in required} - set(obj.keys())
                if missing_params:
                    error_msg += "\nMissing Parameters:\n"
                    # Sorted so the message is stable between runs.
                    for param in sorted(missing_params):
                        error_msg += f"- {param}\n"
            else:
                # Non-mapping input has no keys to list; show it verbatim.
                error_msg += f"\nProvided Input:\n- {obj!r}\n"

            raise ValidationError.from_exception_data(
                title="Validation Error",
                line_errors=[
                    {
                        "type": "value_error",
                        "loc": ("input",),
                        "input": obj,
                        "ctx": {
                            "error": error_msg,
                        },
                    }
                ],
            ) from None
| |
|
| |
|
def safe_execute_decorator(func):
    """Wrap ``func`` so that exceptions are returned as strings, not raised.

    Args:
        func: Callable to wrap.

    Returns:
        A wrapper with the same name, docstring and signature metadata as
        ``func``. It returns ``func``'s result, or ``str(exception)`` if
        ``func`` raised an ``Exception``.
    """
    import functools

    # wraps() keeps __name__, __doc__ and __wrapped__ so tooling that
    # introspects the callable still sees the original function.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            return str(e)

    return wrapper
| |
|
| |
|
def api_schema_to_langchain_tool(api_schema, mode="generated_tool", module_name=None):
    """Build a LangChain ``StructuredTool`` from an API schema dictionary.

    The function named ``api_schema["name"]`` is imported, wrapped so that
    exceptions come back as strings, and exposed through a pydantic input
    model generated from the schema's required parameters.

    Args:
        api_schema: Dict with "name", "description" and "required_parameters"
            (plus "tool_name" when ``mode`` is "generated_tool").
        mode: "generated_tool" to import from the generated-tool package, or
            "custom_tool" to import from ``module_name``.
        module_name: Dotted module path; required for "custom_tool".

    Returns:
        The configured ``StructuredTool``.

    Raises:
        ValueError: If ``mode`` is unknown, or ``module_name`` is missing in
            "custom_tool" mode.
    """
    if mode == "generated_tool":
        module = importlib.import_module("histopath.tool.generated_tool." + api_schema["tool_name"] + ".api")
    elif mode == "custom_tool":
        if not module_name:
            raise ValueError("module_name is required when mode is 'custom_tool'")
        module = importlib.import_module(module_name)
    else:
        # Fail clearly instead of hitting an unbound `module` further down.
        raise ValueError(f"Unknown mode {mode!r}; expected 'generated_tool' or 'custom_tool'")

    api_function = getattr(module, api_schema["name"])
    api_function = safe_execute_decorator(api_function)

    # Known type names used in schemas, mapped to Python types.
    type_mapping = {
        "string": str,
        "integer": int,
        "boolean": bool,
        "pandas": pd.DataFrame,
        "str": str,
        "int": int,
        "bool": bool,
        "List[str]": list[str],
        "List[int]": list[int],
        "Dict": dict,
        "Any": Any,
    }

    annotations = {}
    for param in api_schema["required_parameters"]:
        param_type = param["type"]
        if param_type in type_mapping:
            annotations[param["name"]] = type_mapping[param_type]
        else:
            # NOTE(security): eval() runs the schema's type string as code;
            # schemas must come from a trusted source.
            try:
                annotations[param["name"]] = eval(param_type)
            except Exception:
                # Any unparseable or unresolvable type string falls back to Any.
                annotations[param["name"]] = Any

    fields = {param["name"]: Field(description=param["description"]) for param in api_schema["required_parameters"]}

    # Dynamically create the input model and attach the schema for error reporting.
    ApiInput = type("Input", (CustomBaseModel,), {"__annotations__": annotations, **fields})
    ApiInput.set_api_schema(api_schema)

    api_tool = StructuredTool.from_function(
        func=api_function,
        name=api_schema["name"],
        description=api_schema["description"],
        args_schema=ApiInput,
        return_direct=True,
    )

    return api_tool
| |
|
@enum.unique
class ID(enum.Enum):
    """Identifier kinds, each mapped to a human-readable label."""

    ENTREZ = "Entrez"
    ENSEMBL = "Ensembl without version"
    ENSEMBL_W_VERSION = "Ensembl with version"
| |
|
def save_pkl(f, filename):
    """Pickle the object ``f`` into the file at ``filename`` (overwriting it)."""
    with open(filename, "wb") as sink:
        pickle.dump(f, sink)
| |
|
def load_pkl(filename):
    """Return the object unpickled from ``filename``.

    NOTE: unpickling can execute arbitrary code; only use on trusted files.
    """
    with open(filename, "rb") as source:
        restored = pickle.load(source)
    return restored
| |
|
| | _TEXT_COLOR_MAPPING = { |
| | "blue": "36;1", |
| | "yellow": "33;1", |
| | "pink": "38;5;200", |
| | "green": "32;1", |
| | "red": "31;1", |
| | } |
| |
|
| | def color_print(text, color="blue"): |
| | color_str = _TEXT_COLOR_MAPPING[color] |
| | print(f"\u001b[{color_str}m\033[1;3m{text}\u001b[0m") |
| |
|
class PromptLogger(BaseCallbackHandler):
    """Callback handler that echoes the prompt messages sent to a chat model."""

    def on_chat_model_start(self, serialized, messages, **kwargs):
        """Print each message of the first message list in green."""
        first_batch = messages[0]
        for prompt_message in first_batch:
            color_print(prompt_message.pretty_repr(), color="green")
| |
|
| |
|
class NodeLogger(BaseCallbackHandler):
    """Callback handler that colour-prints LLM, agent and tool events."""

    def on_llm_end(self, response, **kwargs):
        """Print the content of every generated message in yellow."""
        flattened = (candidate for batch in response.generations for candidate in batch)
        for candidate in flattened:
            color_print(candidate.message.content, color="yellow")

    def on_agent_action(self, action, **kwargs):
        """Print the agent action's log in pink."""
        color_print(action.log, color="pink")

    def on_agent_finish(self, finish, **kwargs):
        """Print the agent's finish object in red."""
        color_print(finish, color="red")

    def on_tool_start(self, serialized, input_str, **kwargs):
        """Announce the tool being called together with its inputs, in pink."""
        announced = f"Calling {serialized.get('name')} with inputs: {input_str}"
        color_print(announced, color="pink")

    def on_tool_end(self, output, **kwargs):
        """Print the tool output, converted to a string, in blue."""
        color_print(str(output), color="blue")
| |
|
| |
|
def check_or_create_path(path=None):
    """Ensure a directory exists and return its path.

    Args:
        path: Directory to check; defaults to ``tmp_directory`` under the
            current working directory.

    Returns:
        The directory path that was checked or created.
    """
    target = os.path.join(os.getcwd(), "tmp_directory") if path is None else path

    if os.path.exists(target):
        print(f"Directory already exists at: {target}")
    else:
        os.makedirs(target)
        print(f"Directory created at: {target}")

    return target
| |
|
| |
|
def langchain_to_gradio_message(message):
    """Convert a LangChain-style message into a list of Gradio chat dicts.

    Args:
        message: Object with ``type`` and ``content`` attributes. ``content``
            is either a string or a list of dict blocks whose ``"type"`` is
            ``"text"`` or ``"tool_use"``; other block types are ignored.

    Returns:
        A list of dicts with ``role`` ("user" for human messages, otherwise
        "assistant"), ``content`` and ``metadata`` keys. The input message is
        not modified.
    """
    role = "user" if message.type == "human" else "assistant"

    if isinstance(message.content, list):
        gradio_messages = []
        for item in message.content:
            gradio_message = {"role": role, "content": "", "metadata": {}}

            if item["type"] == "text":
                # Work on a copy of the text: the caller's message must keep
                # its original <think> tags.
                text = item["text"].replace("<think>", "\n").replace("</think>", "\n")
                gradio_message["content"] += f"{text}\n"
                gradio_messages.append(gradio_message)
            elif item["type"] == "tool_use":
                # NOTE(review): the leading characters of the title/log strings
                # below look like mis-encoded emoji; confirm the intended
                # characters against the original file encoding.
                if item["name"] == "run_python_repl":
                    gradio_message["metadata"]["title"] = "๐ ๏ธ Writing code..."
                    gradio_message["metadata"]["log"] = "Executing Code block..."
                    gradio_message["content"] = f"##### Code: \n ```python \n {item['input']['command']} \n``` \n"
                else:
                    gradio_message["metadata"]["title"] = f"๐ ๏ธ Used tool ```{item['name']}```"
                    to_print = ";".join([i + ": " + str(j) for i, j in item["input"].items()])
                    gradio_message["metadata"]["log"] = f"๐ Input -- {to_print}\n"
                gradio_message["metadata"]["status"] = "pending"
                gradio_messages.append(gradio_message)
        return gradio_messages

    # Plain string content: strip reasoning/solution tags into newlines.
    print(message)
    content = message.content
    for tag in ("<think>", "</think>", "<solution>", "</solution>"):
        content = content.replace(tag, "\n")

    return [{"role": role, "content": content, "metadata": {}}]
| |
|
| |
|
def parse_hpo_obo(file_path):
    """Parse the HPO OBO file and create a dictionary mapping HP IDs to phenotype descriptions.

    Only ``[Term]`` stanzas are read. Lines belonging to any other stanza
    (for example ``[Typedef]``) are ignored so they cannot overwrite the name
    of the preceding term.

    Args:
        file_path (str): Path to the HPO OBO file.

    Returns:
        dict: A dictionary where keys are HP IDs and values are phenotype descriptions.

    """
    hp_dict = {}
    current_id = None
    current_name = None
    in_term = False

    with open(file_path, encoding="utf-8") as file:
        for line in file:
            line = line.strip()
            if line.startswith("["):
                # A new stanza begins: store the finished term, then reset.
                if current_id and current_name:
                    hp_dict[current_id] = current_name
                current_id = None
                current_name = None
                in_term = line.startswith("[Term]")
            elif not in_term:
                continue
            elif line.startswith("id: HP:"):
                current_id = line.split(": ", 1)[1]
            elif line.startswith("name:"):
                # partition() tolerates a bare "name:" with no value.
                current_name = line.partition(":")[2].strip()

    # Store the last term if the file ended inside a [Term] stanza.
    if current_id and current_name:
        hp_dict[current_id] = current_name

    return hp_dict
| |
|
| |
|
def textify_api_dict(api_dict):
    """Render a mapping of import file -> method descriptions as plain text.

    Args:
        api_dict: Dict whose keys are category names and whose values are
            lists of method dicts with optional "name", "description",
            "required_parameters" and "optional_parameters" entries.

    Returns:
        A newline-joined string with one section per category, one entry per
        method, and a bullet per parameter; missing values use placeholders.
    """
    parameter_sections = (
        ("required_parameters", " Required Parameters:"),
        ("optional_parameters", " Optional Parameters:"),
    )

    out = []
    for category, methods in api_dict.items():
        out.append(f"Import file: {category}")
        out.append("=" * (len("Import file: ") + len(category)))

        for method in methods:
            out.append(f"Method: {method.get('name', 'N/A')}")
            out.append(f" Description: {method.get('description', 'No description provided.')}")

            for key, heading in parameter_sections:
                params = method.get(key, [])
                if not params:
                    continue
                out.append(heading)
                out.extend(
                    f" - {p.get('name', 'N/A')} ({p.get('type', 'N/A')}): "
                    f"{p.get('description', 'No description')} [Default: {p.get('default', 'None')}]"
                    for p in params
                )

            # Blank line after every method.
            out.append("")
        # Blank line after every category.
        out.append("")

    return "\n".join(out)
| |
|
| |
|
def read_module2api():
    """Collect the ``description`` object of each tool-description module.

    Returns:
        Dict mapping ``histopath.tool.<field>`` to the ``description``
        attribute of ``histopath.tool.tool_description.<field>``.
    """
    fields = ("support_tools", "pathology")
    return {
        f"histopath.tool.{field}": importlib.import_module(f"histopath.tool.tool_description.{field}").description
        for field in fields
    }