ChromaDB Client
commit ed0107e99c

@@ -0,0 +1,277 @@
import chromadb
from chromadb.config import Settings
import requests
import html2text
import logging
import re
import urllib.request
import os

from langchain_community.document_loaders import PyPDFDirectoryLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain.schema.document import Document
#from langchain.vectorstores.chroma import Chroma

logging.basicConfig(filename='chroma.log', level=logging.DEBUG)

# constants
COLLECTION_NAME = "janet_knowledge"

POSTS_URL = "https://api.d4science.org/rest/2/posts/get-posts-vre/"
GCAT_URL = "https://api.d4science.org/catalogue/items/"
PDF_PATH = "./PDF"
TXT_PATH = "./TXT"

# the gcube-token should ideally be loaded from configuration rather than source
headers = {"gcube-token": "2c1e8f88-461c-42c0-8cc1-b7660771c9a3-843339462", "Accept": "application/json"}

# registry of downloaded documents, keyed by id (see add_doc)
docs = {}

"""
|
||||
Starting the DB:
|
||||
docker pull chromadb/chroma
|
||||
docker run -p 8000:8000 -e CHROMA_SERVER_AUTHN_CREDENTIALS_PROVIDER="chromadb.authn.token.TokenConfigServerAuthCredentialsProvider" -e CHROMA_SERVER_AUTHN_PROVIDER="chromadb.authn.token.TokenAuthenticationServerProvider" -e CHROMA_SERVER_AUTHN_CREDENTIALS="myToken" -e CHROMA_SERVER_AUTHN_TOKEN_TRANSPORT_HEADER="X_CHROMA_TOKEN" -v /Users/ahmed/Desktop/chroma/:/chroma/chroma chromadb/chroma
|
||||
"""
|
||||
|
||||
client = chromadb.HttpClient(
    host="localhost",
    port=8000,
    # the header name must match CHROMA_SERVER_AUTHN_TOKEN_TRANSPORT_HEADER above:
    # X_CHROMA_TOKEN means the token travels in the X-Chroma-Token header
    headers={"X-Chroma-Token": "myToken"}
    #settings=Settings(chroma_client_auth_provider="token", chroma_client_auth_credentials="myToken"),
)
collection = client.get_or_create_collection(name=COLLECTION_NAME)
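
# Quick connectivity check (a sketch): heartbeat() returns a server timestamp
# and raises on connection or authentication failure, so a bad host, port, or
# token surfaces here instead of mid-ingestion.
logging.debug(f"chroma heartbeat: {client.heartbeat()}")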


def getFilename_fromCd(cd):
    """Extract a filename from a Content-Disposition header value."""
    if not cd:
        return None
    fname = re.findall('filename=(.+)', cd)
    if len(fname) == 0:
        return None
    return fname[0]
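
# For illustration (hypothetical header values): the capture group keeps
# everything after "filename=", so quotes or trailing parameters are not stripped.
#   getFilename_fromCd('attachment; filename=report.pdf')   ->  'report.pdf'
#   getFilename_fromCd('attachment; filename="report.pdf"') ->  '"report.pdf"'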


def load_documents():
    document_loader = PyPDFDirectoryLoader(PDF_PATH)
    logging.debug("loading docs")
    return document_loader.load()


def split_documents(documents):
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=800,
        chunk_overlap=80,
        length_function=len,
        is_separator_regex=False,
    )
    logging.debug(f"splitting {len(documents)} documents")
    return text_splitter.split_documents(documents)
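
# With length_function=len, chunk_size and chunk_overlap count characters: each
# chunk holds at most 800 characters and consecutive chunks share up to 80, so
# text cut at a boundary still appears whole in one of the two chunks.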


def calculate_chunk_ids(chunks):

    # This will create IDs like "data/monopoly.pdf:6:2"
    # Page Source : Page Number : Chunk Index

    last_page_id = None
    current_chunk_index = 0

    for chunk in chunks:
        source = chunk.metadata.get("source")
        page = chunk.metadata.get("page")
        current_page_id = f"{source}:{page}"
        logging.debug(f"chunking {source} page {page}")

        # If the page ID is the same as the last one, increment the index.
        if current_page_id == last_page_id:
            current_chunk_index += 1
        else:
            current_chunk_index = 0

        # Calculate the chunk ID.
        chunk_id = f"{current_page_id}:{current_chunk_index}"
        last_page_id = current_page_id

        # Add it to the page meta-data.
        chunk.metadata["id"] = chunk_id
        #logging.debug(docs[source])
        #chunk.metadata["extras"] = docs["./"+source]['metadata'] #needs primitive types

    logging.debug(f"we have {len(chunks)} chunks")
    return chunks
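
# Illustration with made-up documents: two chunks from the same page get
# indices 0 and 1, and the counter resets when the page changes, e.g.
#   demo = [Document(page_content="a", metadata={"source": "x.pdf", "page": 1}),
#           Document(page_content="b", metadata={"source": "x.pdf", "page": 1}),
#           Document(page_content="c", metadata={"source": "x.pdf", "page": 2})]
#   [d.metadata["id"] for d in calculate_chunk_ids(demo)]
#   -> ['x.pdf:1:0', 'x.pdf:1:1', 'x.pdf:2:0']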


def add_to_chroma(chunks):
    # Calculate Page IDs.
    chunks_with_ids = calculate_chunk_ids(chunks)

    # Add or Update the documents.
    existing_items = collection.get(include=[])  # IDs are always included by default
    existing_ids = set(existing_items["ids"])
    logging.debug(f"Number of existing documents in DB: {len(existing_ids)}")

    # Only add documents that don't exist in the DB.
    new_chunks = []
    for chunk in chunks_with_ids:
        if chunk.metadata["id"] not in existing_ids:
            new_chunks.append(chunk)

    if new_chunks:
        logging.debug(f"Adding new documents: {len(new_chunks)}")
        new_chunk_ids = [chunk.metadata["id"] for chunk in new_chunks]
        chunk_metas = [chunk.metadata for chunk in new_chunks]
        chunk_content = [chunk.page_content for chunk in new_chunks]
        logging.debug(f"chunks {len(new_chunks)}")
        logging.debug(f"chunk_id: {len(new_chunk_ids)}")
        collection.add(documents=chunk_content,
                       metadatas=chunk_metas,
                       ids=new_chunk_ids)
    else:
        logging.debug("No new documents to add")


def get_posts(url, headers):
    h = html2text.HTML2Text()
    h.ignore_links = True
    try:
        posts = requests.get(url, headers=headers)
        posts = posts.json()['result']
        for post in posts:
            author = post['full_name'].lower()
            key = post['key']
            content = h.handle(post['description']).replace('\n', ' ').lower()
            date = post['time']
            tags = []
            for word in content.split():
                if word.startswith('#'):
                    tags.append(word[1:])
            logging.debug(f"""Adding post {key} whose author is {author} and content is {content} to the index.""")
            filename = "./TXT/post-" + key + ".txt"
            with open(filename, 'w') as f:
                f.write(content)
            #add_doc(filename, {'source': f"""post-{key}""", 'author': author, 'date': date, 'tags': tags}, key)
    except Exception as e:
        logging.error(e)


def get_catalogue(url, headers):
    try:
        response = requests.get(url, headers=headers)
        items = response.json()
        items_data = []
        for item in items:
            api_url = url + item + '/'
            logging.debug(f"Getting resources of {api_url}")
            response = requests.get(api_url, headers=headers)
            try:
                items_data.append(response.json())
            except Exception as err:
                logging.error(err)
                logging.error(f"response is {response}")

        for item in items_data:
            logging.debug(item)
            # 'extras' may lack these keys, so reset them for every item
            rsrc = None
            item_url = None
            for el in item['extras']:
                if el['key'] == 'system:type':
                    rsrc = el['value']
                if el['key'] == 'Item URL':
                    item_url = el['value']
            tags = []
            for tag in item['tags']:
                tags.append(tag['name'].lower())
            title = item['title'].lower()
            author = item['author'].lower()
            notes = item['notes'].lower()
            date = item['metadata_created']
            resources = []
            for resource in item['resources']:
                logging.debug(resource)
                resources.append(
                    {'name': resource['name'].lower(), 'url': resource['url'], 'description': resource['description'].lower()})
                if rsrc == 'Paper':
                    r = requests.get(resource['url'], headers=headers)
                    filename = ""
                    if 'application/pdf' in r.headers.get('content-type', ''):
                        filename = './PDF/' + item['title'] + "-" + getFilename_fromCd(r.headers.get('content-disposition')) + ".pdf"
                        urllib.request.urlretrieve(resource['url'], filename)
                    if 'text/plain' in r.headers.get('content-type', ''):
                        filename = './TXT/' + item['title'] + "-" + getFilename_fromCd(r.headers.get('content-disposition')) + ".txt"
                        urllib.request.urlretrieve(resource['url'], filename)
                    add_doc(filename, {'source': f"paper-{title}", 'author': author, 'date': date, 'tags': tags}, filename)
                    filename = "./TXT/paper-" + title + "-desc.txt"
                    with open(filename, 'w') as f:
                        f.write(resource['description'])
                    add_doc(filename, {'source': f"paper-{title}", 'author': author, 'date': date, 'tags': tags}, filename)
                if rsrc == 'Dataset':
                    filename = "./TXT/dataset-" + title + ".txt"
                    with open(filename, 'w') as f:
                        f.write(resource['description'])
                    add_doc(filename, {'source': f"dataset-{title}", 'author': author, 'date': date, 'tags': tags}, filename)

    except Exception as e:
        logging.error(e)


def clear_dir():
    try:
        for directory in [PDF_PATH, TXT_PATH]:
            files = os.listdir(directory)
            for file in files:
                file_path = os.path.join(directory, file)
                if os.path.isfile(file_path):
                    os.remove(file_path)
        logging.debug("All files deleted successfully.")
    except Exception as e:
        logging.error("Error occurred while deleting files.")
        logging.error(e)


def create_dir(dir):
    if not os.path.exists(dir):
        os.makedirs(dir)


def add_doc(filename, metadata, id):
    docs[id] = {'filename': filename, 'metadata': metadata}
    logging.debug(f"checking {docs[id]}")


def main():
    #clear_dir()
    #create_dir("./PDF/")
    #create_dir("./TXT")
    #get_posts(POSTS_URL, headers=headers)
    #get_catalogue(GCAT_URL, headers=headers)

    #documents = load_documents()
    #chunks = split_documents(documents)
    #add_to_chroma(chunks)

    results = collection.query(
        query_texts=["who is leonardo candela?"],
        n_results=3
    )
    print(results)
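    # query() returns a dict of parallel lists, one inner list per query text:
    # results["ids"][0], results["documents"][0], results["metadatas"][0] and
    # results["distances"][0] hold the n_results nearest chunks for this query.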


if __name__ == "__main__":
    main()
    #clear_dir()

#collection.add(
#    documents=["This is a document about cat", "This is a document about car"],
#    metadatas=[{"category": "animal"}, {"category": "vehicle"}],
#    ids=["id1", "id2"]
#)

#results = collection.query(
#    query_texts=["vehicle"],
#    n_results=1
#)
#print(results)


# sketch of the intended ingestion loop:
#def main():
#    get stuff from the VRE
#    add if not there
#    while True:
#        receive notifications from the VRE and add new additions

#if __name__ == "__main__":
#    main()