Document management development in progress; to be reviewed.

Alfredo Oliviero 2025-09-10 14:51:34 +02:00
parent dec6273050
commit 694c2ba0a7
37 changed files with 1160 additions and 134 deletions

.env.test (new file, +59)

@ -0,0 +1,59 @@
# Environment variables for local testing.
# This file is loaded by pytest-dotenv and overrides variables from .env.
# Google AI Configuration
GOOGLE_GENAI_USE_VERTEXAI=FALSE
GOOGLE_API_KEY=AIzaSyCPTw-PjIJR1WqSSyLcdR7IHGAGPOBnb-M
# RAG Configuration
ENCODER_MODEL=all-MiniLM-L6-v2
#ENCODER_MODEL=models/text-embedding-004
VECTOR_SIZE=384
COLLECTION_NAME=dave_knowledge
# Qdrant Configuration
QDRANT_HOST=long-term-memory
QDRANT_PORT=6333
QDRANT_WEB_PORT=6334
# Docker Compose Configuration
DAVE_AGENT_PORT=8000
DAVE_AGENT_CONTAINER=dave-agent
QDRANT_CONTAINER=long-term-memory
DAVE_AGENT_CONTAINER_DEBUG=dave-agent-debug
QDRANT_CONTAINER_DEBUG=qdrant-storage-debug
DEBUG_PORT=5678
DEBUG_WAIT_FOR_ATTACH=true
# D4Science Python Library Configuration
D4S_ENV=production
#D4S_CONFIG_DIR=/etc/d4science
#D4S_CONFIG_FILENAME=d4s_config.json
#DAVE_CONFIG=/etc/dave_config.json
# D4Science Python Library Configuration for local execution
D4S_CONFIG_DIR=./config
D4S_CONFIG_FILENAME=d4s_config.json
# DAVE Agent Configuration for local execution
DAVE_CONFIG=./config/dave_config.json
# Other encoder options (uncomment to use):
# ENCODER_MODEL=all-mpnet-base-v2
# VECTOR_SIZE=768
#
# ENCODER_MODEL=multi-qa-mpnet-base-dot-v1
# VECTOR_SIZE=768
#
# ENCODER_MODEL=paraphrase-multilingual-MiniLM-L12-v2
# VECTOR_SIZE=384
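
For reference, here is a minimal sketch (not the project's actual loader) of how the RAG variables above might be consumed on the Python side; the essential constraint is that `VECTOR_SIZE` must match the embedding dimension of the chosen `ENCODER_MODEL` (384 for `all-MiniLM-L6-v2`, 768 for the mpnet variants listed in the comments):

```python
# Hedged sketch: read the RAG settings with the same defaults as .env.test.
import os

ENCODER_MODEL = os.getenv("ENCODER_MODEL", "all-MiniLM-L6-v2")
VECTOR_SIZE = int(os.getenv("VECTOR_SIZE", "384"))  # must match the encoder's output dimension
COLLECTION_NAME = os.getenv("COLLECTION_NAME", "dave_knowledge")
QDRANT_HOST = os.getenv("QDRANT_HOST", "long-term-memory")
QDRANT_PORT = int(os.getenv("QDRANT_PORT", "6333"))
```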

.python-version (new file, +1)

@ -0,0 +1 @@
3.10.12

.vscode/launch.json (+14)

@ -20,6 +20,20 @@
}
],
"justMyCode": true
},
{
"name": "Python: Pytest",
"type": "python",
"request": "launch",
"module": "pytest",
"args": [
"tests/"
],
"justMyCode": true,
"env": {
"D4S_ENV": "prod",
"D4S_CONFIG_DIR": "${workspaceFolder}/config"
}
}
]
}

.vscode/settings.json (new file, +9)

@ -0,0 +1,9 @@
{
"python.testing.pytestArgs": [
"tests"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,
"python.defaultInterpreterPath": "${workspaceFolder}/.venv/bin/python",
"python.testing.cwd": "${workspaceFolder}"
}


@ -1,17 +1,21 @@
# Stage 1: Base image with Python and uv
FROM python:3.11-slim AS base
WORKDIR /app
RUN pip install uv
RUN pip install uv && \
apt-get update && \
apt-get install -y build-essential
# Stage 2: Builder stage with all dependencies and source code
FROM base AS builder
# Copy project definition and all source code
COPY pyproject.toml ./
COPY src ./src
COPY d4science_lib ./d4science_lib
# Install all dependencies (including dev) using a cache mount
RUN --mount=type=cache,target=/root/.cache/pip \
uv pip install --system -e .[dev] -e ./d4science_lib[dev]
COPY src ./src
RUN uv pip install --system -e .
RUN uv pip install --system -e ".[dev,tools]"
# Stage 3: Development image (dependencies only, no source code)
# Source code will be mounted via docker-compose.yaml


@ -39,6 +39,15 @@ The application uses an `.env` file to manage environment-specific variables, su
2. **Edit the `.env` file**:
Review the variables in the `.env` file and adjust them if necessary.
> **Note on Caching:** The project uses a named volume (`uv-cache`) to create a persistent cache for the `uv` package manager. This will significantly speed up subsequent builds, as packages will not need to be re-downloaded.
>
> #### Development vs. Production Mounts
>
> The project is configured to handle the source code differently for development and production:
>
> * **Development (`debug.yaml`):** Your local `src` directory is mounted as a volume. This allows for hot-reloading, where changes you make to the code are immediately reflected in the running container.
> * **Production (`compose.yaml`):** The `src` directory is copied into the Docker image during the build process. This creates a self-contained, immutable image, which is the standard for production deployments.
#### D4Science Configuration (config/ directory)
The agent requires specific configuration and credential files to interact with D4Science services.
@ -92,7 +101,7 @@ This mode is for active development. It mounts the local source code into the co
## Development Scripts
### Documentation Scraper (`devel/scrape_docs.py`)
### Documentation Scraper (`tools/scrape_docs.py`)
This script is a utility to download the official ADK documentation as HTML files and convert them to Markdown. This can be useful for local reference or potentially for feeding the documentation into the agent's knowledge base in the future.
@ -101,10 +110,10 @@ This script is a utility to download the official ADK documentation as HTML file
- You must have installed the `[tools]` dependencies (`uv pip install -e ".[dev,tools]"`).
**Usage:**
1. Ensure your virtual environment is activated (`source .venv/bin/activate`).
2. Run the script from the project root directory:
```bash
python devel/scrape_docs.py
python tools/scrape_docs.py
```
The script will process the files and place the Markdown output in the `docs/adk-docs/markdown` directory.
@ -139,14 +148,14 @@ From the project root, create and activate a Python virtual environment. `uv` wi
Install the project dependencies from `pyproject.toml` in editable mode. The `[dev]` extra includes packages for debugging, and `[tools]` includes packages for running development scripts.
```bash
uv pip install -e ".[dev,tools]"
uv pip install -e ".[dev,tools,test]"
```
##### Updating Dependencies
If you modify the `pyproject.toml` file, re-run the installation command to sync your virtual environment. `uv` is fast, so a fresh install is often as quick as an upgrade.
```bash
uv pip install -e ".[dev,tools]"
uv pip install -e ".[dev,tools,test]"
```
#### 4. Running the Agent Locally


@ -1,3 +1,6 @@
volumes:
uv-cache:
services:
dave-agent:
build:
@ -12,6 +15,7 @@ services:
- ./config/d4s_config.json:${D4S_CONFIG_DIR}/${D4S_CONFIG_FILENAME}
- ./config/d4s_credentials.json:/root/.d4science/auth.${D4S_ENV}.json
- ./mnt/local_content:/app/local_content
- uv-cache:/root/.cache
env_file:
- ./.env
depends_on:

@ -1 +1 @@
Subproject commit 4d5b313ad15ae8ede27c9167b95ce94715df0c56
Subproject commit 0684c0ee4788d97e92df2f3dbf8b8036e64aa4e6


@ -1,3 +1,6 @@
volumes:
uv-cache:
services:
dave-agent:
build:
@ -15,6 +18,8 @@ services:
- ./config/dave_config.json:/etc/dave_config.json
- ./config/d4s_credentials.json:/root/.d4science/auth.${D4S_ENV}.json
- ./mnt/shared:/app/shared
- ./mnt/logs:/app/logs
- ./mnt/uv-cache:/root/.cache
env_file:
- ./.env
command:

docs/architecture.md (new file, +60)

@ -0,0 +1,60 @@
# Agent Architecture (v2.5 - Final)
This document describes the final, implemented architecture of the AI agent system. The design uses a pure "Coordinator and Toolbelt" pattern, where a high-level `RootDispatcherAgent` orchestrates all tasks by chaining its specialist tools.
## General Diagram
```mermaid
graph TD
subgraph "Tier 1: Root Dispatcher (Coordinator)"
Root[🤖 RootDispatcherAgent]
end
subgraph "Tier 2: Primary Tools"
RR[🤖 RetrievalRouterAgent]
DP[🤖 DocumentProcessorAgent]
RM[🤖 RagManagerAgent]
end
subgraph "Tier 3: Retrieval Specialists (Tools of RetrievalRouter)"
S_Local[🤖 LocalFileAgent]
S_Remote[🤖 DataRetrieverAgent]
S_WS[🤖 WorkspaceAgent]
end
%% --- Root Dispatcher Connections ---
Root -- chains --> RR
Root -- chains --> DP
Root -- chains --> RM
%% --- RetrievalRouter Connections ---
RR -- chooses --> S_Local
RR -- chooses --> S_Remote
RR -- chooses --> S_WS
```
## Tier 1: Root Dispatcher (The Coordinator)
### RootDispatcherAgent
- **File**: `src/dave_agent/agent.py`
- **Role**: The single entry point and central orchestrator. It analyzes the user's goal and chains its tools together to perform complex, multi-step operations like document ingestion.
- **Tools (as `AgentTool`)**:
- `RetrievalRouterAgent`: The dedicated gateway for any data retrieval or listing task.
- `DocumentProcessorAgent`: The specialist for analyzing text content.
- `RagManagerAgent`: The specialist for all long-term memory interactions.
## Tier 2: Primary Tools (The Toolbelt)
This tier consists of the agents directly available to the `RootDispatcher`.
- **`RetrievalRouterAgent`**: A specialized router that selects the correct data source specialist (`LocalFileAgent`, `WorkspaceAgent`, `DataRetrieverAgent`).
- **`DocumentProcessorAgent`**: Analyzes text to extract structured information.
- **`RagManagerAgent`**: Manages long-term memory.
## Tier 3: Low-Level Specialists (The Workers)
These agents perform single, focused tasks and are orchestrated by the agents in the tiers above them.
- **`LocalFileAgent`**: Reads files from configured local directories via aliases.
- **`WorkspaceAgent`**: Interacts with the D4Science remote workspace.
- **`DataRetrieverAgent`**: Fetches content from remote URLs.
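
As a rough illustration, the pattern above maps onto ADK roughly as follows; this is a sketch mirroring the wiring in `src/dave_agent/agent.py` later in this diff, with the prompt and model abbreviated rather than quoted from the source:

```python
# Sketch of the "Coordinator and Toolbelt" wiring: the Tier 1 dispatcher exposes
# the Tier 2 specialists as AgentTools and chains them to satisfy a request.
from google.adk.agents import LlmAgent
from google.adk.tools.agent_tool import AgentTool

from .sub_agents.retrieval_router.agent import retrieval_router_agent
from .sub_agents.document_processor.agent import document_processor_agent
from .sub_agents.rag_manager.agent import rag_manager_agent

root_agent = LlmAgent(
    name="RootDispatcherAgent",
    model="gemini-2.5-flash-lite",   # config.FLASH_MODEL in the real code
    instruction="...",               # ROOT_DISPATCHER_PROMPT in the real code
    tools=[
        AgentTool(agent=retrieval_router_agent),
        AgentTool(agent=document_processor_agent),
        AgentTool(agent=rag_manager_agent),
    ],
)
```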

freeze.txt (new file, +239)

@ -0,0 +1,239 @@
absolufy-imports==0.3.1
aiohappyeyeballs==2.6.1
aiohttp==3.12.15
aiosignal==1.4.0
alembic==1.16.5
annotated-types==0.7.0
anyio==4.10.0
appnope==0.1.4
asciitree==0.3.3
asttokens==2.4.1
async-timeout==4.0.3
attrs==24.2.0
Authlib==1.6.3
backports.asyncio.runner==1.2.0
beautifulsoup4==4.13.5
cachetools==5.5.2
cads-api-client==1.5.2
Cartopy==0.23.0
cdsapi==0.7.5
certifi==2024.8.30
cf-units==3.3.0
cffi==1.17.1
cfgrib==0.9.14.1
cftime==1.6.4.post1
charset-normalizer==3.4.0
click==8.2.1
cloudpickle==3.1.0
comm==0.2.2
contourpy==1.2.1
coverage==7.10.6
cryptography==45.0.7
cycler==0.12.1
d4science-copernicus-cds==1.0.0
-e git+ssh://gitea@code-repo.d4science.org/gCubeSystem/d4science-python-library.git@0684c0ee4788d97e92df2f3dbf8b8036e64aa4e6#egg=d4science_lib&subdirectory=../../d4science_lib
dask==2024.8.1
dataclasses-json==0.6.7
datapi==0.1.1
-e git+ssh://gitea@code-repo.d4science.org/D4Science/dave_agent.git@dec6273050cf9df6e42e823e01440122cf42bd94#egg=dave_agent
debugpy==1.8.8
decorator==5.1.1
docstring_parser==0.17.0
docutils==0.21.2
eccodes==2.38.3
exceptiongroup==1.2.2
executing==2.1.0
fastapi==0.116.1
fasteners==0.19
fastjsonschema==2.20.0
filelock==3.19.1
findlibs==0.0.5
fonttools==4.54.1
frozenlist==1.7.0
fsspec==2024.10.0
google-adk==1.13.0
google-api-core==2.25.1
google-api-python-client==2.181.0
google-auth==2.40.3
google-auth-httplib2==0.2.0
google-cloud-aiplatform==1.111.0
google-cloud-appengine-logging==1.6.2
google-cloud-audit-log==0.3.2
google-cloud-bigquery==3.36.0
google-cloud-bigtable==2.32.0
google-cloud-core==2.4.3
google-cloud-logging==3.12.1
google-cloud-resource-manager==1.14.2
google-cloud-secret-manager==2.24.0
google-cloud-spanner==3.57.0
google-cloud-speech==2.33.0
google-cloud-storage==2.19.0
google-cloud-trace==1.16.2
google-crc32c==1.7.1
google-genai==1.33.0
google-resumable-media==2.7.2
googleapis-common-protos==1.70.0
graphviz==0.21
grpc-google-iam-v1==0.14.2
grpc-interceptor==0.15.4
grpcio==1.74.0
grpcio-status==1.74.0
h11==0.16.0
h2==4.3.0
hf-xet==1.1.9
hpack==4.1.0
httpcore==1.0.9
httplib2==0.30.0
httpx==0.28.1
httpx-sse==0.4.1
huggingface-hub==0.34.4
hyperframe==6.1.0
idna==3.10
importlib_metadata==8.5.0
iniconfig==2.1.0
ipykernel==6.29.5
ipython==8.29.0
ipywidgets==8.1.5
iris==1.0.7
jedi==0.19.1
Jinja2==3.1.4
joblib==1.5.2
jsonpatch==1.33
jsonpointer==3.0.0
jsonschema==4.23.0
jsonschema-specifications==2024.10.1
jupyter_client==8.6.3
jupyter_core==5.7.2
jupyterlab_widgets==3.0.13
kiwisolver==1.4.7
langchain==0.3.27
langchain-community==0.3.29
langchain-core==0.3.75
langchain-text-splitters==0.3.11
langsmith==0.4.25
llvmlite==0.41.1
locket==1.0.0
Mako==1.3.10
markdown-it-py==4.0.0
markdownify==1.2.0
MarkupSafe==3.0.2
marshmallow==3.26.1
matplotlib==3.8.4
matplotlib-inline==0.1.7
mcp==1.13.1
mdurl==0.1.2
mpmath==1.3.0
multidict==6.6.4
multiurl==0.3.2
mypy_extensions==1.1.0
nbformat==5.10.4
nest-asyncio==1.6.0
netCDF4==1.7.2
networkx==3.4.2
numcodecs==0.13.1
numpy==2.2.6
oauthlib==3.3.1
opentelemetry-api==1.36.0
opentelemetry-exporter-gcp-trace==1.9.0
opentelemetry-resourcedetector-gcp==1.9.0a0
opentelemetry-sdk==1.36.0
opentelemetry-semantic-conventions==0.57b0
orjson==3.11.3
packaging==24.2
pandas==2.2.3
parso==0.8.4
partd==1.4.2
patsy==0.5.6
pdfminer.six==20250506
pexpect==4.9.0
pillow==11.0.0
platformdirs==4.3.6
pluggy==1.6.0
portalocker==3.2.0
prompt_toolkit==3.0.48
propcache==0.3.2
properscoring==0.1
proto-plus==1.26.1
protobuf==6.32.0
psutil==6.1.0
ptyprocess==0.7.0
pure_eval==0.2.3
pyasn1==0.6.1
pyasn1_modules==0.4.2
pycparser==2.22
pydantic==2.11.7
pydantic-settings==2.10.1
pydantic_core==2.33.2
Pygments==2.18.0
pyparsing==3.2.0
pyproj==3.7.0
pyshp==2.3.1
pytest==8.4.2
pytest-asyncio==1.1.0
pytest-cov==6.2.1
pytest-mock==3.15.0
python-dateutil==2.9.0.post0
python-dotenv==1.1.1
python-multipart==0.0.20
pytz==2024.2
PyYAML==6.0.2
pyzmq==26.2.0
qdrant-client==1.15.1
referencing==0.35.1
regex==2025.9.1
requests==2.32.5
requests-mock==1.12.1
requests-oauthlib==2.0.0
requests-toolbelt==1.0.0
rich==14.1.0
rpds-py==0.21.0
rsa==4.9.1
safetensors==0.6.2
scikit-learn==1.7.1
scipy==1.13.1
scitools-iris==3.10.0
sentence-transformers==5.1.0
shapely==2.0.6
shellingham==1.5.4
six==1.16.0
sniffio==1.3.1
soupsieve==2.8
SQLAlchemy==2.0.43
sqlalchemy-spanner==1.16.0
sqlparse==0.5.3
sse-starlette==3.0.2
stack-data==0.6.3
starlette==0.47.3
statsmodels==0.14.4
sympy==1.14.0
tenacity==8.5.0
threadpoolctl==3.6.0
tokenizers==0.22.0
tomli==2.2.1
toolz==1.0.0
torch==2.8.0
tornado==6.4.1
tqdm==4.67.0
traitlets==5.14.3
transformers==4.56.1
typer==0.17.3
typing-inspect==0.9.0
typing-inspection==0.4.1
typing_extensions==4.12.2
tzdata==2024.2
tzlocal==5.3.1
uritemplate==4.2.0
urllib3==2.2.3
uvicorn==0.35.0
watchdog==6.0.0
wcwidth==0.2.13
websockets==15.0.1
widgetsnbextension==4.0.13
xarray==2023.12.0
xhistogram==0.3.2
xskillscore==0.0.26
xxhash==3.5.0
yarl==1.20.1
zarr==2.17.1
zipp==3.20.2
zstandard==0.24.0


@ -11,22 +11,35 @@ authors = [
]
requires-python = ">=3.9"
dependencies = [
"google-adk",
"google-adk[eval]",
"qdrant-client",
"sentence-transformers",
"langchain-core",
"langchain-community",
"pdfminer.six",
"beautifulsoup4",
"d4science_lib"
]
[tool.uv.sources]
d4science-lib = { path = "d4science_lib"}
[tool.setuptools.packages.find]
where = ["src"]
[project.optional-dependencies]
dev = [
"pytest",
"debugpy",
"beautifulsoup4",
"markdownify"
]
test = [
"pytest",
"pytest-asyncio",
"pytest-dotenv",
"requests-oauthlib",
"requests>=2.25.0",
"pydantic>=2.4.0",
"typer",
]


@ -1,2 +1,4 @@
debugpy
BeautifulSoup
BeautifulSoup
pytest
pytest-asyncio


@ -2,6 +2,12 @@ import json
import os
from typing import Any, Dict
# models: https://ai.google.dev/gemini-api/docs/models?hl=it
# DEFAULT_FLASH_MODEL = "gemini-2.5-flash"
DEFAULT_FLASH_MODEL = "gemini-2.5-flash-lite"
DEFAULT_PRO_MODEL = "gemini-2.5-pro"
class ConfigManager:
"""
A class to manage configurations from a JSON file.
@ -27,8 +33,8 @@ class ConfigManager:
"flash": "gemini-2.5-flash",
"pro": "gemini-2.5-pro"
})
self.FLASH_MODEL = self.llm_models.get('flash', 'gemini-2.5-flash')
self.PRO_MODEL = self.llm_models.get('pro', 'gemini-2.5-pro')
self.FLASH_MODEL = self.llm_models.get('flash', DEFAULT_FLASH_MODEL)
self.PRO_MODEL = self.llm_models.get('pro', DEFAULT_PRO_MODEL)
def _load_config(self) -> Dict[str, Any]:
"""


@ -1,3 +1,4 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -16,6 +17,7 @@
from google.adk.agents import LlmAgent
from google.adk.tools.agent_tool import AgentTool
from .logging_config import setup_logging
from .prompts import ROOT_DISPATCHER_PROMPT
from .sub_agents.retrieval_router.agent import retrieval_router_agent
from .sub_agents.document_processor.agent import document_processor_agent
@ -24,6 +26,9 @@ from .sub_agents.rag_manager.agent import rag_manager_agent
from config_manager import config
MODEL = config.FLASH_MODEL
# Setup logging for the entire application
setup_logging()
# The root_agent is a dispatcher that chains specialist agents to perform complex tasks.
root_agent = LlmAgent(
name="RootDispatcherAgent",


@ -0,0 +1,52 @@
import logging
import sys
import os
def setup_logging():
"""
Configures logging for the application.
- INFO and higher logs are sent to the console (stdout).
- If the LOG_FILE_PATH environment variable is set, DEBUG and higher logs
are also sent to the specified file.
"""
log_file_path = os.getenv("LOG_FILE_PATH")
# Get the root logger
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG) # Set the lowest level to capture all messages
# Silence pdfminer's verbose DEBUG logs
logging.getLogger("pdfminer").setLevel(logging.INFO)
# Create a formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Create and add console handler for INFO level
if not any(isinstance(h, logging.StreamHandler) for h in root_logger.handlers):
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)
# Create and add file handler for DEBUG level only if LOG_FILE_PATH is set
if log_file_path:
if not any(isinstance(h, logging.FileHandler) for h in root_logger.handlers):
try:
# Ensure the directory for the log file exists
log_dir = os.path.dirname(log_file_path)
if log_dir:
os.makedirs(log_dir, exist_ok=True)
file_handler = logging.FileHandler(log_file_path, mode='a')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
root_logger.addHandler(file_handler)
print(f"Logging DEBUG output to {log_file_path}")
logging.info(f"Logging is configured. INFO logs to console, DEBUG logs to file {log_file_path}")
except Exception as e:
logging.error(f"Failed to configure file logging to {log_file_path}: {e}")
else:
logging.info("Logging is configured. INFO logs to console. File logging is disabled.")


@ -0,0 +1,17 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Data Retriever Agent."""
from .agent import data_retriever_agent


@ -0,0 +1,32 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Data Retriever Agent for remote sources."""
from google.adk.agents import LlmAgent
from . import prompt
from .tools import fetch_from_url_tool
from config_manager import config
MODEL = config.FLASH_MODEL
data_retriever_agent = LlmAgent(
name="DataRetrieverAgent",
model=MODEL,
description="Fetches document content from a remote URL.",
instruction=prompt.DATA_RETRIEVER_PROMPT,
tools=[
fetch_from_url_tool,
],
)


@ -0,0 +1,26 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Prompt for the Data Retriever Agent."""
DATA_RETRIEVER_PROMPT = """
System Role: You are a data retrieval specialist. Your job is to fetch the content of a document based on an identifier provided by the user.
Workflow:
1. Analyze the user's input to identify the source of the document.
2. If the input is a URL (starts with http or https), use the `fetch_from_url` tool.
3. If the input is not a URL (for example, a local file path), do not try to fetch it yourself: state that this agent only handles remote URLs so the caller can route the request to the local file specialist.
4. If you are unsure, ask the user for clarification.
5. The tool will save the content as an artifact and make its name available in the session state. Your job is to successfully call the correct tool.
"""


@ -0,0 +1,44 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tools for the Data Retriever Agent."""
from google.adk.tools import FunctionTool
from google.adk.tools.tool_context import ToolContext
from google.genai.types import Blob, Part
async def fetch_from_url(url: str, tool_context: ToolContext) -> dict:
"""
Fetches the text content from a given URL.
Args:
url: The URL to fetch the content from.
tool_context: The context for the tool.
Returns:
A dictionary containing the status and the length of the retrieved text,
or an error message. The text content itself is saved as a session artifact.
"""
# TODO: Implement the actual logic to fetch from URL.
print(f"Fetching content from {url}...")
text_content = f"This is the simulated text content from {url}."
# Save content to an artifact and pass its name in the state, using the same
# artifact hand-off as the workspace download_file tool.
session_id = tool_context._invocation_context.session.id
artifact_name = f"retrieved_content_{session_id}"
artifact_part = Part(inline_data=Blob(mime_type="text/plain", data=text_content.encode("utf-8")))
await tool_context.save_artifact(artifact_name, artifact_part)
tool_context.state['temp:retrieved_artifact_name'] = artifact_name
return {"status": "success", "content_length": len(text_content)}
fetch_from_url_tool = FunctionTool(func=fetch_from_url)
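
The actual retrieval logic is still a TODO; purely as an illustration, one possible shape for it, using `requests` and `BeautifulSoup` (both already in the project's dependency set), might be:

```python
# Hypothetical fetch helper (not part of this commit): download a URL and
# reduce HTML responses to their visible text.
import requests
from bs4 import BeautifulSoup

def fetch_text(url: str, timeout: int = 30) -> str:
    """Download a URL and return its text content."""
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    if "text/html" in response.headers.get("Content-Type", ""):
        soup = BeautifulSoup(response.text, "html.parser")
        return soup.get_text(separator="\n", strip=True)
    return response.text
```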


@ -28,5 +28,4 @@ document_processor_agent = LlmAgent(
description="Analyzes the content of a document (PDF, text, etc.) to extract metadata, keywords, and entities.",
instruction=prompt.DOCUMENT_PROCESSOR_PROMPT,
tools=[tools.extract_text_from_artifact],
output_key="processed_data",
)


@ -0,0 +1,47 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Prompt for the Document Processor Agent."""
DOCUMENT_PROCESSOR_PROMPT = """
System Role: You are a document analysis specialist. Your task is to process the text content of a document, extract key information, and return it in a structured format.
Workflow:
1. **Extract Text**: Your first step is to get the document's text content. To do this, you MUST call the `extract_text_from_artifact` tool with no arguments. This tool will load a file from the session, extract its text, and provide it back to you.
2. **Analyze the Text**: Once the tool returns the text string, carefully analyze the entire document content.
3. **Extract and Generate Information**: Based on your analysis of the text, extract and generate the following information. If a particular piece of information is not available, use "Not found".
4. **Format the Output**: Present the extracted information clearly under the following distinct headings.
# **Document Analysis**
## **Metadata**
- **Title**: [Display the full title of the document]
- **Authors**: [List all authors, including affiliations if available, e.g., "John Doe (University of Science)"]
## **Content Summary**
- **Abstract**: [Display the full abstract text. If no abstract is present, generate a concise one-paragraph summary of the document.]
- **Keywords**: [List up to 10 main topics or keywords derived from the paper.]
## **Entity Extraction**
- **Geographical Locations**: [List all specific geographical locations (cities, countries, regions) mentioned in the document.]
- **Tags**: [Provide a list of 5-7 relevant tags or labels that categorize the document's content (e.g., "Machine Learning", "Climate Change", "Economic Policy").]
# **Output Format**
**CRITICAL**: Your final output MUST begin directly with the `# **Document Analysis**` heading.
DO NOT include any introductory sentences, conversational text, or summaries of your actions.
Your response MUST ONLY contain the structured analysis.
"""


@ -0,0 +1,22 @@
from google.adk.agents import LlmAgent
from ..workspace.agent import workspace_agent
from ..local_file_agent.agent import local_file_agent
from config_manager import config
MODEL = config.FLASH_MODEL
data_source_router_agent = LlmAgent(
name="DataSourceRouterAgent",
description="First step in an ingestion workflow. Determines the source of a file (e.g., D4Science Workspace, local filesystem) and delegates the file retrieval to the appropriate sub-agent. The sub-agent will save the file as a session artifact.",
instruction="""
You are a router responsible for determining the source of a file.
Based on the user's request and the descriptions of the available sub-agents, delegate the task to the appropriate agent to retrieve the file.
Your sole responsibility is to route the request; do not attempt to retrieve the file yourself.
""",
model=MODEL,
sub_agents=[
workspace_agent,
local_file_agent,
],
)


@ -34,6 +34,9 @@ async def extract_text_from_artifact(tool_context: ToolContext) -> str:
# Use an in-memory byte stream for extraction
pdf_file = io.BytesIO(file_bytes)
text = extract_text(pdf_file)
# dump the byte stream to file for debugging
with open("/app/mnt/shared/unshared_content/debug_extracted.md", "wb") as f:
f.write(text.encode("utf-8"))
return text
except Exception as e:
return f"Error extracting text from PDF: {e}"


@ -0,0 +1,24 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Prompt for the Local File Agent."""
LOCAL_FILE_AGENT_PROMPT = """
You are a specialist agent for reading local files from pre-configured directories.
Workflow:
1. The user will ask to read a file. The path must be in the format 'alias/filename.txt'.
2. Use the `read_file_by_alias` tool to read the file.
3. If the user does not know which aliases are available, they might ask you to list them. In that case, use the `list_aliases` tool.
"""


@ -0,0 +1 @@
from .agent import rag_manager_agent


@ -0,0 +1,26 @@
from google.adk.agents import LlmAgent
from .tools import (
search_memory,
store_in_memory,
forget_source,
memory_status,
list_memory_entries,
)
from .prompts import get_rag_agent_instructions
from config_manager import config
MODEL = config.FLASH_MODEL
rag_manager_agent = LlmAgent(
name="RagManagerAgent",
description="Handles all interactions with the long-term memory (RAG system), such as searching for and storing information.",
instruction=get_rag_agent_instructions(),
model=MODEL,
tools=[
search_memory,
store_in_memory,
forget_source,
memory_status,
list_memory_entries,
],
)


@ -0,0 +1,9 @@
def get_rag_agent_instructions() -> str:
"""Returns the instructional prompt for the RagAgent."""
return """
You are a specialist agent responsible for managing and querying the long-term memory system.
Your capabilities are focused on searching for information, storing new facts, and managing memory sources.
Use your tools to answer user questions based on the knowledge stored in the vector database.
If you cannot find an answer to a question, you must clearly state that the information is not available in your knowledge base. Do not attempt to answer questions that are outside the scope of your stored knowledge.
"""


@ -0,0 +1,66 @@
import logging
from typing import Dict, Any, Optional
# Import the shared RAG instance
from ...tools.common import rag
logging.basicConfig(level=logging.INFO)
def search_memory(query: str, source: Optional[str] = None, max_results: int = 5, min_score: float = 0.5) -> Dict[str, Any]:
"""Search the long-term memory (vector database) for relevant information."""
try:
logging.info(f"Searching memory for query: '{query}' with source: '{source or ''}'")
results = rag.search(query, source=source, limit=max_results, score_threshold=min_score)
if not results:
return {"status": "success", "message": "No relevant information found in memory", "query": query, "results": []}
return {"status": "success", "query": query, "results": results, "total_found": len(results)}
except Exception as e:
return {"status": "error", "message": f"Error searching memory: {str(e)}", "results": []}
def store_in_memory(content: str, source: str = "user_input", category: str = "general") -> Dict[str, Any]:
"""Store a simple string of information in long-term memory."""
try:
from langchain_core.documents import Document
doc = Document(page_content=content, metadata={"source": source, "category": category})
success = rag.add_documents([doc])
if success:
return {"status": "success", "message": f"Content stored in memory under source '{source}'", "source": source, "category": category}
else:
return {"status": "error", "message": "Failed to store content in memory"}
except Exception as e:
return {"status": "error", "message": f"Error storing in memory: {str(e)}"}
def forget_source(source: str) -> Dict[str, Any]:
"""Remove all information from a specific source in long-term memory."""
try:
success = rag.delete_by_source(source)
if success:
return {"status": "success", "message": f"All content from source '{source}' has been removed from memory"}
else:
return {"status": "error", "message": f"Failed to remove content from source '{source}'"}
except Exception as e:
return {"status": "error", "message": f"Error removing from memory: {str(e)}"}
def memory_status() -> Dict[str, Any]:
"""Get status information about the long-term memory system."""
try:
info = rag.get_collection_info()
return {"status": "success", "memory_info": info}
except Exception as e:
return {"status": "error", "message": f"Error getting memory status: {str(e)}"}
def list_memory_entries(metadata_filter: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""List the titles of files in memory, optionally filtering by metadata."""
try:
summary = rag.summarize_by_source(metadata_filter=metadata_filter)
if not summary:
return {"status": "success", "message": "No entries found matching the specified criteria.", "files": []}
files = [{"title": source, "chunks": count} for source, count in summary.items()]
return {"status": "success", "files": files, "total_files": len(files)}
except Exception as e:
return {"status": "error", "message": f"Error listing memory entries: {str(e)}"}


@ -0,0 +1,17 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Retrieval Router Agent."""
from .agent import retrieval_router_agent


@ -0,0 +1,39 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Retrieval Router Agent."""
from google.adk.agents import LlmAgent
from google.adk.tools.agent_tool import AgentTool
from . import prompt
from ..data_retriever.agent import data_retriever_agent
from ..document_processor.agent import document_processor_agent
from ..local_file_agent.agent import local_file_agent
from ..workspace_agent.agent import root_agent as workspace_agent
from config_manager import config
MODEL = config.FLASH_MODEL
retrieval_router_agent = LlmAgent(
name="RetrievalRouterAgent",
model=MODEL,
description="Handles ANY request related to fetching or listing data from various sources (local files, workspace, URLs). Use this agent for all data source interactions.",
instruction=prompt.RETRIEVAL_ROUTER_PROMPT,
tools=[
AgentTool(agent=data_retriever_agent),
AgentTool(agent=local_file_agent),
AgentTool(agent=workspace_agent),
AgentTool(agent=document_processor_agent),
],
)


@ -0,0 +1,35 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Prompt for the Retrieval Router Agent."""
RETRIEVAL_ROUTER_PROMPT = """
System Role: You are a data gateway and workflow orchestrator. Your purpose is to handle all requests related to fetching, listing, and analyzing data from various sources.
# Core Workflows
1. **Fetch/List Only Workflow**:
- If the user asks to "list", "show", or "find" files, use the appropriate retrieval agent (`WorkspaceAgent` or `LocalFileAgent`) and stop. The list of files is the final answer.
2. **Fetch and Analyze Workflow**:
- If the user's request includes keywords like "analyze", "process", "summarize", or "read the content of", you MUST follow this two-step process:
1. **Step 1: Retrieve the File**: Call the correct retrieval agent (`WorkspaceAgent`, `LocalFileAgent`, or `DataRetrieverAgent`) to fetch the file. This action saves the file to a session artifact.
2. **Step 2: Process the File**: Immediately after Step 1 is successful, you MUST call the `DocumentProcessorAgent`. This agent will use the artifact from Step 1 and perform the analysis. The output of the `DocumentProcessorAgent` is the final answer.
# Tool Selection Logic
- **To retrieve a file from a URL**: Use the `DataRetrieverAgent`.
- **To retrieve a file from a local path**: Use the `LocalFileAgent`.
- **To retrieve a file from the D4Science Workspace**: Use the `WorkspaceAgent`.
- **To analyze a file that has just been retrieved**: Use the `DocumentProcessorAgent`.
"""


@ -1,117 +0,0 @@
import logging
import traceback
import os
from google.adk.tools import ToolContext
from google.genai.types import Part
# Import shared components
from ...tools.common import get_storagehub_client
logging.basicConfig(level=logging.INFO)
def list_files(path: str, tool_context: ToolContext) -> str:
"""
Lists all files at the specified D4Science Workspace path.
Args:
path (str): The directory path to list files from.
Returns:
str: JSON-formatted list of files and folders with their metadata.
"""
try:
session_id = tool_context._invocation_context.session.id
storagehub_client = get_storagehub_client(session_id)
root_folder = storagehub_client.workspace_manager.getWorkspace(as_object=False)
root_id = root_folder['id']
if not path or path == "/" or path == "":
target_id = root_id
else:
clean_path = path.strip("/")
if not clean_path:
target_id = root_id
else:
try:
item = storagehub_client.items_manager.getByRelativePath(
parent_id=root_id,
relative_path=clean_path,
as_object=False
)
target_id = item['id']
except Exception:
target_id = path
files = storagehub_client.items_manager.listById(
item_id=target_id,
exclude=["hl:accounting"],
as_object=False
)
return str(files)
except Exception as e:
logging.error(f"An error occurred in the list_files tool: {e}")
logging.error(traceback.format_exc())
return f"Error executing list_files: {e}"
def download_file(path: str, tool_context: ToolContext) -> str:
"""
Downloads a file from the D4Science Workspace and saves it as a session artifact.
Args:
path (str): The workspace path of the file to download.
Returns:
str: A confirmation message.
"""
temp_path = None
try:
session_id = tool_context._invocation_context.session.id
storagehub_client = get_storagehub_client(session_id)
root_folder = storagehub_client.workspace_manager.getWorkspace(as_object=False)
root_id = root_folder['id']
target_id = path # Default to path if resolution fails
if not path or path == "/" or path == "":
target_id = root_id
else:
clean_path = path.strip("/")
if clean_path:
try:
item = storagehub_client.items_manager.getByRelativePath(
parent_id=root_id, relative_path=clean_path, as_object=False
)
target_id = item['id']
except Exception:
logging.warning(f"Could not resolve path '{clean_path}', assuming it's an ID.")
item = storagehub_client.items_manager.get(item_id=target_id, as_object=False)
item_name = item["name"]
artifact_name = f"downloaded_{target_id}_{item_name}"
logging.info(f"Downloading file '{item_name}' (ID: {target_id}) to save as artifact '{artifact_name}'.")
# Download to a temporary path to read its bytes
temp_path = f'/tmp/{artifact_name}'
storagehub_client.items_manager.download(item_id=target_id, destination=temp_path)
with open(temp_path, "rb") as f:
file_bytes = f.read()
# Save content as an artifact
artifact_part = Part(inline_data=types.Blob(mime_type="application/octet-stream", data=file_bytes))
tool_context.save_artifact(artifact_name, artifact_part)
# Save artifact info to state for the next step
tool_context.state['temp:artifact_name'] = artifact_name
tool_context.state['temp:item_name'] = item_name
return f"Successfully downloaded file '{item_name}' and stored it as a session artifact."
except Exception as e:
logging.error(f"An error occurred in the download_file tool: {e}")
logging.error(traceback.format_exc())
return f"Error executing download_file: {e}"
finally:
# Clean up the intermediate temporary file immediately
if temp_path and os.path.exists(temp_path):
os.remove(temp_path)


@ -0,0 +1,20 @@
from google.adk.agents import LlmAgent
from .tools import list_path, download_file, get_path_from_id, get_id_from_path
from .prompts import get_workspace_agent_instructions
from config_manager import config
MODEL = config.FLASH_MODEL
root_agent = LlmAgent(
name="WorkspaceAgent",
description="Handles interactions with the D4Science Workspace, such as listing the content of a Workspace path and retrieving Workspace files.",
instruction=get_workspace_agent_instructions(),
model=MODEL,
tools=[
list_path,
download_file,
get_path_from_id,
get_id_from_path,
],
)


@ -0,0 +1,29 @@
def get_workspace_agent_instructions() -> str:
"""Returns the instructional prompt for the WorkspaceAgent."""
return """
# Role: D4Science Workspace Specialist
You are an expert agent responsible for interacting with the D4Science Workspace service. Your sole purpose is to help users list and download files from their personal workspace by constructing the correct file path or ID.
# Context: D4Science Workspace Pathing
The D4Science Workspace uses a virtual, absolute pathing system.
- The root, `/`, represents the user's home directory (e.g., `/Home/user.name/Workspace`).
- All paths you construct MUST be absolute, starting from `/`.
- Users can refer to items by their path, their unique item ID, or by describing their location.
# Core Directive: Path Construction
Your primary task is to analyze the user's natural language request, use your resolution tools if necessary, and construct the precise `path` or `item_id` argument for the final tool call.
# Reasoning Workflow
1. **Analyze the Request**: Understand if the user wants to `list`, `download`, or `resolve` an item.
2. **Check for Ambiguity**: If the user's request is ambiguous (e.g., "what is the ID of test.pdf?"), use the `get_id_from_path` or `get_path_from_id` tools to find the missing information.
3. **Construct the Final Call**: Once you have the correct identifier (either a full path or an ID), call the primary tool (`list_path` or `download_file`) to complete the user's request.
# Examples
- **Direct Command**: User: "analyze `/test2.pdf`" -> Call `download_file` with `path`: `/test2.pdf`.
- **Descriptive Command**: User: "file `test.pdf` in folder `works`" -> Call `download_file` with `path`: `/works/test.pdf`.
- **Resolution Query**: User: "what is the path of item `ab9e2bd4...`?" -> Call `get_path_from_id` with `item_id`: `ab9e2bd4...`.
# CRITICAL RULE: Final Output
When calling `list_path` or `download_file`, the `path` parameter MUST ALWAYS be a valid absolute path starting with `/` or a single UUID. Do not add extra text or explanations.
"""


@ -0,0 +1,205 @@
import logging
import traceback
import os
from typing import Optional
from google.adk.tools import ToolContext
from google.genai.types import Part, Blob
# Import shared components and models
from ...tools.common import get_storagehub_client
from d4science_lib.d4science.storagehub.models import Item, FolderItem
logging.basicConfig(level=logging.INFO)
DEFAULT_EXCLUDE_FIELDS = ["hl:accounting"]
def list_path(path: str, tool_context: ToolContext) -> str:
"""
Lists all files at the specified D4Science Workspace path.
Args:
path (str): The directory path to list files from.
Returns:
str: JSON-formatted list of files and folders with their metadata.
"""
try:
session_id = tool_context._invocation_context.session.id
storagehub_client = get_storagehub_client(session_id)
root_folder = storagehub_client.workspace_manager.getWorkspace(as_object=True)
root_id = root_folder.id
target_id = root_id
if path and path.strip() and path.strip() != "/":
clean_path = path.strip("/")
try:
item: Item = storagehub_client.items_manager.getByRelativePath(
parent_id=root_id,
relative_path=clean_path,
as_object=True, # Get an Item object,
exclude=DEFAULT_EXCLUDE_FIELDS
)
target_id = item.id
except Exception:
logging.warning(f"Could not resolve path '{clean_path}', assuming it's an ID.")
target_id = path
# listById still returns a list of dicts, so we keep as_object=False here
files = storagehub_client.items_manager.listById(
item_id=target_id,
exclude=["hl:accounting"],
as_object=False
)
return str(files)
except Exception as e:
logging.error(f"An error occurred in the list_files tool: {e}")
logging.error(traceback.format_exc())
return f"Error executing list_files: {e}"
async def download_file(path: str, tool_context: ToolContext) -> str:
"""
Downloads a file from the D4Science Workspace and saves it as a session artifact.
It first tries to resolve the path as a relative path, and if that fails,
it treats the path as a direct item ID.
Args:
path (str): The workspace path or item ID of the file to download.
Returns:
str: A confirmation message or an error.
"""
if not path or path.strip() == "/":
return "Error: Invalid path provided. Please specify a valid file path or ID."
temp_path = None
try:
session_id = tool_context._invocation_context.session.id
storagehub_client = get_storagehub_client(session_id)
root_folder = storagehub_client.workspace_manager.getWorkspace(as_object=True)
root_id = root_folder.id
item: Optional[Item] = None
clean_path = path.strip("/")
# Attempt 1: Resolve as a relative path
try:
item = storagehub_client.items_manager.getByRelativePath(
parent_id=root_id, relative_path=clean_path, as_object=True
)
logging.info(f"Successfully resolved path '{clean_path}' to item ID '{item.id}'.")
except Exception:
logging.warning(f"Could not resolve '{clean_path}' as a relative path.")
return f"Error: Could not find file with path '{path}'."
if not item:
return f"Error: Could not find file with path '{path}'."
# Check if the item is a file using its type attribute.
# Common item types are 'FILE', 'FOLDER', 'IMAGE', 'PDF', etc.
for t in [FolderItem]:
if isinstance(item, t):
return f"Error: The specified path '{path}' points to a {t.__name__} item, not a file."
if not item.content:
return f"Error: The specified item '{item.name}' does not have downloadable content."
target_id = item.id
mime_type = item.content.mime_type
item_name = item.name
artifact_name = f"downloaded_{target_id}_{item_name}"
logging.info(f"Downloading file '{item_name}' (ID: {target_id}) to save as artifact '{artifact_name}'.")
# Download to a temporary path to read its bytes
temp_dir = "/tmp/dave_downloads"
os.makedirs(temp_dir, exist_ok=True)
temp_path = os.path.join(temp_dir, artifact_name)
storagehub_client.items_manager.download(item_id=target_id, destination=temp_path)
with open(temp_path, "rb") as f:
file_bytes = f.read()
# Save content as an artifact
artifact_part = Part(inline_data=Blob(mime_type=mime_type, data=file_bytes))
await tool_context.save_artifact(artifact_name, artifact_part)
# Save artifact info to state for the next step
tool_context.state['temp:retrieved_artifact_name'] = artifact_name
return f"Successfully downloaded file '{item_name}' and stored it as a session artifact."
except Exception as e:
logging.error(f"An error occurred in the download_file tool: {e}")
logging.error(traceback.format_exc())
return f"Error executing download_file: {e}"
finally:
# Clean up the intermediate temporary file immediately
if temp_path and os.path.exists(temp_path):
os.remove(temp_path)
def get_workspace_root(tool_context: ToolContext) -> str:
"""
Retrieves the root folder path of the D4Science Workspace.
Returns:
str: The absolute path of the workspace root.
"""
try:
session_id = tool_context._invocation_context.session.id
storagehub_client = get_storagehub_client(session_id)
root_folder = storagehub_client.workspace_manager.getWorkspace(as_object=True)
return root_folder.path
except Exception as e:
logging.error(f"Error in get_workspace_root: {e}")
return f"Error: Could not retrieve workspace root. Reason: {e}"
def get_path_from_id(item_id: str, tool_context: ToolContext) -> str:
"""
Retrieves the full, absolute workspace path for a given item ID.
Args:
item_id (str): The unique identifier of the workspace item.
Returns:
str: The absolute path of the item or an error message.
"""
try:
session_id = tool_context._invocation_context.session.id
storagehub_client = get_storagehub_client(session_id)
item: Item = storagehub_client.items_manager.get(
item_id=item_id,
as_object=True,
exclude=DEFAULT_EXCLUDE_FIELDS
)
return item.path
except Exception as e:
logging.error(f"Error in get_path_from_id: {e}")
return f"Error: Could not retrieve path for ID '{item_id}'. Reason: {e}"
def get_id_from_path(path: str, tool_context: ToolContext) -> str:
"""
Retrieves the unique item ID for a given absolute workspace path.
Args:
path (str): The absolute path of the item in the workspace.
Returns:
str: The unique item ID or an error message.
"""
try:
session_id = tool_context._invocation_context.session.id
storagehub_client = get_storagehub_client(session_id)
root_folder = storagehub_client.workspace_manager.getWorkspace(as_object=True)
clean_path = path.strip("/")
item: Item = storagehub_client.items_manager.getByRelativePath(
parent_id=root_folder.id, relative_path=clean_path, as_object=True,
exclude=DEFAULT_EXCLUDE_FIELDS
)
return item.id
except Exception as e:
logging.error(f"Error in get_id_from_path: {e}")
return f"Error: Could not retrieve ID for path '{path}'. Reason: {e}"