import chromadb
from chromadb.config import Settings
import requests
import html2text
import pandas as pd
import logging
import re
import urllib.request
import os
from langchain_community.document_loaders import PyPDFDirectoryLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain.schema.document import Document
#from langchain.vectorstores.chroma import Chroma

logging.basicConfig(filename='chroma.log', level=logging.DEBUG)

# Constants
collection_name = "janet_knowledge"
POSTS_URL = "https://api.d4science.org/rest/2/posts/get-posts-vre/"
GCAT_URL = "https://api.d4science.org/catalogue/items/"
PDF_PATH = "./PDF"
TXT_PATH = "./TXT"
headers = {"gcube-token": "2c1e8f88-461c-42c0-8cc1-b7660771c9a3-843339462",
           "Accept": "application/json"}
docs = {}

"""
Starting the DB:
docker pull chromadb/chroma
docker run -p 8000:8000 \
    -e CHROMA_SERVER_AUTHN_CREDENTIALS_PROVIDER="chromadb.authn.token.TokenConfigServerAuthCredentialsProvider" \
    -e CHROMA_SERVER_AUTHN_PROVIDER="chromadb.authn.token.TokenAuthenticationServerProvider" \
    -e CHROMA_SERVER_AUTHN_CREDENTIALS="myToken" \
    -e CHROMA_SERVER_AUTHN_TOKEN_TRANSPORT_HEADER="X_CHROMA_TOKEN" \
    -v /Users/ahmed/Desktop/chroma/:/chroma/chroma chromadb/chroma
"""

client = chromadb.HttpClient(
    host="localhost",
    port=8000,
    headers={"Authorization": "Bearer myToken"}
    #settings=Settings(chroma_client_auth_provider="token", chroma_client_auth_credentials="myToken"),
)
collection = client.get_or_create_collection(name=collection_name)


def getFilename_fromCd(cd):
    """Extract the filename from a Content-Disposition header, if present."""
    if not cd:
        return None
    fname = re.findall('filename=(.+)', cd)
    if len(fname) == 0:
        return None
    return fname[0]


def load_documents():
    """Load every PDF found in PDF_PATH as LangChain documents."""
    document_loader = PyPDFDirectoryLoader(PDF_PATH)
    logging.debug("loading docs")
    return document_loader.load()


def split_documents(documents):
    """Split documents into overlapping chunks for embedding."""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=800,
        chunk_overlap=80,
        length_function=len,
        is_separator_regex=False,
    )
    logging.debug(f"splitting {len(documents)} documents")
    return text_splitter.split_documents(documents)


def calculate_chunk_ids(chunks):
    # This will create IDs like "data/monopoly.pdf:6:2"
    # Page Source : Page Number : Chunk Index
    last_page_id = None
    current_chunk_index = 0

    for chunk in chunks:
        source = chunk.metadata.get("source")
        page = chunk.metadata.get("page")
        current_page_id = f"{source}:{page}"
        logging.debug(f"chunking {source} page {page}")

        # If the page ID is the same as the last one, increment the index.
        if current_page_id == last_page_id:
            current_chunk_index += 1
        else:
            current_chunk_index = 0

        # Calculate the chunk ID.
        chunk_id = f"{current_page_id}:{current_chunk_index}"
        last_page_id = current_page_id

        # Add it to the page meta-data.
        chunk.metadata["id"] = chunk_id
        #logging.debug(docs[source])
        #chunk.metadata["extras"] = docs["./"+source]['metadata'] #needs primitive types

    logging.debug(f"we have {len(chunks)} chunks")
    return chunks


def add_to_chroma(chunks):
    # Calculate Page IDs.
    chunks_with_ids = calculate_chunk_ids(chunks)

    # Add or Update the documents.
    existing_items = collection.get(include=[])  # IDs are always included by default
    existing_ids = set(existing_items["ids"])
    logging.debug(f"Number of existing documents in DB: {len(existing_ids)}")

    # Only add documents that don't exist in the DB.
    new_chunks = []
    for chunk in chunks_with_ids:
        if chunk.metadata["id"] not in existing_ids:
            new_chunks.append(chunk)

    if len(new_chunks):
        logging.debug(f"Adding new documents: {len(new_chunks)}")
        new_chunk_ids = [chunk.metadata["id"] for chunk in new_chunks]
        chunk_metas = [chunk.metadata for chunk in new_chunks]
        chunk_content = [chunk.page_content for chunk in new_chunks]
        logging.debug(f"chunks {len(new_chunks)}")
        logging.debug(f"chunk_id: {len(new_chunk_ids)}")
        collection.add(documents=chunk_content, metadatas=chunk_metas, ids=new_chunk_ids)
    else:
        logging.debug("No new documents to add")


def get_posts(url, headers):
    """Fetch VRE posts, strip their HTML, and dump each one to a text file."""
    h = html2text.HTML2Text()
    h.ignore_links = True
    try:
        posts = requests.get(url, headers=headers)
        posts = posts.json()['result']
        for post in posts:
            author = post['full_name'].lower()
            key = post['key']
            content = h.handle(post['description']).replace('\n', ' ').lower()
            date = post['time']
            tags = []
            for word in content.split():
                if word[0] == '#':
                    tags.append(word[1:])
            logging.debug(f"Adding post {key} whose author is {author} and content is {content} to the index.")
            filename = "./TXT/post-" + key + ".txt"
            with open(filename, 'w') as f:
                f.write(content)
            #add_doc(filename, {'source': f"""post-{key}""", 'author': author, 'date': date, 'tags': tags}, key)
    except Exception as e:
        logging.error(e)


def get_catalogue(url, headers):
    """Fetch catalogue items, download Paper/Dataset resources, and register them in docs."""
    try:
        response = requests.get(url, headers=headers)
        items = response.json()

        items_data = []
        for item in items:
            api_url = url + item + '/'
            logging.debug(f"Getting resources of {api_url}")
            response = requests.get(api_url, headers=headers)
            try:
                items_data.append(response.json())
            except Exception as err:
                logging.error(err)
                logging.error(f"response is {response}")

        for item in items_data:
            logging.debug(item)
            for el in item['extras']:
                if el['key'] == 'system:type':
                    rsrc = el['value']
                if el['key'] == 'Item URL':
                    item_url = el['value']
            tags = []
            for tag in item['tags']:
                tags.append(tag['name'].lower())
            title = item['title'].lower()
            author = item['author'].lower()
            notes = item['notes'].lower()
            date = item['metadata_created']
            resources = []
            for resource in item['resources']:
                logging.debug(resource)
                resources.append({'name': resource['name'].lower(),
                                  'url': resource['url'],
                                  'description': resource['description'].lower()})
                if rsrc == 'Paper':
                    r = requests.get(resource['url'], headers=headers)
                    filename = ""
                    if 'application/pdf' in r.headers.get('content-type'):
                        filename = './PDF/' + item['title'] + "-" + getFilename_fromCd(r.headers.get('content-disposition')) + ".pdf"
                        urllib.request.urlretrieve(resource['url'], filename)
                    if 'text/plain' in r.headers.get('content-type'):
                        filename = './TXT/' + item['title'] + "-" + getFilename_fromCd(r.headers.get('content-disposition')) + ".txt"
                        urllib.request.urlretrieve(resource['url'], filename)
                    add_doc(filename, {'source': f"paper-{title}", 'author': author, 'date': date, 'tags': tags}, filename)
                    filename = "./TXT/paper-" + title + "-desc.txt"
                    with open(filename, 'w') as f:
                        f.write(resource['description'])
                    add_doc(filename, {'source': f"paper-{title}", 'author': author, 'date': date, 'tags': tags}, filename)
                if rsrc == 'Dataset':
                    filename = "./TXT/dataset-" + title + ".txt"
                    with open(filename, 'w') as f:
                        f.write(resource['description'])
                    add_doc(filename, {'source': f"dataset-{title}", 'author': author, 'date': date, 'tags': tags}, filename)
    except Exception as e:
        logging.error(e)


def clear_dir():
    """Delete every file in the PDF and TXT working directories."""
    try:
        for dir in ['./PDF', './TXT']:
            files = os.listdir(dir)
            for file in files:
                file_path = os.path.join(dir, file)
                if os.path.isfile(file_path):
                    os.remove(file_path)
        logging.debug("All files deleted successfully.")
    except Exception as e:
        logging.error("Error occurred while deleting files.")
        logging.error(e)


def create_dir(dir):
    """Create a directory if it does not already exist."""
    if not os.path.exists(dir):
        os.makedirs(dir)


def add_doc(filename, metadata, id):
    """Remember a downloaded file and its metadata for later enrichment of chunks."""
    docs[id] = {'filename': filename, 'metadata': metadata}
    logging.debug(f"checking {docs[id]}")


def main():
    #clear_dir()
    #create_dir("./PDF/")
    #create_dir("./TXT")
    #get_posts(POSTS_URL, headers=headers)
    #get_catalogue(GCAT_URL, headers=headers)
    #documents = load_documents()
    #chunks = split_documents(documents)
    #add_to_chroma(chunks)
    results = collection.query(
        query_texts=["who is leonardo candela?"],
        n_results=3
    )
    print(results)


if __name__ == "__main__":
    main()

#clear_dir()
#collection.add(
#    documents=["This is a document about cat", "This is a document about car"],
#    metadatas=[{"category": "animal"}, {"category": "vehicle"}],
#    ids=["id1", "id2"]
#)
#results = collection.query(
#    query_texts=["vehicle"],
#    n_results=1
#)
#print(results)

#def main():
#    #get stuff from the VRE
#    #add if not there
#    #while True
#        #receive notifications from the VRE and add new additions

#if __name__ == "__main__":
#    main()
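
# The commented-out lines in main() outline the intended ingestion pipeline.
# The sketch below shows one way it could be wired together end to end, plus a
# query restricted by chunk metadata. run_ingestion and query_by_source are
# illustrative helpers, not part of the original script; the "source" value
# passed to the where filter is a hypothetical path, and the sketch assumes the
# Chroma server from the docstring above is reachable on localhost:8000.

def run_ingestion():
    # Refresh the working directories, pull content from the VRE, and index it.
    clear_dir()
    create_dir(PDF_PATH)
    create_dir(TXT_PATH)
    get_posts(POSTS_URL, headers=headers)        # VRE posts -> ./TXT
    get_catalogue(GCAT_URL, headers=headers)     # catalogue papers/datasets -> ./PDF, ./TXT
    chunks = split_documents(load_documents())   # PDFs -> 800-character chunks
    add_to_chroma(chunks)                        # only chunks with unseen IDs are added


def query_by_source(question, source_path):
    # Restrict the semantic search to chunks coming from a single source file,
    # using the "source" key set by the PDF loader and kept in chunk metadata.
    return collection.query(
        query_texts=[question],
        n_results=3,
        where={"source": source_path},           # e.g. "PDF/example-paper.pdf" (hypothetical)
    )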