JanetKnowledgeService/chroma.py


import chromadb
from chromadb.config import Settings
import requests
import html2text
import pandas as pd
import logging
import re
import urllib.request
import os
from langchain_community.document_loaders import PyPDFDirectoryLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain.schema.document import Document
#from langchain.vectorstores.chroma import Chroma
logging.basicConfig(filename='chroma.log', level=logging.DEBUG)
#constants
collection_name = "janet_knowledge"
POSTS_URL = "https://api.d4science.org/rest/2/posts/get-posts-vre/"
GCAT_URL = "https://api.d4science.org/catalogue/items/"
PDF_PATH = "./PDF"
TXT_PATH = "./TXT"
headers = {"gcube-token": "", "Accept": "application/json"}
docs = {}
"""
Starting the DB:
docker pull chromadb/chroma
docker run -p 8000:8000 -e CHROMA_SERVER_AUTHN_CREDENTIALS_PROVIDER="chromadb.authn.token.TokenConfigServerAuthCredentialsProvider" -e CHROMA_SERVER_AUTHN_PROVIDER="chromadb.authn.token.TokenAuthenticationServerProvider" -e CHROMA_SERVER_AUTHN_CREDENTIALS="myToken" -e CHROMA_SERVER_AUTHN_TOKEN_TRANSPORT_HEADER="X_CHROMA_TOKEN" -v /Users/ahmed/Desktop/chroma/:/chroma/chroma chromadb/chroma
"""
client = chromadb.HttpClient(
    host="localhost",
    port=8000,
    headers={"Authorization": "Bearer myToken"}
    #settings=Settings(chroma_client_auth_provider="token", chroma_client_auth_credentials="myToken"),
)
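# Note on authentication (assumption): the docker command above configures the server
# to read the token from the X_CHROMA_TOKEN transport header, so the Bearer header
# used here may be rejected depending on the chromadb version. The Settings-based
# alternative hinted at by the commented-out line would look roughly like this
# (exact provider strings vary across chromadb releases):
#   client = chromadb.HttpClient(
#       host="localhost", port=8000,
#       settings=Settings(chroma_client_auth_provider="token",
#                         chroma_client_auth_credentials="myToken"),
#   )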
collection = client.get_or_create_collection(name=collection_name)

def getFilename_fromCd(cd):
    if not cd:
        return None
    fname = re.findall('filename=(.+)', cd)
    if len(fname) == 0:
        return None
    return fname[0]
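
# Illustrative usage (hypothetical header value):
#   getFilename_fromCd('attachment; filename=report.pdf')  # -> 'report.pdf'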

def load_documents():
    document_loader = PyPDFDirectoryLoader(PDF_PATH)
    logging.debug("loading docs")
    return document_loader.load()

def split_documents(documents):
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=800,
        chunk_overlap=80,
        length_function=len,
        is_separator_regex=False,
    )
    logging.debug(f"splitting {len(documents)} documents")
    return text_splitter.split_documents(documents)

def calculate_chunk_ids(chunks):
    # This will create IDs like "data/monopoly.pdf:6:2"
    # Page Source : Page Number : Chunk Index
    last_page_id = None
    current_chunk_index = 0
    for chunk in chunks:
        source = chunk.metadata.get("source")
        page = chunk.metadata.get("page")
        current_page_id = f"{source}:{page}"
        logging.debug(f"chunking {source} page {page}")
        # If the page ID is the same as the last one, increment the index.
        if current_page_id == last_page_id:
            current_chunk_index += 1
        else:
            current_chunk_index = 0
        # Calculate the chunk ID.
        chunk_id = f"{current_page_id}:{current_chunk_index}"
        last_page_id = current_page_id
        # Add it to the page meta-data.
        chunk.metadata["id"] = chunk_id
        #logging.debug(docs[source])
        #chunk.metadata["extras"] = docs["./"+source]['metadata'] #needs primitive types
    logging.debug(f"we have {len(chunks)} chunks")
    return chunks
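
# Illustrative result (hypothetical file): two consecutive chunks from page 6 of
# "data/monopoly.pdf" get the ids "data/monopoly.pdf:6:0" and "data/monopoly.pdf:6:1";
# the chunk index resets to 0 when a new source/page combination starts.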

def add_to_chroma(chunks):
    # Calculate Page IDs.
    chunks_with_ids = calculate_chunk_ids(chunks)
    # Add or Update the documents.
    existing_items = collection.get(include=[])  # IDs are always included by default
    existing_ids = set(existing_items["ids"])
    logging.debug(f"Number of existing documents in DB: {len(existing_ids)}")
    # Only add documents that don't exist in the DB.
    new_chunks = []
    for chunk in chunks_with_ids:
        if chunk.metadata["id"] not in existing_ids:
            new_chunks.append(chunk)
    if len(new_chunks):
        logging.debug(f"Adding new documents: {len(new_chunks)}")
        new_chunk_ids = [chunk.metadata["id"] for chunk in new_chunks]
        chunk_metas = [chunk.metadata for chunk in new_chunks]
        chunk_content = [chunk.page_content for chunk in new_chunks]
        logging.debug(f"chunks {len(new_chunks)}")
        logging.debug(f"chunk_id: {len(new_chunk_ids)} ")
        collection.add(documents=chunk_content,
                       metadatas=chunk_metas,
                       ids=new_chunk_ids)
    else:
        logging.debug("No new documents to add")

def get_posts(url, headers):
    h = html2text.HTML2Text()
    h.ignore_links = True
    try:
        posts = requests.get(url, headers=headers)
        posts = posts.json()['result']
        for post in posts:
            author = post['full_name'].lower()
            key = post['key']
            content = h.handle(post['description']).replace('\n', ' ').lower()
            date = post['time']
            tags = []
            for word in content.split():
                if word[0] == '#':
                    tags.append(word[1:])
            logging.debug(f"""Adding post {key} whose author is {author} and content is {content} to the index.""")
            filename = "./TXT/post-" + key + ".txt"
            with open(filename, 'w') as f:
                f.write(content)
            #add_doc(filename, {'source': f"""post-{key}""", 'author': author, 'date': date, 'tags': tags}, key)
    except Exception as e:
        logging.error(e)

def get_catalogue(url, headers):
    try:
        response = requests.get(url, headers=headers)
        items = response.json()
        items_data = []
        for item in items:
            api_url = url + item + '/'
            logging.debug(f"Getting resources of {api_url}")
            response = requests.get(api_url, headers=headers)
            try:
                items_data.append(response.json())
            except Exception as err:
                logging.error(err)
                logging.error(f"response is {response}")
        for item in items_data:
            logging.debug(item)
            rsrc = None  # item type from the 'system:type' extra, if present
            item_url = None
            for el in item['extras']:
                if el['key'] == 'system:type':
                    rsrc = el['value']
                if el['key'] == 'Item URL':
                    item_url = el['value']
            tags = []
            for tag in item['tags']:
                tags.append(tag['name'].lower())
            title = item['title'].lower()
            author = item['author'].lower()
            notes = item['notes'].lower()
            date = item['metadata_created']
            resources = []
            for resource in item['resources']:
                logging.debug(resource)
                resources.append({'name': resource['name'].lower(),
                                  'url': resource['url'],
                                  'description': resource['description'].lower()})
                if rsrc == 'Paper':
                    r = requests.get(resource['url'], headers=headers)
                    filename = ""
                    if 'application/pdf' in r.headers.get('content-type', ''):
                        filename = './PDF/' + item['title'] + "-" + getFilename_fromCd(r.headers.get('content-disposition')) + ".pdf"
                        urllib.request.urlretrieve(resource['url'], filename)
                    if 'text/plain' in r.headers.get('content-type', ''):
                        filename = './TXT/' + item['title'] + "-" + getFilename_fromCd(r.headers.get('content-disposition')) + ".txt"
                        urllib.request.urlretrieve(resource['url'], filename)
                    add_doc(filename, {'source': f"paper-{title}", 'author': author, 'date': date, 'tags': tags}, filename)
                    filename = "./TXT/paper-" + title + "-desc.txt"
                    with open(filename, 'w') as f:
                        f.write(resource['description'])
                    add_doc(filename, {'source': f"paper-{title}", 'author': author, 'date': date, 'tags': tags}, filename)
                if rsrc == 'Dataset':
                    filename = "./TXT/dataset-" + title + ".txt"
                    with open(filename, 'w') as f:
                        f.write(resource['description'])
                    add_doc(filename, {'source': f"dataset-{title}", 'author': author, 'date': date, 'tags': tags}, filename)
    except Exception as e:
        logging.error(e)

def clear_dir():
    try:
        for dir in ['./PDF', './TXT']:
            files = os.listdir(dir)
            for file in files:
                file_path = os.path.join(dir, file)
                if os.path.isfile(file_path):
                    os.remove(file_path)
        logging.debug("All files deleted successfully.")
    except Exception as e:
        logging.error("Error occurred while deleting files.")
        logging.error(e)

def create_dir(dir):
    if not os.path.exists(dir):
        os.makedirs(dir)

def add_doc(filename, metadata, id):
    docs[id] = {'filename': filename, 'metadata': metadata}
    logging.debug(f"checking {docs[id]}")

def main():
    #clear_dir()
    #create_dir("./PDF/")
    #create_dir("./TXT")
    #get_posts(POSTS_URL, headers=headers)
    #get_catalogue(GCAT_URL, headers=headers)
    #documents = load_documents()
    #chunks = split_documents(documents)
    #add_to_chroma(chunks)
    results = collection.query(
        query_texts=["who is leonardo candela?"],
        n_results=3
    )
    print(results)


if __name__ == "__main__":
    main()
#clear_dir()
#collection.add(
# documents=["This is a document about cat", "This is a document about car"],
# metadatas=[{"category": "animal"}, {"category": "vehicle"}],
# ids=["id1", "id2"]
#)
#results = collection.query(
# query_texts=["vehicle"],
# n_results=1
#)
#print(results)
#def main():
#get stuff from the VRE
#add if not there
#while True
#receive notifications from the VRE and add new additions
#if __name__ == "__main__":
# main()