# JanetBackEnd/VRE.py
from datetime import datetime
import os
import re
import string
import urllib.request

import faiss
import html2text
import numpy as np
import pandas as pd
import pdfquery
import requests
from datasets import Dataset
from tqdm.auto import tqdm


class VRE:
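    """Mirror of a D4Science VRE for the Janet backend.

    Harvests catalogue items (papers and datasets) and social-network posts,
    caches them as JSON-backed pandas DataFrames, extracts text from paper
    resources, and maintains embedding indexes used for retrieval.
    """
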
    def __init__(self, name, token, retriever, directory='/app/'):
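        """Set up the API endpoints and load any cached databases and indexes.

        Args:
            name: VRE name, used as a prefix for the cached JSON files.
            token: gcube token used to authenticate against the D4Science APIs.
            retriever: sentence encoder exposing an encode() method.
            directory: folder where the caches and indexes are stored.
        """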
        self.name = name
        self.token = token
        self.catalogue_url = 'https://api.d4science.org/catalogue/items/'
        self.socialnetwork_url = 'https://api.d4science.org/rest/2/posts/get-posts-vre/'
        self.headers = {"gcube-token": self.token, "Accept": "application/json"}
        # Everything created after this epoch is harvested on the first run.
        self.lastupdatetime = datetime.strptime('2021-01-01T00:00:00.000000', '%Y-%m-%dT%H:%M:%S.%f').timestamp()
        self.postlastupdate = datetime.strptime('2021-01-01T00:00:00.000000', '%Y-%m-%dT%H:%M:%S.%f').timestamp()
        self.retriever = retriever
        self.directory = directory
        self.post_counter = 0
        self.paper_counter = 0
        self.dataset_counter = 0
        self.content_counter = 0
        # Load the cached databases when present, otherwise start empty.
        item_columns = ['id', 'type', 'resources', 'tags', 'title', 'author', 'notes', 'metadata_created', 'url']
        self.db = {}
        for db_name, columns in [('paper', item_columns),
                                 ('dataset', item_columns),
                                 ('content', ['id', 'paperid', 'content']),
                                 ('post', ['id', 'author', 'content', 'time', 'tags'])]:
            path = self.directory + self.name + '_' + db_name + '.json'
            self.db[db_name + '_db'] = pd.read_json(path) if os.path.isfile(path) else pd.DataFrame(columns=columns)
        # Load the cached embedding indexes when present. Dataset.save_to_disk()
        # writes a directory, so the existence check must use isdir.
        self.index = {}
        for index_name in ['dataset_titles_index', 'paper_titles_index', 'dataset_desc_index',
                           'paper_desc_index', 'content_index', 'post_index']:
            path = self.directory + 'janet_' + index_name
            self.index[index_name] = Dataset.load_from_disk(path) if os.path.isdir(path) else None
        self.new_income = False

    def init(self):
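        """First run: harvest the VRE and build every index missing on disk."""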
        self.get_content()
        if self.index['dataset_titles_index'] is None:
            self.create_index('dataset_db', 'title', 'dataset_titles_index', 'janet_dataset_titles_index')
            #self.populate_index('dataset_db', 'title', 'dataset_titles_index', 'janet_dataset_titles_index')
        if self.index['dataset_desc_index'] is None:
            self.create_index('dataset_db', 'notes', 'dataset_desc_index', 'janet_dataset_desc_index')
            #self.populate_index('dataset_db', 'notes', 'dataset_desc_index', 'janet_dataset_desc_index')
        if self.index['paper_titles_index'] is None:
            self.create_index('paper_db', 'title', 'paper_titles_index', 'janet_paper_titles_index')
            #self.populate_index('paper_db', 'title', 'paper_titles_index', 'janet_paper_titles_index')
        if self.index['paper_desc_index'] is None:
            self.create_index('paper_db', 'notes', 'paper_desc_index', 'janet_paper_desc_index')
            #self.populate_index('paper_db', 'notes', 'paper_desc_index', 'janet_paper_desc_index')
        if self.index['content_index'] is None:
            self.create_index('content_db', 'content', 'content_index', 'janet_content_index')
            #self.populate_index('content_db', 'content', 'content_index', 'janet_content_index')
        if self.index['post_index'] is None:
            self.create_index('post_db', 'content', 'post_index', 'janet_post_index')
            #self.populate_index('post_db', 'content', 'post_index', 'janet_post_index')

    def index_periodic_update(self):
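        """Rebuild the indexes of the databases that received new rows.

        A database is re-indexed unless its row count is an exact multiple
        of 100.
        """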
        if self.new_income:
            if len(self.db['content_db']) % 100 != 0:
                self.create_index('content_db', 'content', 'content_index', 'janet_content_index')
                #self.populate_index('content_db', 'content', 'content_index', 'janet_content_index')
            if len(self.db['post_db']) % 100 != 0:
                self.create_index('post_db', 'content', 'post_index', 'janet_post_index')
                #self.populate_index('post_db', 'content', 'post_index', 'janet_post_index')
            if len(self.db['paper_db']) % 100 != 0:
                self.create_index('paper_db', 'title', 'paper_titles_index', 'janet_paper_titles_index')
                self.create_index('paper_db', 'notes', 'paper_desc_index', 'janet_paper_desc_index')
                #self.populate_index('paper_db', 'title', 'paper_titles_index', 'janet_paper_titles_index')
                #self.populate_index('paper_db', 'notes', 'paper_desc_index', 'janet_paper_desc_index')
            if len(self.db['dataset_db']) % 100 != 0:
                self.create_index('dataset_db', 'title', 'dataset_titles_index', 'janet_dataset_titles_index')
                self.create_index('dataset_db', 'notes', 'dataset_desc_index', 'janet_dataset_desc_index')
                #self.populate_index('dataset_db', 'title', 'dataset_titles_index', 'janet_dataset_titles_index')
                #self.populate_index('dataset_db', 'notes', 'dataset_desc_index', 'janet_dataset_desc_index')
            self.new_income = False

    def create_index(self, db_type, attribute, index_type, filename):
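        """Embed one attribute of a database and persist the resulting index.

        Builds a datasets.Dataset from the DataFrame, adds an 'embeddings'
        column computed by the retriever, and saves it under filename.
        """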
        filename = self.directory + filename
        dataset = Dataset.from_pandas(self.db[db_type])
        # Embed the chosen attribute of every row with the retriever model.
        embeddings_dataset = dataset.map(
            lambda x: {"embeddings": self.retriever.encode([x[attribute]])[0]}
        )
        embeddings_dataset.save_to_disk(filename)
        self.index[index_type] = embeddings_dataset
        #faiss.write_index(self.index[index_type], filename)

    def populate_index(self, db_type, attribute, index_type, filename):
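        """Append embeddings row by row to a FAISS index and write it to disk.

        Alternative to create_index(); all of its call sites are currently
        commented out.
        """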
        filename = self.directory + filename
        to_index = self.db[db_type][attribute]
        for info in to_index:
            sentence_embedding = np.array(self.retriever.encode([info]))
            self.index[index_type].add(sentence_embedding)
        faiss.write_index(self.index[index_type], filename)

    def get_content(self):
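        """Harvest all posts and catalogue items and (re)build the databases.

        Posts are converted to plain text with their hashtags extracted;
        catalogue items are split into papers and datasets, and paper
        resources are mined for textual content.
        """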
        h = html2text.HTML2Text()
        h.ignore_links = True
        # Social posts: fetch every post in the VRE and extract its hashtags.
        posts = requests.get(self.socialnetwork_url, headers=self.headers)
        posts = posts.json()['result']
        post_df = pd.DataFrame(columns=['id', 'author', 'content', 'time', 'tags'])
        for post in posts:
            author = post['full_name'].lower()
            content = h.handle(post['description']).replace('\n', ' ').lower()
            date = post['time']
            tags = []
            for word in content.split():
                if word[0] == '#':
                    tags.append(word[1:])
            if date > self.postlastupdate:
                self.postlastupdate = date
            self.post_counter += 1
            post_df.loc[str(self.post_counter)] = [self.post_counter, author, content, date, tags]
        # Catalogue: list the item names, then fetch each item's full record.
        response = requests.get(self.catalogue_url, headers=self.headers)
        items = response.json()
        items_data = []
        for item in items:
            api_url = self.catalogue_url + item + '/'
            response = requests.get(api_url, headers=self.headers)
            items_data.append(response.json())
        paper_df = pd.DataFrame(columns=['id', 'type', 'resources', 'tags', 'title', 'author', 'notes', 'metadata_created', 'url'])
        dataset_df = pd.DataFrame(columns=['id', 'type', 'resources', 'tags', 'title', 'author', 'notes', 'metadata_created', 'url'])
        content_df = pd.DataFrame(columns=['id', 'paperid', 'content'])
        content_df = self.get_vre_info(content_df)
        for item in items_data:
            rsrc, url = None, None
            for el in item['extras']:
                if el['key'] == 'system:type':
                    rsrc = el['value']
                if el['key'] == 'Item URL':
                    url = el['value']
            resources = []
            for resource in item['resources']:
                resources.append({'name': resource['name'].lower(),
                                  'url': resource['url'],
                                  'description': resource['description'].lower()})
            tags = []
            for tag in item['tags']:
                tags.append(tag['name'].lower())
            title = item['title'].lower()
            author = item['author'].lower()
            notes = item['notes'].lower()
            date = datetime.strptime(item['metadata_created'], '%Y-%m-%dT%H:%M:%S.%f').timestamp()
            if date > self.lastupdatetime:
                self.lastupdatetime = date
            if rsrc == 'Paper':
                self.paper_counter += 1
                paper_df.loc[str(self.paper_counter)] = [self.paper_counter, rsrc, resources, tags, title, author, notes, date, url]
                content_df = self.get_pdf_content(item, content_df)
                content_df = self.get_txt_content(item, content_df)
            elif rsrc == 'Dataset':
                self.dataset_counter += 1
                dataset_df.loc[str(self.dataset_counter)] = [self.dataset_counter, rsrc, resources, tags, title, author, notes, date, url]
        self.db['paper_db'] = paper_df.sort_values(by='metadata_created', ascending=True)
        self.db['dataset_db'] = dataset_df.sort_values(by='metadata_created', ascending=True)
        self.db['post_db'] = post_df.sort_values(by='time', ascending=True)
        #other_content_df = pd.DataFrame(columns=['id', 'paperid', 'content'])
        # Turn every user post (anything not authored by the catalogue) into a content entry.
        for _, post in post_df.iterrows():
            if post['author'] != "catalogue":
                self.content_counter += 1
                content_df.loc[str(self.content_counter)] = [self.content_counter, -1, post['author'] + ' posted: ' + post['content'] + ' It is about ' + ', '.join(post['tags'])]
        """
        for i, description in dataset_df.iterrows():
            self.content_counter += 1
            other_content_df.loc[str(self.content_counter)] = [self.content_counter, -2, description['title'] + ' is a dataset. ' + description['notes'] + ' It is about ' + ', '.join(description['tags'])]
        for i, description in paper_df.iterrows():
            self.content_counter += 1
            other_content_df.loc[str(self.content_counter)] = [self.content_counter, -3, description['title'] + ' is a paper. ' + description['notes'] + ' It is about ' + ', '.join(description['tags'])]
        """
        self.db['content_db'] = content_df
        # Persist all four databases next to the indexes.
        self.db['paper_db'].to_json(self.directory + self.name + '_paper.json')
        self.db['dataset_db'].to_json(self.directory + self.name + '_dataset.json')
        self.db['content_db'].to_json(self.directory + self.name + '_content.json')
        self.db['post_db'].to_json(self.directory + self.name + '_post.json')

    # modify query
    def get_vre_update(self):
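        """Incremental harvest: fetch only the posts and catalogue items newer
        than the last update, append them to the cached databases, and flag
        the indexes as stale."""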
print("Getting new items")
h = html2text.HTML2Text()
h.ignore_links = True
#posts
posts = requests.get(self.socialnetwork_url, headers=self.headers)
posts = posts.json()['result']
new_posts = []
for post in posts:
if post['time'] > self.postlastupdate:
new_posts.append(post)
post_df = pd.DataFrame(columns=['id', 'author', 'content', 'time', 'tags'])
for post in new_posts:
author = post['full_name'].lower()
content = h.handle(post['description']).replace('\n', ' ').lower()
date = post['time']
tags = []
for word in content.split():
if word[0] == '#':
tags.append(word[1:])
if date > self.postlastupdate:
self.postlastupdate = date
self.post_counter += 1
post_df.loc[str(self.post_counter)] = [self.post_counter, author, content, date, tags]
#catalog
response = requests.get(self.catalogue_url, headers=self.headers)
items = response.json()
items_data = []
for item in items:
api_url = self.catalogue_url + item + '/'
response = requests.get(api_url, headers=self.headers)
if datetime.strptime(response.json()['metadata_created'],'%Y-%m-%dT%H:%M:%S.%f').timestamp() > self.lastupdatetime:
items_data.append(response.json())
keys = ['type', 'resources', 'tags', 'title', 'author', 'notes', 'metadata_created']
paper_df = pd.DataFrame(columns=['id', 'type', 'resources', 'tags', 'title', 'author', 'notes', 'metadata_created', 'url'])
dataset_df = pd.DataFrame(columns=['id', 'type', 'resources', 'tags', 'title', 'author', 'notes', 'metadata_created', 'url'])
content_df = pd.DataFrame(columns=['id', 'paperid', 'content'])
for item in items_data:
for el in item['extras']:
if el['key'] == 'system:type':
rsrc = el['value']
if el['key'] == 'Item URL':
url = el['value']
resources = []
for resource in item['resources']:
resources.append(
{'name': resource['name'].lower(), 'url': resource['url'], 'description': resource['description'].lower()})
tags = []
for tag in item['tags']:
tags.append(tag['name'].lower())
title = item['title'].lower()
author = item['author'].lower()
notes = item['notes'].lower()
date = datetime.strptime(item['metadata_created'], '%Y-%m-%dT%H:%M:%S.%f').timestamp()
if date > self.lastupdatetime:
self.lastupdatetime = date
if rsrc == 'Paper':
self.paper_counter += 1
paper_df.loc[str(self.paper_counter)] = [self.paper_counter, rsrc, resources, tags, title, author, notes, date, url]
content_df = self.get_pdf_content(item, content_df)
content_df = self.get_txt_content(item, content_df)
if rsrc == 'Dataset':
self.dataset_counter += 1
dataset_df.loc[str(self.dataset_counter)] = [self.dataset_counter, rsrc, resources, tags, title, author, notes, date, url]
self.db['paper_db'] = pd.concat([self.db['paper_db'], paper_df.sort_values(by='metadata_created', ascending=True)])
self.db['dataset_db'] = pd.concat([self.db['dataset_db'], dataset_df.sort_values(by='metadata_created', ascending=True)])
self.db['post_db'] = pd.concat([self.db['post_db'], post_df.sort_values(by='time', ascending=True)])
self.db['post_db'].to_json(self.directory+self.name+'_post.json')
for i, post in post_df.iterrows():
if post['author'] != "catalogue":
self.content_counter+=1
content_df.loc[str(self.content_counter)] = [self.content_counter, -1, post['author'] + ' posted: ' + post['content'] + ' It is about ' + ', '.join(post['tags'])]
"""
for i, description in dataset_df.iterrows():
self.content_counter+=1
other_content_df.loc[str(self.content_counter)] = [self.content_counter, -2, description['title'] + ' is a dataset. ' + description['notes'] + ' It is about ' + ', '.join(description['tags']) ]
for i, description in paper_df.iterrows():
self.content_counter+=1
other_content_df.loc[str(self.content_counter)] = [self.content_counter, -3, description['title'] + ' is a paper. ' + description['notes'] + ' It is about ' + ', '.join(description['tags']) ]
"""
self.db['paper_db'].to_json(self.directory + self.name + '_paper.json')
self.db['dataset_db'].to_json(self.directory + self.name + '_dataset.json')
self.db['content_db'] = pd.concat([self.db['content_db'], content_df])
self.db['content_db'].to_json(self.directory + self.name + '_content.json')
if not paper_df.empty or not dataset_df.empty or not content_df.empty or not post_df.empty:
self.new_income = True
    def remove_suffix(self, input_string, suffix):
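        """Strip suffix from the end of input_string, if present."""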
        if suffix and input_string.endswith(suffix):
            return input_string[:-len(suffix)]
        return input_string

    def remove_useless_dots(self, line):
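        """Keep only sentence-ending dots so the text can be split on '.'.

        Dots after single-letter tokens (e.g. initials), inside ellipses, and
        after the abbreviation 'al.' are dropped, and bracketed citation
        markers such as '[12]' are removed.
        """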
        modline = ''
        for i in range(len(line)):
            if line[i] != '.':
                modline += line[i]
            else:
                # Guard the lookbehinds so indexes never wrap around to the
                # end of the string.
                if i >= 2 and (line[i - 2] == ' ' or line[i - 2] in string.punctuation):
                    continue
                if i >= 1 and line[i - 1] == '.':
                    continue
                if i >= 3 and line[i - 3] == ' ' and line[i - 2] == 'a' and line[i - 1] == 'l':
                    continue
                modline += line[i]
        modline = re.sub(r'\.+', '.', modline)
        modline = re.sub(r'\[.*?\]', '', modline)
        return modline

    def check_if_sentence(self, sentence):
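        """Heuristic: treat text as prose if it has more than 9 words or contains a dot."""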
        return len(sentence.split()) > 9 or '.' in sentence

    def get_abstract(self, text):
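        """Locate the abstract in a list of text lines.

        Returns (start, end) line indexes: from the line starting with
        'abstract' up to the first empty line at least five lines below it.
        """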
        abstract_start = 0
        abstract_end = len(text)
        for i in range(len(text)):
            if len(text[i].split()) > 0:
                words = text[i].split()
                if words[0].lower() == 'abstract':
                    abstract_start = i
                    for j in range(i + 1, len(text)):
                        if len(text[j]) == 0 and j > i + 5:
                            abstract_end = j
                            break
                    break
        return abstract_start, abstract_end

    def useful_index(self, text):
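        """Return (start, end) line indexes of the paper body: from the
        introduction heading up to the references or acknowledgements."""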
        start = 0
        end = len(text)
        for i in range(len(text)):
            if len(text[i].split()) > 0:
                words = text[i].split()
                if words[0].lower() in ['bibliography', 'references'] and i < end:
                    end = i
                if words[0].lower() in ['introduction', '1 introduction', '1. introduction', '1.introduction']:
                    start = i
                if words[0].lower() in ['acknowledgement', 'acknowledgements'] and i < end:
                    end = i
        return start, end

    def get_line_sentences(self, text, i):
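        """Split line i of text into sentence fragments.

        Returns the fragments (empty if the line does not look like prose)
        together with the index of the next line.
        """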
        mytext = self.remove_useless_dots(text[i])
        if self.check_if_sentence(mytext):
            splits = mytext.split('.')
            for j in range(len(splits)):
                if j + 1 < len(splits):
                    splits[j] = splits[j] + '. '
                if j == len(splits) - 1:
                    # Undo end-of-line hyphenation; str.removesuffix needs Python 3.9+.
                    splits[j] = self.remove_suffix(splits[j], '-')
            return splits, i + 1
        else:
            return [], i + 1

    def parts_to_sentences(self, parts):
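        """Merge sentence fragments, emitting a sentence whenever a dot appears."""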
        sentences = []
        sentence = ''
        for part in parts:
            sentence += part
            if '.' in sentence:
                sentences.append(sentence)
                sentence = ''
        return sentences

    def get_pdf_content(self, item, df):
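        """Extract content from the PDF resources of a catalogue item.

        Downloads each PDF resource, extracts its text lines, groups body
        sentences into five-sentence paragraphs, and appends them, plus the
        abstract, to df as rows keyed to the current paper.
        """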
        for rsrc in tqdm(item['resources']):
            response = requests.get(rsrc['url'])
            if 'application/pdf' in response.headers.get('content-type', ''):
                urllib.request.urlretrieve(rsrc['url'], self.directory + "janet.pdf")
                pdf = pdfquery.PDFQuery(self.directory + "janet.pdf")
                pdf.load()
                #pages = pdf.pq('LTPage')
                # Collect the horizontal text lines/boxes; keep empty lines so
                # that get_abstract() can still detect paragraph breaks.
                text = []
                for el in pdf.tree.iter():
                    if el.tag == 'LTTextLineHorizontal' or el.tag == 'LTTextBoxHorizontal':
                        text.append(el.text or '')
                paragraphs = []
                parts = []
                # Only keep the span between the introduction and the references.
                i, end = self.useful_index(text)
                while i < end:
                    sent, i = self.get_line_sentences(text, i)
                    for part in sent:
                        if part != '':
                            parts.append(part.lstrip())
                sentences = self.parts_to_sentences(parts)
                # Group sentences five at a time into pseudo-paragraphs.
                for i in range(0, len(sentences) - 4, 5):
                    paragraphs.append(''.join(sentences[i:i + 5]))
                for paragraph in tqdm(paragraphs):
                    self.content_counter += 1
                    df.loc[str(self.content_counter)] = [self.content_counter, self.paper_counter, paragraph]
                # Store the abstract as one extra content entry.
                start, end = self.get_abstract(text)
                abstract = ''
                for i in range(start, end):
                    abstract += text[i]
                self.content_counter += 1
                df.loc[str(self.content_counter)] = [self.content_counter, self.paper_counter, abstract]
        return df

    def get_vre_info(self, df):
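        """Append the VRE description read from info.txt to df; paperid -6 is
        the sentinel used for VRE-level information."""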
        # Note: info.txt is read from the working directory, not self.directory.
        with open('info.txt', 'r') as file:
            content = file.read().replace('\n', ' ')
        content = self.remove_useless_dots(content)
        self.content_counter += 1
        df.loc[str(self.content_counter)] = [self.content_counter, -6, content]
        return df

    def get_txt_content(self, item, df):
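        """Extract content from the plain-text resources of a catalogue item,
        grouped into five-sentence paragraphs keyed to the current paper."""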
        for rsrc in tqdm(item['resources']):
            response = requests.get(rsrc['url'])
            if 'text/plain' in response.headers.get('content-type', ''):
                content = self.remove_useless_dots(response.text)
                sentences = content.split('.')
                paragraphs = []
                # Group sentences five at a time into pseudo-paragraphs.
                for i in range(0, len(sentences) - 4, 5):
                    paragraphs.append('. '.join(sentences[i:i + 5]) + '. ')
                for paragraph in tqdm(paragraphs):
                    self.content_counter += 1
                    df.loc[str(self.content_counter)] = [self.content_counter, self.paper_counter, paragraph]
        return df

    def get_db(self):
        return self.db

    def get_index(self):
        return self.index