from datetime import datetime
import math
import os
import threading
import time
from io import BytesIO

import faiss
import numpy as np
import pandas as pd
import PyPDF2
import requests
from tqdm.auto import tqdm


class VRE:
    """Local mirror of a D4Science catalogue with FAISS indexes on top.

    Catalogue items are downloaded over HTTP, split into three pandas tables
    (``paper_db``, ``dataset_db``, ``content_db``), persisted as JSON files
    under ``directory`` and indexed with ``faiss.IndexIVFPQ`` using the
    embeddings returned by ``retriever.encode``.
    """

    # Column layout shared by the paper and dataset tables.
    _ITEM_COLUMNS = ('id', 'type', 'resources', 'tags', 'title', 'author',
                     'notes', 'metadata_created')
    # Column layout of the table holding extracted PDF pages / text files.
    _CONTENT_COLUMNS = ('id', 'paperid', 'content')
    # Timestamp format of the catalogue's ``metadata_created`` field.
    _DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'
    # (db_type, attribute, index_type, filename) for every maintained index,
    # in the order ``init`` builds them.
    _INDEX_SPECS = (
        ('dataset_db', 'title', 'dataset_titles_index', 'janet_dataset_titles_index'),
        ('dataset_db', 'notes', 'dataset_desc_index', 'janet_dataset_desc_index'),
        ('paper_db', 'title', 'paper_titles_index', 'janet_paper_titles_index'),
        ('paper_db', 'notes', 'paper_desc_index', 'janet_paper_desc_index'),
        ('content_db', 'content', 'content_index', 'janet_content_index'),
    )

    def __init__(self, name, token, retriever, directory='/app/'):
        """Load any persisted tables and indexes found under ``directory``.

        Args:
            name: VRE name; used as the prefix of the persisted JSON files.
            token: value sent in the ``gcube-token`` request header.
            retriever: object exposing ``encode(list_of_texts)``; its output
                is used as the embedding matrix for the FAISS indexes.
            directory: path prefix (concatenated as-is, so it should end with
                a path separator) for the JSON tables and index files.
        """
        self.name = name
        self.token = token
        self.catalogue_url = 'https://api.d4science.org/catalogue/items/'
        self.headers = {"gcube-token": self.token, "Accept": "application/json"}
        self.lastupdatetime = datetime.strptime(
            '2021-01-01T00:00:00.000000', self._DATE_FORMAT).timestamp()
        self.retriever = retriever
        self.directory = directory
        self.paper_counter = 0
        self.dataset_counter = 0
        self.content_counter = 0
        self.db = {
            'paper_db': self._load_table('_paper.json', self._ITEM_COLUMNS),
            'dataset_db': self._load_table('_dataset.json', self._ITEM_COLUMNS),
            'content_db': self._load_table('_content.json', self._CONTENT_COLUMNS),
        }
        self.index = {
            'dataset_titles_index': self._load_index('janet_dataset_titles_index'),
            'paper_titles_index': self._load_index('janet_paper_titles_index'),
            'dataset_desc_index': self._load_index('janet_dataset_desc_index'),
            'paper_desc_index': self._load_index('janet_paper_desc_index'),
            'content_index': self._load_index('janet_content_index'),
        }
        self.new_income = False
        # Without this, a restart with persisted tables would restart ids at 1
        # and make the next get_vre_update() re-ingest every catalogue item.
        self._restore_state()

    def _db_path(self, suffix):
        """Return the path of the persisted table with the given suffix."""
        return self.directory + self.name + suffix

    def _load_table(self, suffix, columns):
        """Read a persisted table, or return an empty one with ``columns``."""
        path = self._db_path(suffix)
        if os.path.isfile(path):
            return pd.read_json(path)
        return pd.DataFrame(columns=list(columns))

    def _load_index(self, filename):
        """Read a persisted FAISS index, or return None when it is missing."""
        path = self.directory + filename
        if os.path.isfile(path):
            return faiss.read_index(path)
        return None

    def _restore_state(self):
        """Resume id counters and ``lastupdatetime`` from the loaded tables."""
        counters = (('paper_db', 'paper_counter'),
                    ('dataset_db', 'dataset_counter'),
                    ('content_db', 'content_counter'))
        for db_type, counter_attr in counters:
            table = self.db[db_type]
            if 'id' not in table.columns:
                continue
            ids = pd.to_numeric(table['id'], errors='coerce').dropna()
            if not ids.empty:
                setattr(self, counter_attr, int(ids.max()))
        for db_type in ('paper_db', 'dataset_db'):
            table = self.db[db_type]
            if 'metadata_created' not in table.columns:
                continue
            # NOTE(review): assumes read_json keeps this column as the POSIX
            # timestamps written by to_json (the column name is not one of
            # pandas' default date-like labels) - confirm on real data.
            created = pd.to_numeric(table['metadata_created'], errors='coerce').dropna()
            if not created.empty:
                self.lastupdatetime = max(self.lastupdatetime, float(created.max()))

    def init(self):
        """First-run bootstrap: download the catalogue and build missing indexes."""
        table_suffixes = ('_dataset.json', '_paper.json', '_content.json')
        if not all(os.path.isfile(self._db_path(suffix)) for suffix in table_suffixes):
            self.get_content()
        for db_type, attribute, index_type, filename in self._INDEX_SPECS:
            if self.index[index_type] is None:
                self._rebuild_index(db_type, attribute, index_type, filename)

    def _rebuild_index(self, db_type, attribute, index_type, filename):
        """Train a fresh index for one table column and fill it."""
        self.create_index(db_type, attribute, index_type, filename)
        self.populate_index(db_type, attribute, index_type, filename)

    def index_periodic_update(self):
        """Rebuild indexes after ``get_vre_update`` reported new items."""
        if not self.new_income:
            return
        # NOTE(review): a table is only re-indexed when its row count is not a
        # multiple of 100; kept as in the original - confirm this is intended.
        if len(self.db['content_db']) % 100 != 0:
            self._rebuild_index('content_db', 'content', 'content_index', 'janet_content_index')
        if len(self.db['paper_db']) % 100 != 0:
            self._rebuild_index('paper_db', 'title', 'paper_titles_index', 'janet_paper_titles_index')
            self._rebuild_index('paper_db', 'notes', 'paper_desc_index', 'janet_paper_desc_index')
        if len(self.db['dataset_db']) % 100 != 0:
            self._rebuild_index('dataset_db', 'title', 'dataset_titles_index', 'janet_dataset_titles_index')
            self._rebuild_index('dataset_db', 'notes', 'dataset_desc_index', 'janet_dataset_desc_index')
        self.new_income = False

    def create_index(self, db_type, attribute, index_type, filename):
        """Create and train an ``IndexIVFPQ`` for ``self.db[db_type][attribute]``.

        The trained (still empty) index is stored in ``self.index[index_type]``
        and written to ``self.directory + filename``. When the column has no
        rows nothing is created and the existing index entry is left untouched.
        """
        filename = self.directory + filename
        to_index = self.db[db_type][attribute]
        # Encode row by row as before, but stack once at the end instead of
        # re-copying the whole matrix with np.append on every row.
        embeddings = [np.array(self.retriever.encode([info])) for info in to_index]
        if not embeddings:
            return
        sentence_embeddings = np.concatenate(embeddings, axis=0)
        n_vectors = len(sentence_embeddings)
        # number of partitions of the coarse quantizer = number of posting lists
        # as rule of thumb, 4*sqrt(N) < nlist < 16*sqrt(N), where N is the size of the database
        nlist_candidate = int(4 * math.sqrt(n_vectors))
        nlist = nlist_candidate if nlist_candidate < n_vectors else n_vectors - 1
        code_size = 8  # = number of subquantizers = number of sub-vectors
        # n_bits of each code (8 -> 1 byte codes); reduced for tiny tables
        n_bits = 4 if n_vectors >= 2 ** 4 else int(math.log2(n_vectors))
        # NOTE(review): a single-row table yields nlist == 0 and n_bits == 0,
        # which FAISS is unlikely to accept - confirm how that case should behave.
        d = sentence_embeddings.shape[1]
        # will keep centroids of coarse quantizer (for inverted list)
        coarse_quantizer = faiss.IndexFlatL2(d)
        self.index[index_type] = faiss.IndexIVFPQ(coarse_quantizer, d, nlist, code_size, n_bits)
        # Training uses every embedding of the column, not a sample.
        self.index[index_type].train(sentence_embeddings)
        faiss.write_index(self.index[index_type], filename)

    def populate_index(self, db_type, attribute, index_type, filename):
        """Add the embedding of every row of a column to an existing index.

        Does nothing when ``self.index[index_type]`` is None (e.g. because
        ``create_index`` skipped an empty table). Otherwise the filled index is
        written to ``self.directory + filename``.
        """
        filename = self.directory + filename
        index = self.index[index_type]
        if index is None:
            return
        to_index = self.db[db_type][attribute]
        for info in to_index:
            index.add(np.array(self.retriever.encode([info])))
        faiss.write_index(index, filename)

    def _created_timestamp(self, item):
        """Return an item's ``metadata_created`` as a POSIX timestamp."""
        return datetime.strptime(item['metadata_created'], self._DATE_FORMAT).timestamp()

    def _fetch_items(self, newer_than=None):
        """Download the metadata of the catalogue items.

        Args:
            newer_than: when given, keep only items whose creation timestamp
                is strictly greater than this value.

        Returns:
            list of the decoded JSON documents, in catalogue listing order.
        """
        listing = requests.get(self.catalogue_url, headers=self.headers)
        items_data = []
        for item_name in listing.json():
            api_url = self.catalogue_url + item_name + '/'
            # Decode the body once; it is needed for both the filter and the result.
            item = requests.get(api_url, headers=self.headers).json()
            if newer_than is None or self._created_timestamp(item) > newer_than:
                items_data.append(item)
        return items_data

    def _ingest_items(self, items_data):
        """Turn raw catalogue items into paper, dataset and content tables.

        Advances the id counters and ``lastupdatetime`` as a side effect and
        downloads the resources of every paper to extract their text.

        Returns:
            tuple ``(paper_df, dataset_df, content_df)``.
        """
        paper_df = pd.DataFrame(columns=list(self._ITEM_COLUMNS))
        dataset_df = pd.DataFrame(columns=list(self._ITEM_COLUMNS))
        content_df = pd.DataFrame(columns=list(self._CONTENT_COLUMNS))
        for item in items_data:
            # Reset per item so an item without a 'system:type' extra is
            # skipped instead of inheriting the previous item's type.
            rsrc = None
            for el in item['extras']:
                if el['key'] == 'system:type':
                    rsrc = el['value']
            resources = [
                {'name': resource['name'].lower(),
                 'url': resource['url'],
                 'description': resource['description'].lower()}
                for resource in item['resources']
            ]
            tags = [tag['name'].lower() for tag in item['tags']]
            title = item['title'].lower()
            author = item['author'].lower()
            notes = item['notes'].lower()
            date = self._created_timestamp(item)
            if date > self.lastupdatetime:
                self.lastupdatetime = date
            if rsrc == 'Paper':
                self.paper_counter += 1
                paper_df.loc[str(self.paper_counter)] = [
                    self.paper_counter, rsrc, resources, tags, title, author, notes, date]
                content_df = self.get_pdf_content(item, content_df)
                content_df = self.get_txt_content(item, content_df)
            if rsrc == 'Dataset':
                self.dataset_counter += 1
                dataset_df.loc[str(self.dataset_counter)] = [
                    self.dataset_counter, rsrc, resources, tags, title, author, notes, date]
        return paper_df, dataset_df, content_df

    def _save_db(self):
        """Write the three tables to their JSON files."""
        self.db['paper_db'].to_json(self._db_path('_paper.json'))
        self.db['dataset_db'].to_json(self._db_path('_dataset.json'))
        self.db['content_db'].to_json(self._db_path('_content.json'))

    def get_content(self):
        """Download the whole catalogue and replace the local tables with it."""
        # This is a rebuild from scratch, so ids restart at 1 even when
        # counters were restored from partially present files.
        self.paper_counter = 0
        self.dataset_counter = 0
        self.content_counter = 0
        paper_df, dataset_df, content_df = self._ingest_items(self._fetch_items())
        self.db['paper_db'] = paper_df.sort_values(by='metadata_created', ascending=True)
        self.db['dataset_db'] = dataset_df.sort_values(by='metadata_created', ascending=True)
        self.db['content_db'] = content_df
        self._save_db()

    # modify query
    def get_vre_update(self):
        """Append catalogue items created after ``lastupdatetime`` to the tables.

        Sets ``new_income`` when at least one paper, dataset or content row
        was added, so ``index_periodic_update`` knows it has work to do.
        """
        print("Getting new items")
        items_data = self._fetch_items(newer_than=self.lastupdatetime)
        paper_df, dataset_df, content_df = self._ingest_items(items_data)
        self.db['paper_db'] = pd.concat(
            [self.db['paper_db'], paper_df.sort_values(by='metadata_created', ascending=True)])
        self.db['dataset_db'] = pd.concat(
            [self.db['dataset_db'], dataset_df.sort_values(by='metadata_created', ascending=True)])
        self.db['content_db'] = pd.concat([self.db['content_db'], content_df])
        self._save_db()
        if not paper_df.empty or not dataset_df.empty or not content_df.empty:
            self.new_income = True

    def get_pdf_content(self, item, df):
        """Append one row per page of every PDF resource of ``item`` to ``df``.

        Rows are linked to the current ``paper_counter``. Returns ``df``.
        """
        for rsrc in tqdm(item['resources']):
            response = requests.get(rsrc['url'])
            # The header may be absent; treat that as "not a PDF".
            content_type = response.headers.get('content-type') or ''
            if 'application/pdf' not in content_type:
                continue
            with BytesIO(response.content) as data:
                read_pdf = PyPDF2.PdfReader(data)
                for page in tqdm(read_pdf.pages):
                    content = page.extract_text()
                    self.content_counter += 1
                    df.loc[str(self.content_counter)] = [
                        self.content_counter, self.paper_counter, content]
        return df

    def get_txt_content(self, item, df):
        """Append one row per plain-text resource of ``item`` to ``df``.

        Rows are linked to the current ``paper_counter``. Returns ``df``.
        """
        for rsrc in tqdm(item['resources']):
            response = requests.get(rsrc['url'])
            # The header may be absent; treat that as "not plain text".
            content_type = response.headers.get('content-type') or ''
            if 'text/plain' not in content_type:
                continue
            content = response.text
            self.content_counter += 1
            df.loc[str(self.content_counter)] = [
                self.content_counter, self.paper_counter, content]
        return df

    def get_db(self):
        """Return the dict holding the paper, dataset and content tables."""
        return self.db

    def get_index(self):
        """Return the dict of FAISS indexes (values are None when not built)."""
        return self.index