Janet/JanetBackEnd/VRE.py

236 lines
13 KiB
Python

from datetime import datetime
import pandas as pd
import requests
import os
from io import BytesIO
import PyPDF2
from tqdm.auto import tqdm
import numpy as np
import math
import faiss
import time
import threading
class VRE:
def __init__(self, name, token, retriever, directory='./'):
self.name = name
self.token = token
self.catalogue_url = 'https://api.d4science.org/catalogue/items/'
self.headers = headers = {"gcube-token": self.token, "Accept": "application/json"}
self.lastupdatetime = datetime.strptime('2021-01-01T00:00:00.000000', '%Y-%m-%dT%H:%M:%S.%f').timestamp()
self.retriever = retriever
self.directory = directory
self.paper_counter = 0
self.dataset_counter = 0
self.content_counter = 0
self.db = {'paper_db': pd.read_json(self.directory + self.name + '_paper.json') if os.path.isfile(self.directory + self.name + '_paper.json') else pd.DataFrame(columns=['id', 'type', 'resources', 'tags', 'title', 'author', 'notes', 'metadata_created']),
'dataset_db': pd.read_json(self.directory + self.name + '_dataset.json') if os.path.isfile(self.directory + self.name + '_dataset.json') else pd.DataFrame(columns=['id', 'type', 'resources', 'tags', 'title', 'author', 'notes', 'metadata_created']),
'content_db': pd.read_json(self.directory + self.name + '_content.json') if os.path.isfile(self.directory + self.name + '_content.json') else pd.DataFrame(columns=['id', 'paperid', 'content'])}
self.index = {'dataset_titles_index': None if not os.path.isfile(self.directory + 'janet_dataset_titles_index') else faiss.read_index('janet_dataset_titles_index'),
'paper_titles_index': None if not os.path.isfile(self.directory + 'janet_paper_titles_index') else faiss.read_index('janet_paper_titles_index'),
'dataset_desc_index': None if not os.path.isfile(self.directory + 'janet_dataset_desc_index') else faiss.read_index('janet_dataset_desc_index'),
'paper_desc_index': None if not os.path.isfile(self.directory + 'janet_paper_desc_index') else faiss.read_index('janet_paper_desc_index'),
'content_index': None if not os.path.isfile(self.directory + 'janet_content_index') else faiss.read_index('janet_content_index')}
self.new_income = False
def init(self):
#first run
if not os.path.isfile(self.directory + self.name + '_dataset' + '.json') or not os.path.isfile(self.directory + self.name + '_paper' + '.json') or not os.path.isfile(self.directory + self.name + '_content' + '.json'):
self.get_content()
if self.index['dataset_titles_index'] is None:
self.create_index('dataset_db', 'title', 'dataset_titles_index', 'janet_dataset_titles_index')
self.populate_index('dataset_db', 'title', 'dataset_titles_index', 'janet_dataset_titles_index')
if self.index['dataset_desc_index'] is None:
self.create_index('dataset_db', 'notes', 'dataset_desc_index', 'janet_dataset_desc_index')
self.populate_index('dataset_db', 'notes', 'dataset_desc_index', 'janet_dataset_desc_index')
if self.index['paper_titles_index'] is None:
self.create_index('paper_db', 'title', 'paper_titles_index', 'janet_paper_titles_index')
self.populate_index('paper_db', 'title', 'paper_titles_index', 'janet_paper_titles_index')
if self.index['paper_desc_index'] is None:
self.create_index('paper_db', 'notes', 'paper_desc_index', 'janet_paper_desc_index')
self.populate_index('paper_db', 'notes', 'paper_desc_index', 'janet_paper_desc_index')
if self.index['content_index'] is None:
self.create_index('content_db', 'content', 'content_index', 'janet_content_index')
self.populate_index('content_db', 'content', 'content_index', 'janet_content_index')
def index_periodic_update(self):
if self.new_income:
if len(self.db['content_db'])%100 != 0:
self.create_index('content_db', 'content', 'content_index', 'janet_content_index')
self.populate_index('content_db', 'content', 'content_index', 'janet_content_index')
if len(self.db['paper_db'])%100 != 0:
self.create_index('paper_db', 'title', 'paper_titles_index', 'janet_paper_titles_index')
self.populate_index('paper_db', 'title', 'paper_titles_index', 'janet_paper_titles_index')
self.create_index('paper_db', 'notes', 'paper_desc_index', 'janet_paper_desc_index')
self.populate_index('paper_db', 'notes', 'paper_desc_index', 'janet_paper_desc_index')
if len(self.db['dataset_db'])%100 != 0:
self.create_index('dataset_db', 'title', 'dataset_titles_index', 'janet_dataset_titles_index')
self.populate_index('dataset_db', 'title', 'dataset_titles_index', 'janet_dataset_titles_index')
self.create_index('dataset_db', 'notes', 'dataset_desc_index', 'janet_dataset_desc_index')
self.populate_index('dataset_db', 'notes', 'dataset_desc_index', 'janet_dataset_desc_index')
self.new_income = False
def create_index(self, db_type, attribute, index_type, filename):
to_index = self.db[db_type][attribute]
for i, info in enumerate(to_index):
if i == 0:
emb = self.retriever.encode([info])
sentence_embeddings = np.array(emb)
else:
emb = self.retriever.encode([info])
sentence_embeddings = np.append(sentence_embeddings, emb, axis=0)
# number of partitions of the coarse quantizer = number of posting lists
# as rule of thumb, 4*sqrt(N) < nlist < 16*sqrt(N), where N is the size of the database
nlist = int(4 * math.sqrt(len(sentence_embeddings))) if int(4 * math.sqrt(len(sentence_embeddings))) < len(sentence_embeddings) else len(sentence_embeddings)-1
code_size = 8 # = number of subquantizers = number of sub-vectors
n_bits = 4 if len(sentence_embeddings) >= 2**4 else int(math.log2(len(sentence_embeddings))) # n_bits of each code (8 -> 1 byte codes)
d = sentence_embeddings.shape[1]
coarse_quantizer = faiss.IndexFlatL2(d) # will keep centroids of coarse quantizer (for inverted list)
self.index[index_type] = faiss.IndexIVFPQ(coarse_quantizer, d, nlist, code_size, n_bits)
self.index[index_type].train(sentence_embeddings) # train on a random subset to speed up k-means (NOTE: ensure they are randomly chosen!)
faiss.write_index(self.index[index_type], filename)
def populate_index(self, db_type, attribute, index_type, filename):
to_index = self.db[db_type][attribute]
for info in to_index:
sentence_embedding = np.array(self.retriever.encode([info]))
self.index[index_type].add(sentence_embedding)
faiss.write_index(self.index[index_type], filename)
def get_content(self):
response = requests.get(self.catalogue_url, headers=self.headers)
items = response.json()
items_data = []
for item in items:
api_url = self.catalogue_url + item + '/'
response = requests.get(api_url, headers=self.headers)
items_data.append(response.json())
keys = ['type', 'resources', 'tags', 'title', 'author', 'notes', 'metadata_created']
paper_df = pd.DataFrame(columns=['id', 'type', 'resources', 'tags', 'title', 'author', 'notes', 'metadata_created'])
dataset_df = pd.DataFrame(columns=['id', 'type', 'resources', 'tags', 'title', 'author', 'notes', 'metadata_created'])
content_df = pd.DataFrame(columns=['id', 'paperid', 'content'])
for item in items_data:
for el in item['extras']:
if el['key'] == 'system:type':
rsrc = el['value']
resources = []
for resource in item['resources']:
resources.append(
{'name': resource['name'].lower(), 'url': resource['url'], 'description': resource['description'].lower()})
tags = []
for tag in item['tags']:
tags.append(tag['name'].lower())
title = item['title'].lower()
author = item['author'].lower()
notes = item['notes'].lower()
date = datetime.strptime(item['metadata_created'], '%Y-%m-%dT%H:%M:%S.%f').timestamp()
if date > self.lastupdatetime:
self.lastupdatetime = date
if rsrc == 'Paper':
self.paper_counter += 1
paper_df.loc[str(self.paper_counter)] = [self.paper_counter, rsrc, resources, tags, title, author, notes, date]
content_df = self.get_pdf_content(item, content_df)
content_df = self.get_txt_content(item, content_df)
if rsrc == 'Dataset':
self.dataset_counter += 1
dataset_df.loc[str(self.dataset_counter)] = [self.dataset_counter, rsrc, resources, tags, title, author, notes, date]
self.db['paper_db'] = paper_df.sort_values(by='metadata_created', ascending=True)
self.db['dataset_db'] = dataset_df.sort_values(by='metadata_created', ascending=True)
self.db['content_db'] = content_df
self.db['paper_db'].to_json(self.name + '_paper.json')
self.db['dataset_db'].to_json(self.name + '_dataset.json')
self.db['content_db'].to_json(self.name + '_content.json')
# modify query
def get_vre_update(self):
print("Getting new items")
response = requests.get(self.catalogue_url, headers=self.headers)
items = response.json()
items_data = []
for item in items:
api_url = self.catalogue_url + item + '/'
response = requests.get(api_url, headers=self.headers)
if datetime.strptime(response.json()['metadata_created'],'%Y-%m-%dT%H:%M:%S.%f').timestamp() > self.lastupdatetime:
items_data.append(response.json())
keys = ['type', 'resources', 'tags', 'title', 'author', 'notes', 'metadata_created']
paper_df = pd.DataFrame(columns=['id', 'type', 'resources', 'tags', 'title', 'author', 'notes', 'metadata_created'])
dataset_df = pd.DataFrame(columns=['id', 'type', 'resources', 'tags', 'title', 'author', 'notes', 'metadata_created'])
content_df = pd.DataFrame(columns=['id', 'paperid', 'content'])
for item in items_data:
for el in item['extras']:
if el['key'] == 'system:type':
rsrc = el['value']
resources = []
for resource in item['resources']:
resources.append(
{'name': resource['name'].lower(), 'url': resource['url'], 'description': resource['description'].lower()})
tags = []
for tag in item['tags']:
tags.append(tag['name'].lower())
title = item['title'].lower()
author = item['author'].lower()
notes = item['notes'].lower()
date = datetime.strptime(item['metadata_created'], '%Y-%m-%dT%H:%M:%S.%f').timestamp()
if date > self.lastupdatetime:
self.lastupdatetime = date
if rsrc == 'Paper':
self.paper_counter += 1
paper_df.loc[str(self.paper_counter)] = [self.paper_counter, rsrc, resources, tags, title, author, notes, date]
content_df = self.get_pdf_content(item, content_df)
content_df = self.get_txt_content(item, content_df)
if rsrc == 'Dataset':
self.dataset_counter += 1
dataset_df.loc[str(self.dataset_counter)] = [self.dataset_counter, rsrc, resources, tags, title, author, notes, date]
self.db['paper_db'] = pd.concat([self.db['paper_db'], paper_df.sort_values(by='metadata_created', ascending=True)])
self.db['dataset_db'] = pd.concat([self.db['dataset_db'], dataset_df.sort_values(by='metadata_created', ascending=True)])
self.db['paper_db'].to_json(self.name + '_paper.json')
self.db['dataset_db'].to_json(self.name + '_dataset.json')
self.db['content_db'] = pd.concat([self.db['content_db'], content_df])
self.db['content_db'].to_json(self.name + '_content.json')
if not paper_df.empty or not dataset_df.empty or not content_df.empty:
self.new_income = True
def get_pdf_content(self, item, df):
for rsrc in tqdm(item['resources']):
response = requests.get(rsrc['url'])
if 'application/pdf' in response.headers.get('content-type'):
my_raw_data = response.content
with BytesIO(my_raw_data) as data:
read_pdf = PyPDF2.PdfReader(data)
for page in tqdm(range(len(read_pdf.pages))):
content = read_pdf.pages[page].extract_text()
self.content_counter += 1
df.loc[str(self.content_counter)] = [self.content_counter, self.paper_counter, content]
return df
def get_txt_content(self, item, df):
for rsrc in tqdm(item['resources']):
response = requests.get(rsrc['url'])
if 'text/plain' in response.headers.get('content-type'):
content = response.text
self.content_counter += 1
df.loc[str(self.content_counter)] = [self.content_counter, self.paper_counter, content]
return df
def get_db(self):
return self.db
def get_index(self):
return self.index