# JanetBackEnd/VRE.py
# (repository viewer header: 419 lines, 22 KiB, Python)

from datetime import datetime
import pandas as pd
import string
import re
import requests
import os
from io import BytesIO
from tqdm.auto import tqdm
import numpy as np
import math
import faiss
import pdfquery
import urllib.request
import time
import threading
import html2text
class VRE:
    """Local mirror of one D4Science VRE plus FAISS indexes over its text.

    Four pandas tables are kept in ``self.db`` and persisted as JSON files
    named ``<directory><name>_<kind>.json``:

    * ``paper_db`` / ``dataset_db`` -- catalogue items by ``system:type``,
    * ``content_db`` -- text chunks extracted from the resources of papers,
    * ``post_db`` -- social-network posts.

    ``self.index`` holds one FAISS index per searchable column (see
    ``_INDEX_SPECS``), persisted under ``<directory>janet_*_index``.
    """

    _CATALOGUE_COLUMNS = ('id', 'type', 'resources', 'tags', 'title', 'author', 'notes', 'metadata_created')
    _CONTENT_COLUMNS = ('id', 'paperid', 'content')
    _POST_COLUMNS = ('id', 'author', 'content', 'time', 'tags')
    _TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'
    _TABLE_SUFFIXES = (('paper_db', '_paper.json'), ('dataset_db', '_dataset.json'),
                       ('content_db', '_content.json'), ('post_db', '_post.json'))
    # (table key, indexed column, key in self.index, index file name)
    _INDEX_SPECS = (
        ('dataset_db', 'title', 'dataset_titles_index', 'janet_dataset_titles_index'),
        ('dataset_db', 'notes', 'dataset_desc_index', 'janet_dataset_desc_index'),
        ('paper_db', 'title', 'paper_titles_index', 'janet_paper_titles_index'),
        ('paper_db', 'notes', 'paper_desc_index', 'janet_paper_desc_index'),
        ('content_db', 'content', 'content_index', 'janet_content_index'),
        ('post_db', 'content', 'post_index', 'janet_post_index'),
    )
    _SENTENCES_PER_PARAGRAPH = 5

    def __init__(self, name, token, retriever, directory='/app/'):
        """Load any persisted tables and indexes for the VRE called ``name``.

        Args:
            name: VRE name; used as the prefix of the JSON table files.
            token: value sent as the ``gcube-token`` request header.
            retriever: object whose ``encode(list_of_str)`` returns one
                embedding row per input string (presumably a
                sentence-transformers model -- confirm with the caller).
            directory: path prefix for every file read or written; it is
                concatenated as-is, so it must end with a path separator.
        """
        self.name = name
        self.token = token
        self.catalogue_url = 'https://api.d4science.org/catalogue/items/'
        self.socialnetwork_url = 'https://api.d4science.org/rest/2/posts/get-posts-vre/'
        self.headers = {"gcube-token": self.token, "Accept": "application/json"}
        self.retriever = retriever
        self.directory = directory
        self.db = {
            'paper_db': self._load_table('_paper.json', self._CATALOGUE_COLUMNS),
            'dataset_db': self._load_table('_dataset.json', self._CATALOGUE_COLUMNS),
            'content_db': self._load_table('_content.json', self._CONTENT_COLUMNS),
            'post_db': self._load_table('_post.json', self._POST_COLUMNS),
        }
        self.index = {index_type: self._load_index(filename)
                      for _, _, index_type, filename in self._INDEX_SPECS}

        # Resume from what is already on disk.  Starting again from zero /
        # 2021-01-01 would make the first update after a restart re-import
        # every item and reuse ids that the loaded tables already contain.
        # NOTE(review): timestamps go through a JSON round trip; confirm the
        # stored precision is sufficient for the "newer than" comparison.
        baseline = self._parse_timestamp('2021-01-01T00:00:00.000000')
        self.lastupdatetime = self._column_max(
            self.db['dataset_db'], 'metadata_created',
            self._column_max(self.db['paper_db'], 'metadata_created', baseline))
        self.postlastupdate = self._column_max(self.db['post_db'], 'time', baseline)
        self.post_counter = self._column_max(self.db['post_db'], 'id', 0)
        self.paper_counter = self._column_max(self.db['paper_db'], 'id', 0)
        self.dataset_counter = self._column_max(self.db['dataset_db'], 'id', 0)
        self.content_counter = self._column_max(self.db['content_db'], 'id', 0)
        self.new_income = False

    # ------------------------------------------------------------------
    # persistence helpers
    # ------------------------------------------------------------------
    def _load_table(self, suffix, columns):
        """Read ``<directory><name><suffix>`` or return an empty table with ``columns``."""
        path = self.directory + self.name + suffix
        if os.path.isfile(path):
            return pd.read_json(path)
        return pd.DataFrame(columns=list(columns))

    def _load_index(self, filename):
        """Read the FAISS index ``<directory><filename>``, or return None if absent."""
        path = self.directory + filename
        return faiss.read_index(path) if os.path.isfile(path) else None

    def _save_tables(self):
        """Write all four tables to their JSON files."""
        for key, suffix in self._TABLE_SUFFIXES:
            self.db[key].to_json(self.directory + self.name + suffix)

    @staticmethod
    def _column_max(frame, column, default):
        """Return max(default, largest value of frame[column]) with default's type.

        Falls back to ``default`` when the table is empty, the column is
        missing, or the stored values cannot be converted.
        """
        if frame.empty or column not in frame.columns:
            return default
        try:
            stored = type(default)(frame[column].max())
        except (TypeError, ValueError):
            return default
        if stored != stored:  # NaN
            return default
        return max(default, stored)

    @classmethod
    def _parse_timestamp(cls, value):
        """Convert an ISO-like timestamp string to a POSIX timestamp (float)."""
        try:
            return datetime.strptime(value, cls._TIMESTAMP_FORMAT).timestamp()
        except ValueError:
            # Same layout without fractional seconds.
            return datetime.strptime(value, '%Y-%m-%dT%H:%M:%S').timestamp()

    # ------------------------------------------------------------------
    # index management
    # ------------------------------------------------------------------
    def init(self):
        """Fetch the VRE content on first run and build every missing index."""
        refreshed = not all(os.path.isfile(self.directory + self.name + suffix)
                            for _, suffix in self._TABLE_SUFFIXES)
        if refreshed:
            self.get_content()
        for db_type, attribute, index_type, filename in self._INDEX_SPECS:
            # After a full refresh an index file left on disk no longer
            # matches the tables, so it is rebuilt as well.
            if refreshed or self.index[index_type] is None:
                self.create_index(db_type, attribute, index_type, filename)
                self.populate_index(db_type, attribute, index_type, filename)

    def index_periodic_update(self):
        """Rebuild indexes after ``get_vre_update`` reported new content."""
        if not self.new_income:
            return
        for db_type, attribute, index_type, filename in self._INDEX_SPECS:
            # NOTE(review): a table is skipped when its size is an exact
            # multiple of 100; kept as in the original -- confirm the intent.
            if len(self.db[db_type]) % 100 != 0:
                self.create_index(db_type, attribute, index_type, filename)
                self.populate_index(db_type, attribute, index_type, filename)
        self.new_income = False

    def _encode_column(self, db_type, attribute):
        """Encode every value of one table column.

        Returns:
            A contiguous float32 array with one row per value, or None when
            the column is empty.
        """
        rows = [np.asarray(self.retriever.encode([info]))
                for info in self.db[db_type][attribute]]
        if not rows:
            return None
        return np.ascontiguousarray(np.concatenate(rows, axis=0), dtype=np.float32)

    def create_index(self, db_type, attribute, index_type, filename):
        """Create and train an (empty) IVFPQ index for one table column.

        The trained index is stored in ``self.index[index_type]`` and written
        to ``<directory><filename>``.  When the column has no rows nothing is
        created and ``self.index[index_type]`` is left unchanged.
        """
        filename = self.directory + filename
        sentence_embeddings = self._encode_column(db_type, attribute)
        if sentence_embeddings is None:
            return
        count = len(sentence_embeddings)
        # number of partitions of the coarse quantizer = number of posting lists
        # as rule of thumb, 4*sqrt(N) < nlist < 16*sqrt(N), where N is the size of the database
        # Capped at N-1 and never below 1.
        nlist = max(1, min(int(4 * math.sqrt(count)), count - 1))
        code_size = 8  # = number of subquantizers = number of sub-vectors
        # n_bits of each code (8 -> 1 byte codes); reduced for tiny tables, never below 1
        n_bits = 4 if count >= 2 ** 4 else max(1, int(math.log2(count)))
        d = sentence_embeddings.shape[1]
        coarse_quantizer = faiss.IndexFlatL2(d)  # will keep centroids of coarse quantizer (for inverted list)
        self.index[index_type] = faiss.IndexIVFPQ(coarse_quantizer, d, nlist, code_size, n_bits)
        # Trained on every embedding of the column (no sub-sampling).
        self.index[index_type].train(sentence_embeddings)
        faiss.write_index(self.index[index_type], filename)

    def populate_index(self, db_type, attribute, index_type, filename):
        """Add the embeddings of one table column to its index and save it.

        Does nothing when no index exists for ``index_type`` (e.g. because
        the table was empty when ``create_index`` ran).
        """
        index = self.index[index_type]
        if index is None:
            return
        sentence_embeddings = self._encode_column(db_type, attribute)
        if sentence_embeddings is not None:
            index.add(sentence_embeddings)
        faiss.write_index(index, self.directory + filename)

    # ------------------------------------------------------------------
    # fetching
    # ------------------------------------------------------------------
    def _fetch_posts(self):
        """Return the ``result`` list of the social-network posts endpoint."""
        response = requests.get(self.socialnetwork_url, headers=self.headers)
        return response.json()['result']

    def _fetch_catalogue_items(self, newer_than=None):
        """Download the detail record of every catalogue item.

        Args:
            newer_than: when given, keep only items whose
                ``metadata_created`` timestamp is strictly greater.
        """
        listing = requests.get(self.catalogue_url, headers=self.headers).json()
        items_data = []
        for item in listing:
            detail = requests.get(self.catalogue_url + item + '/', headers=self.headers).json()
            if newer_than is None or self._parse_timestamp(detail['metadata_created']) > newer_than:
                items_data.append(detail)
        return items_data

    def _posts_to_frame(self, posts):
        """Convert raw posts to a table, advancing ``post_counter`` and ``postlastupdate``."""
        h = html2text.HTML2Text()
        h.ignore_links = True
        post_df = pd.DataFrame(columns=list(self._POST_COLUMNS))
        for post in posts:
            author = post['full_name'].lower()
            content = h.handle(post['description']).replace('\n', ' ').lower()
            date = post['time']
            # Hashtags are the words starting with '#', stored without it.
            tags = [word[1:] for word in content.split() if word.startswith('#')]
            if date > self.postlastupdate:
                self.postlastupdate = date
            self.post_counter += 1
            post_df.loc[str(self.post_counter)] = [self.post_counter, author, content, date, tags]
        return post_df

    @staticmethod
    def _lower(value):
        """Lower-case a text field, treating a missing (None) value as ''."""
        return (value or '').lower()

    def _items_to_frames(self, items_data):
        """Split catalogue items into paper, dataset and content tables.

        Advances ``paper_counter``, ``dataset_counter``, ``content_counter``
        and ``lastupdatetime``.  Items whose ``system:type`` extra is neither
        'Paper' nor 'Dataset' (or is absent) are ignored.

        Returns:
            ``(paper_df, dataset_df, content_df)``.
        """
        paper_df = pd.DataFrame(columns=list(self._CATALOGUE_COLUMNS))
        dataset_df = pd.DataFrame(columns=list(self._CATALOGUE_COLUMNS))
        content_df = pd.DataFrame(columns=list(self._CONTENT_COLUMNS))
        for item in items_data:
            rsrc = None  # reset so a type never leaks from the previous item
            for el in item['extras']:
                if el['key'] == 'system:type':
                    rsrc = el['value']
            resources = [
                {'name': self._lower(resource['name']), 'url': resource['url'],
                 'description': self._lower(resource['description'])}
                for resource in item['resources']
            ]
            tags = [tag['name'].lower() for tag in item['tags']]
            title = self._lower(item['title'])
            author = self._lower(item['author'])
            notes = self._lower(item['notes'])
            date = self._parse_timestamp(item['metadata_created'])
            if date > self.lastupdatetime:
                self.lastupdatetime = date
            if rsrc == 'Paper':
                self.paper_counter += 1
                paper_df.loc[str(self.paper_counter)] = [self.paper_counter, rsrc, resources, tags, title, author, notes, date]
                content_df = self.get_pdf_content(item, content_df)
                content_df = self.get_txt_content(item, content_df)
            if rsrc == 'Dataset':
                self.dataset_counter += 1
                dataset_df.loc[str(self.dataset_counter)] = [self.dataset_counter, rsrc, resources, tags, title, author, notes, date]
        return paper_df, dataset_df, content_df

    def get_content(self):
        """Download all posts and catalogue items, replacing the local tables."""
        # A full refresh replaces every table, so numbering starts over.
        self.post_counter = 0
        self.paper_counter = 0
        self.dataset_counter = 0
        self.content_counter = 0
        # posts
        post_df = self._posts_to_frame(self._fetch_posts())
        # catalog
        paper_df, dataset_df, content_df = self._items_to_frames(self._fetch_catalogue_items())
        self.db['paper_db'] = paper_df.sort_values(by='metadata_created', ascending=True)
        self.db['dataset_db'] = dataset_df.sort_values(by='metadata_created', ascending=True)
        self.db['post_db'] = post_df.sort_values(by='time', ascending=True)
        self.db['content_db'] = content_df
        self._save_tables()

    # modify query
    def get_vre_update(self):
        """Append posts and catalogue items newer than the last update.

        Sets ``new_income`` when anything was added.
        """
        print("Getting new items")
        # posts
        new_posts = [post for post in self._fetch_posts() if post['time'] > self.postlastupdate]
        post_df = self._posts_to_frame(new_posts)
        # catalog
        paper_df, dataset_df, content_df = self._items_to_frames(
            self._fetch_catalogue_items(newer_than=self.lastupdatetime))
        self.db['paper_db'] = pd.concat([self.db['paper_db'], paper_df.sort_values(by='metadata_created', ascending=True)])
        self.db['dataset_db'] = pd.concat([self.db['dataset_db'], dataset_df.sort_values(by='metadata_created', ascending=True)])
        self.db['post_db'] = pd.concat([self.db['post_db'], post_df.sort_values(by='time', ascending=True)])
        self.db['content_db'] = pd.concat([self.db['content_db'], content_df])
        self._save_tables()
        if not (paper_df.empty and dataset_df.empty and content_df.empty and post_df.empty):
            self.new_income = True

    # ------------------------------------------------------------------
    # text processing
    # ------------------------------------------------------------------
    def remove_useless_dots(self, line):
        """Drop dots that do not end a sentence and collapse runs of dots.

        A dot is dropped when the character two positions before it is a
        space or punctuation (single-letter token such as an initial), when
        it directly follows another dot, or when it follows " al".
        Positions before the start of ``line`` count as a space, so the rules
        behave the same at the beginning of the string as in the middle.
        """
        def before(position, offset):
            target = position - offset
            # Never wrap around to the end of the string.
            return line[target] if target >= 0 else ' '

        kept = []
        for i, char in enumerate(line):
            if char != '.':
                kept.append(char)
                continue
            if before(i, 2) == ' ' or before(i, 2) in string.punctuation:
                continue
            if before(i, 1) == '.':
                continue
            if before(i, 3) == ' ' and before(i, 2) == 'a' and before(i, 1) == 'l':
                continue
            kept.append(char)
        return re.sub(r'\.+', ".", ''.join(kept))

    def check_if_sentence(self, sentence):
        """Return True for text with more than nine words or containing a dot."""
        return len(sentence.split()) > 9 or '.' in sentence

    def get_abstract(self, text):
        """Locate the abstract inside a list of text lines.

        The abstract starts at the first line whose first word is "abstract"
        and ends at the first empty line more than five lines further down.

        Returns:
            ``(start, end)`` line indices; ``(0, len(text))`` when no such
            heading exists, and ``end == len(text)`` when no end is found.
        """
        abstract_start = 0
        abstract_end = len(text)
        for i, line in enumerate(text):
            words = line.split()
            if not words or words[0].lower() != 'abstract':
                continue
            abstract_start = i
            for j in range(i + 1, len(text)):
                if len(text[j]) == 0 and j > i + 5:
                    abstract_end = j
                    break
            break
        return abstract_start, abstract_end

    def useful_index(self, text):
        """Return the ``(start, end)`` line range holding the body of a paper.

        ``start`` is the last line that begins with an introduction heading
        ("introduction", "1 introduction", "1. introduction",
        "1.introduction"); ``end`` is the first line that begins with
        "bibliography", "references" or "acknowledgement(s)".  Defaults are
        0 and ``len(text)``.
        """
        start = 0
        end = len(text)
        closing_headings = ('bibliography', 'references', 'acknowledgement', 'acknowledgements')
        for i, line in enumerate(text):
            words = line.lower().split()
            if not words:
                continue
            if words[0] in closing_headings and i < end:
                end = i
            # A numbered heading is split into two tokens by split(), so the
            # number and the word have to be checked separately.
            numbered = words[0] in ('1', '1.') and len(words) > 1 and words[1] == 'introduction'
            if words[0] in ('introduction', '1.introduction') or numbered:
                start = i
        return start, end

    def get_line_sentences(self, text, i):
        """Split line ``i`` of ``text`` into sentence fragments.

        Every fragment except the last gets ". " appended; a trailing "-"
        (hyphenated line break) is removed from the last one.

        Returns:
            ``(fragments, i + 1)``; ``fragments`` is empty when the line does
            not pass ``check_if_sentence``.
        """
        line = self.remove_useless_dots(text[i])
        if not self.check_if_sentence(line):
            return [], i + 1
        pieces = line.split('.')
        fragments = [piece + '. ' for piece in pieces[:-1]]
        fragments.append(pieces[-1].removesuffix('-'))
        return fragments, i + 1

    def parts_to_sentences(self, parts):
        """Join fragments into sentences, closing one whenever it contains a dot.

        Trailing fragments that never reach a dot are discarded.
        """
        sentences = []
        sentence = ''
        for part in parts:
            sentence += part
            if '.' in sentence:
                sentences.append(sentence)
                sentence = ''
        return sentences

    @classmethod
    def _group_sentences(cls, sentences):
        """Chunk ``sentences`` into lists of five; the last list may be shorter."""
        size = cls._SENTENCES_PER_PARAGRAPH
        return [sentences[k:k + size] for k in range(0, len(sentences), size)]

    def _add_content_row(self, df, content):
        """Append ``content`` to ``df`` under the next content id, linked to the current paper."""
        self.content_counter += 1
        df.loc[str(self.content_counter)] = [self.content_counter, self.paper_counter, content]

    def get_pdf_content(self, item, df):
        """Extract paragraphs and the abstract from every PDF resource of ``item``.

        Rows are appended to ``df`` with ``paperid`` set to the current
        ``paper_counter``.  The PDF is stored at ``<directory>janet.pdf``.

        Returns:
            ``df`` (modified in place).
        """
        pdf_path = self.directory + "janet.pdf"
        for rsrc in tqdm(item['resources']):
            response = requests.get(rsrc['url'])
            if 'application/pdf' not in (response.headers.get('content-type') or ''):
                continue
            # Reuse the body that was just downloaded instead of fetching it again.
            with open(pdf_path, 'wb') as pdf_file:
                pdf_file.write(response.content)
            pdf = pdfquery.PDFQuery(pdf_path)
            pdf.load()
            #pages = pdf.pq('LTPage')
            text = [el.text or '' for el in pdf.tree.iter()
                    if el.tag == 'LTTextLineHorizontal' or el.tag == 'LTTextBoxHorizontal']
            parts = []
            i, end = self.useful_index(text)
            while i < end:
                sent, i = self.get_line_sentences(text, i)
                for part in sent:
                    if part == '':
                        continue
                    x = part
                    if len(part) > 1 and part[0] == ' ':
                        x = part[1:]
                    if len(part) > 2 and part[1] == ' ':
                        x = part[2:]
                    parts.append(x)
            sentences = self.parts_to_sentences(parts)
            # The final, shorter group is kept so trailing sentences are not lost.
            paragraphs = [''.join(group) for group in self._group_sentences(sentences)]
            for paragraph in tqdm(paragraphs):
                self._add_content_row(df, paragraph)
            # NOTE(review): without an "Abstract" heading get_abstract returns
            # the whole document range, so all text is stored as the abstract.
            start, end = self.get_abstract(text)
            self._add_content_row(df, ''.join(text[start:end]))
        return df

    def get_txt_content(self, item, df):
        """Extract paragraphs from every plain-text resource of ``item``.

        Rows are appended to ``df`` with ``paperid`` set to the current
        ``paper_counter``.

        Returns:
            ``df`` (modified in place).
        """
        for rsrc in tqdm(item['resources']):
            response = requests.get(rsrc['url'])
            if 'text/plain' not in (response.headers.get('content-type') or ''):
                continue
            content = self.remove_useless_dots(response.text)
            # Blank fragments (e.g. after the final dot) are not sentences.
            sentences = [sentence for sentence in content.split('.') if sentence.strip()]
            paragraphs = [''.join(sentence + '. ' for sentence in group)
                          for group in self._group_sentences(sentences)]
            for paragraph in tqdm(paragraphs):
                self._add_content_row(df, paragraph)
        return df

    def get_db(self):
        """Return the dict of tables (``paper_db``, ``dataset_db``, ``content_db``, ``post_db``)."""
        return self.db

    def get_index(self):
        """Return the dict of FAISS indexes keyed by index name."""
        return self.index