236 lines
13 KiB
Python
236 lines
13 KiB
Python
from datetime import datetime
|
|
import pandas as pd
|
|
import requests
|
|
import os
|
|
from io import BytesIO
|
|
import PyPDF2
|
|
from tqdm.auto import tqdm
|
|
import numpy as np
|
|
import math
|
|
import faiss
|
|
import time
|
|
import threading
|
|
|
|
class VRE:
    """Local mirror of a D4Science VRE catalogue plus FAISS indexes over it.

    The class downloads catalogue items over HTTP, splits them into three
    pandas DataFrames (papers, datasets and per-page paper content), persists
    those as JSON files and builds one FAISS ``IndexIVFPQ`` per searchable
    text column.

    All files (JSON databases and FAISS indexes) are read from and written
    to ``directory``.

    Args:
        name: Prefix of the JSON database files
            (``<name>_paper.json``, ``<name>_dataset.json``,
            ``<name>_content.json``).
        token: Value sent in the ``gcube-token`` request header.
        retriever: Object exposing ``encode(list_of_str)`` that returns one
            embedding row per input string (presumably a sentence-embedding
            model -- confirm against the caller).
        directory: Folder holding the JSON and index files.
    """

    _DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'
    _ITEM_COLUMNS = ['id', 'type', 'resources', 'tags', 'title', 'author', 'notes', 'metadata_created']
    _CONTENT_COLUMNS = ['id', 'paperid', 'content']

    # index key -> file name of the serialized FAISS index
    _INDEX_FILES = {
        'dataset_titles_index': 'janet_dataset_titles_index',
        'paper_titles_index': 'janet_paper_titles_index',
        'dataset_desc_index': 'janet_dataset_desc_index',
        'paper_desc_index': 'janet_paper_desc_index',
        'content_index': 'janet_content_index',
    }

    # (database key, text column, index key) for every index built by init()
    _INDEX_SPECS = (
        ('dataset_db', 'title', 'dataset_titles_index'),
        ('dataset_db', 'notes', 'dataset_desc_index'),
        ('paper_db', 'title', 'paper_titles_index'),
        ('paper_db', 'notes', 'paper_desc_index'),
        ('content_db', 'content', 'content_index'),
    )

    def __init__(self, name, token, retriever, directory='./'):
        self.name = name
        self.token = token
        self.catalogue_url = 'https://api.d4science.org/catalogue/items/'
        self.headers = {"gcube-token": self.token, "Accept": "application/json"}
        self.lastupdatetime = self._parse_timestamp('2021-01-01T00:00:00.000000')
        self.retriever = retriever
        self.directory = directory
        self.paper_counter = 0
        self.dataset_counter = 0
        self.content_counter = 0
        self.db = {
            'paper_db': self._load_db('_paper.json', self._ITEM_COLUMNS),
            'dataset_db': self._load_db('_dataset.json', self._ITEM_COLUMNS),
            'content_db': self._load_db('_content.json', self._CONTENT_COLUMNS),
        }
        self.index = {key: self._load_index(filename) for key, filename in self._INDEX_FILES.items()}
        self.new_income = False
        # Databases loaded from disk already contain ids and timestamps;
        # continue from them instead of restarting at zero.
        self._restore_state()

    # ------------------------------------------------------------------
    # path / persistence helpers
    # ------------------------------------------------------------------
    def _path(self, filename):
        """Return ``filename`` located inside ``self.directory``."""
        return os.path.join(self.directory, filename)

    def _ensure_directory(self):
        """Create ``self.directory`` if it is set and does not exist yet."""
        if self.directory:
            os.makedirs(self.directory, exist_ok=True)

    def _load_db(self, suffix, columns):
        """Load ``<name><suffix>`` from disk, or return an empty frame."""
        path = self._path(self.name + suffix)
        if os.path.isfile(path):
            return pd.read_json(path)
        return pd.DataFrame(columns=columns)

    def _load_index(self, filename):
        """Read a serialized FAISS index, or return None when it is absent."""
        path = self._path(filename)
        # Read from the same path whose existence was just checked.
        return faiss.read_index(path) if os.path.isfile(path) else None

    def _save_db(self):
        """Write the three databases to their JSON files."""
        self._ensure_directory()
        self.db['paper_db'].to_json(self._path(self.name + '_paper.json'))
        self.db['dataset_db'].to_json(self._path(self.name + '_dataset.json'))
        self.db['content_db'].to_json(self._path(self.name + '_content.json'))

    def _column_max(self, db_key, column):
        """Return the numeric maximum of a database column, or 0 if empty."""
        frame = self.db[db_key]
        if frame.empty or column not in frame.columns:
            return 0
        value = pd.to_numeric(frame[column], errors='coerce').max()
        return 0 if pd.isna(value) else value

    def _restore_state(self):
        """Advance counters and ``lastupdatetime`` to match ``self.db``.

        Without this, a restart with existing JSON files would hand out ids
        starting from 1 again and treat every catalogue item as new.
        """
        self.paper_counter = max(self.paper_counter, int(self._column_max('paper_db', 'id')))
        self.dataset_counter = max(self.dataset_counter, int(self._column_max('dataset_db', 'id')))
        self.content_counter = max(self.content_counter, int(self._column_max('content_db', 'id')))
        self.lastupdatetime = max(
            self.lastupdatetime,
            float(self._column_max('paper_db', 'metadata_created')),
            float(self._column_max('dataset_db', 'metadata_created')),
        )

    # ------------------------------------------------------------------
    # index management
    # ------------------------------------------------------------------
    def init(self):
        """Fetch the catalogue on first run and build any missing index."""
        db_files = [self.name + suffix for suffix in ('_dataset.json', '_paper.json', '_content.json')]
        # first run: at least one database file is missing
        if not all(os.path.isfile(self._path(db_file)) for db_file in db_files):
            self.get_content()
        for db_type, attribute, index_type in self._INDEX_SPECS:
            if self.index[index_type] is None:
                self._rebuild_index(db_type, attribute, index_type, self._INDEX_FILES[index_type])

    def index_periodic_update(self):
        """Rebuild indexes after ``get_vre_update`` reported new items.

        Does nothing unless ``self.new_income`` is set; clears the flag
        afterwards.
        """
        if not self.new_income:
            return
        # NOTE(review): indexes are rebuilt only when the row count is NOT a
        # multiple of 100. Kept as in the original; confirm this is intended.
        if len(self.db['content_db']) % 100 != 0:
            self._rebuild_index('content_db', 'content', 'content_index', 'janet_content_index')
        if len(self.db['paper_db']) % 100 != 0:
            self._rebuild_index('paper_db', 'title', 'paper_titles_index', 'janet_paper_titles_index')
            self._rebuild_index('paper_db', 'notes', 'paper_desc_index', 'janet_paper_desc_index')
        if len(self.db['dataset_db']) % 100 != 0:
            self._rebuild_index('dataset_db', 'title', 'dataset_titles_index', 'janet_dataset_titles_index')
            self._rebuild_index('dataset_db', 'notes', 'dataset_desc_index', 'janet_dataset_desc_index')
        self.new_income = False

    @staticmethod
    def _index_params(n_vectors):
        """Return ``(nlist, n_bits)`` for an IVFPQ index over ``n_vectors``.

        ``nlist`` is the number of partitions of the coarse quantizer
        (= number of posting lists). Rule of thumb used here:
        ``4 * sqrt(N)``, capped at ``N - 1``. ``n_bits`` is the code width
        of each sub-quantizer: 4, or ``floor(log2(N))`` for fewer than 16
        vectors. Both are clamped to at least 1 so a single vector does not
        produce a zero-sized configuration.

        Raises:
            ValueError: If ``n_vectors`` is smaller than 1.
        """
        if n_vectors < 1:
            raise ValueError('at least one vector is required to size an index')
        nlist = int(4 * math.sqrt(n_vectors))
        if nlist >= n_vectors:
            nlist = n_vectors - 1
        n_bits = 4 if n_vectors >= 2 ** 4 else int(math.log2(n_vectors))
        return max(nlist, 1), max(n_bits, 1)

    def _encode_column(self, db_type, attribute):
        """Encode every value of a database column with the retriever.

        Returns:
            The per-value embeddings stacked along axis 0, or None when the
            column holds no rows.
        """
        vectors = [np.array(self.retriever.encode([text])) for text in self.db[db_type][attribute]]
        if not vectors:
            return None
        # One concatenation instead of growing the array row by row.
        return np.concatenate(vectors, axis=0)

    def _train_index(self, embeddings, index_type):
        """Create an ``IndexIVFPQ`` trained on ``embeddings`` and store it."""
        nlist, n_bits = self._index_params(len(embeddings))
        code_size = 8  # = number of subquantizers = number of sub-vectors
        d = embeddings.shape[1]
        coarse_quantizer = faiss.IndexFlatL2(d)  # keeps the centroids of the coarse quantizer
        index = faiss.IndexIVFPQ(coarse_quantizer, d, nlist, code_size, n_bits)
        index.train(embeddings)  # trained on all embeddings of the column
        self.index[index_type] = index

    def _rebuild_index(self, db_type, attribute, index_type, filename):
        """Train, fill and save an index while encoding the column only once."""
        embeddings = self._encode_column(db_type, attribute)
        if embeddings is None:
            return  # nothing to index yet
        self._train_index(embeddings, index_type)
        self.index[index_type].add(embeddings)
        self._ensure_directory()
        faiss.write_index(self.index[index_type], self._path(filename))

    def create_index(self, db_type, attribute, index_type, filename):
        """Create and train an empty index for one database column.

        Args:
            db_type: Key into ``self.db``.
            attribute: Text column to embed.
            index_type: Key into ``self.index`` that receives the new index.
            filename: File name (inside ``self.directory``) to save it to.

        When the column has no rows the call is a no-op and any existing
        index is left untouched.
        """
        embeddings = self._encode_column(db_type, attribute)
        if embeddings is None:
            return
        self._train_index(embeddings, index_type)
        self._ensure_directory()
        faiss.write_index(self.index[index_type], self._path(filename))

    def populate_index(self, db_type, attribute, index_type, filename):
        """Add the embeddings of one database column to an existing index.

        Args:
            db_type: Key into ``self.db``.
            attribute: Text column to embed.
            index_type: Key into ``self.index`` of the index to fill.
            filename: File name (inside ``self.directory``) to save it to.

        Does nothing when no index exists under ``index_type``.
        """
        index = self.index[index_type]
        if index is None:
            return
        embeddings = self._encode_column(db_type, attribute)
        if embeddings is not None:
            index.add(embeddings)
        self._ensure_directory()
        faiss.write_index(index, self._path(filename))

    # ------------------------------------------------------------------
    # catalogue ingestion
    # ------------------------------------------------------------------
    @classmethod
    def _parse_timestamp(cls, text):
        """Convert a catalogue date string to a POSIX timestamp."""
        return datetime.strptime(text, cls._DATE_FORMAT).timestamp()

    @staticmethod
    def _lower(value):
        """Lower-case a text field, treating a missing value as ''."""
        return (value or '').lower()

    @classmethod
    def _parse_item(cls, item):
        """Extract the stored fields from one catalogue item.

        Returns:
            ``(rsrc, resources, tags, title, author, notes, date)`` where
            ``rsrc`` is the value of the ``system:type`` extra, or None when
            the item carries no such extra.
        """
        rsrc = None  # reset per item so a previous item's type cannot leak
        for el in item['extras']:
            if el['key'] == 'system:type':
                rsrc = el['value']
        resources = [
            {
                'name': cls._lower(resource['name']),
                'url': resource['url'],
                'description': cls._lower(resource['description']),
            }
            for resource in item['resources']
        ]
        tags = [tag['name'].lower() for tag in item['tags']]
        title = cls._lower(item['title'])
        author = cls._lower(item['author'])
        notes = cls._lower(item['notes'])
        date = cls._parse_timestamp(item['metadata_created'])
        return rsrc, resources, tags, title, author, notes, date

    def _fetch_items(self, newer_than=None):
        """Download the catalogue listing and each item's details.

        Args:
            newer_than: When given, keep only items whose
                ``metadata_created`` timestamp is greater than this value.
        """
        listing = requests.get(self.catalogue_url, headers=self.headers)
        items_data = []
        for item_name in listing.json():
            api_url = self.catalogue_url + item_name + '/'
            details = requests.get(api_url, headers=self.headers).json()
            if newer_than is not None and self._parse_timestamp(details['metadata_created']) <= newer_than:
                continue
            items_data.append(details)
        return items_data

    def _ingest_items(self, items_data):
        """Turn catalogue items into paper, dataset and content frames.

        Advances the id counters and ``lastupdatetime`` as a side effect.
        Items whose type is neither 'Paper' nor 'Dataset' are skipped.
        """
        paper_df = pd.DataFrame(columns=self._ITEM_COLUMNS)
        dataset_df = pd.DataFrame(columns=self._ITEM_COLUMNS)
        content_df = pd.DataFrame(columns=self._CONTENT_COLUMNS)

        for item in items_data:
            rsrc, resources, tags, title, author, notes, date = self._parse_item(item)
            if date > self.lastupdatetime:
                self.lastupdatetime = date
            if rsrc == 'Paper':
                self.paper_counter += 1
                paper_df.loc[str(self.paper_counter)] = [self.paper_counter, rsrc, resources, tags, title, author, notes, date]
                content_df = self.get_pdf_content(item, content_df)
                content_df = self.get_txt_content(item, content_df)
            elif rsrc == 'Dataset':
                self.dataset_counter += 1
                dataset_df.loc[str(self.dataset_counter)] = [self.dataset_counter, rsrc, resources, tags, title, author, notes, date]
        return paper_df, dataset_df, content_df

    def get_content(self):
        """Download the whole catalogue and replace the local databases."""
        # A full download rebuilds the databases, so ids start over.
        self.paper_counter = 0
        self.dataset_counter = 0
        self.content_counter = 0

        paper_df, dataset_df, content_df = self._ingest_items(self._fetch_items())

        self.db['paper_db'] = paper_df.sort_values(by='metadata_created', ascending=True)
        self.db['dataset_db'] = dataset_df.sort_values(by='metadata_created', ascending=True)
        self.db['content_db'] = content_df
        self._save_db()

    def get_vre_update(self):
        """Append catalogue items created after ``lastupdatetime``.

        Sets ``self.new_income`` when at least one new row was added.
        """
        print("Getting new items")
        new_items = self._fetch_items(newer_than=self.lastupdatetime)
        paper_df, dataset_df, content_df = self._ingest_items(new_items)

        self.db['paper_db'] = pd.concat([self.db['paper_db'], paper_df.sort_values(by='metadata_created', ascending=True)])
        self.db['dataset_db'] = pd.concat([self.db['dataset_db'], dataset_df.sort_values(by='metadata_created', ascending=True)])
        self.db['content_db'] = pd.concat([self.db['content_db'], content_df])
        self._save_db()

        if not paper_df.empty or not dataset_df.empty or not content_df.empty:
            self.new_income = True

    def get_pdf_content(self, item, df):
        """Append one row per page of every PDF resource of ``item``.

        Args:
            item: Catalogue item with a ``resources`` list of ``url`` dicts.
            df: Content frame (columns id, paperid, content) to extend.

        Returns:
            ``df`` with the extracted pages appended; rows are linked to the
            current ``self.paper_counter``.
        """
        for rsrc in tqdm(item['resources']):
            response = requests.get(rsrc['url'])
            # A response without a content-type header is simply not a PDF.
            if 'application/pdf' not in (response.headers.get('content-type') or ''):
                continue
            with BytesIO(response.content) as data:
                read_pdf = PyPDF2.PdfReader(data)
                for page in tqdm(read_pdf.pages):
                    content = page.extract_text()
                    self.content_counter += 1
                    df.loc[str(self.content_counter)] = [self.content_counter, self.paper_counter, content]
        return df

    def get_txt_content(self, item, df):
        """Append one row for every plain-text resource of ``item``.

        Args:
            item: Catalogue item with a ``resources`` list of ``url`` dicts.
            df: Content frame (columns id, paperid, content) to extend.

        Returns:
            ``df`` with the downloaded texts appended; rows are linked to the
            current ``self.paper_counter``.
        """
        for rsrc in tqdm(item['resources']):
            response = requests.get(rsrc['url'])
            if 'text/plain' not in (response.headers.get('content-type') or ''):
                continue
            self.content_counter += 1
            df.loc[str(self.content_counter)] = [self.content_counter, self.paper_counter, response.text]
        return df

    def get_db(self):
        """Return the dict holding the paper, dataset and content frames."""
        return self.db

    def get_index(self):
        """Return the dict mapping index keys to FAISS indexes (or None)."""
        return self.index
|