# File metadata banner (was duplicated, non-Python text): 453 lines, 23 KiB, Python
from datetime import datetime
|
|
import pandas as pd
|
|
import string
|
|
import re
|
|
import requests
|
|
import os
|
|
from io import BytesIO
|
|
from tqdm.auto import tqdm
|
|
import numpy as np
|
|
import math
|
|
import faiss
|
|
import pdfquery
|
|
import urllib.request
|
|
import time
|
|
import threading
|
|
import html2text
|
|
from datasets import Dataset
|
|
|
|
class VRE:
|
|
def __init__(self, name, token, retriever, directory='/app/'):
|
|
self.name = name
|
|
self.token = token
|
|
self.catalogue_url = 'https://api.d4science.org/catalogue/items/'
|
|
self.socialnetwork_url = 'https://api.d4science.org/rest/2/posts/get-posts-vre/'
|
|
self.headers = {"gcube-token": self.token, "Accept": "application/json"}
|
|
self.lastupdatetime = datetime.strptime('2021-01-01T00:00:00.000000', '%Y-%m-%dT%H:%M:%S.%f').timestamp()
|
|
self.postlastupdate = datetime.strptime('2021-01-01T00:00:00.000000', '%Y-%m-%dT%H:%M:%S.%f').timestamp()
|
|
self.retriever = retriever
|
|
self.directory = directory
|
|
self.post_counter = 0
|
|
self.paper_counter = 0
|
|
self.dataset_counter = 0
|
|
self.content_counter = 0
|
|
self.db = {'paper_db': pd.read_json(self.directory + self.name + '_paper.json') if os.path.isfile(self.directory + self.name + '_paper.json') else pd.DataFrame(columns=['id', 'type', 'resources', 'tags', 'title', 'author', 'notes', 'metadata_created', 'url']),
|
|
'dataset_db': pd.read_json(self.directory + self.name + '_dataset.json') if os.path.isfile(self.directory + self.name + '_dataset.json') else pd.DataFrame(columns=['id', 'type', 'resources', 'tags', 'title', 'author', 'notes', 'metadata_created', 'url']),
|
|
'content_db': pd.read_json(self.directory + self.name + '_content.json') if os.path.isfile(self.directory + self.name + '_content.json') else pd.DataFrame(columns=['id', 'paperid', 'content']),
|
|
'post_db': pd.read_json(self.directory + self.name + '_post.json') if os.path.isfile(self.directory + self.name + '_post.json') else pd.DataFrame(columns=['id', 'author', 'content', 'time', 'tags'])}
|
|
self.index = {'dataset_titles_index': None if not os.path.isfile(self.directory + 'janet_dataset_titles_index') else Dataset.load_from_disk(self.directory + 'janet_dataset_titles_index'),
|
|
'paper_titles_index': None if not os.path.isfile(self.directory + 'janet_paper_titles_index') else Dataset.load_from_disk(self.directory + 'janet_paper_titles_index'),
|
|
'dataset_desc_index': None if not os.path.isfile(self.directory + 'janet_dataset_desc_index') else Dataset.load_from_disk(self.directory + 'janet_dataset_desc_index'),
|
|
'paper_desc_index': None if not os.path.isfile(self.directory + 'janet_paper_desc_index') else Dataset.load_from_disk(self.directory + 'janet_paper_desc_index'),
|
|
'content_index': None if not os.path.isfile(self.directory + 'janet_content_index') else Dataset.load_from_disk(self.directory + 'janet_content_index'),
|
|
'post_index': None if not os.path.isfile(self.directory + 'janet_post_index') else Dataset.load_from_disk(self.directory + 'janet_post_index')}
|
|
self.new_income = False
|
|
|
|
def init(self):
|
|
#first run
|
|
self.get_content()
|
|
if self.index['dataset_titles_index'] is None:
|
|
self.create_index('dataset_db', 'title', 'dataset_titles_index', 'janet_dataset_titles_index')
|
|
#self.populate_index('dataset_db', 'title', 'dataset_titles_index', 'janet_dataset_titles_index')
|
|
|
|
if self.index['dataset_desc_index'] is None:
|
|
self.create_index('dataset_db', 'notes', 'dataset_desc_index', 'janet_dataset_desc_index')
|
|
#self.populate_index('dataset_db', 'notes', 'dataset_desc_index', 'janet_dataset_desc_index')
|
|
|
|
if self.index['paper_titles_index'] is None:
|
|
self.create_index('paper_db', 'title', 'paper_titles_index', 'janet_paper_titles_index')
|
|
#self.populate_index('paper_db', 'title', 'paper_titles_index', 'janet_paper_titles_index')
|
|
|
|
if self.index['paper_desc_index'] is None:
|
|
self.create_index('paper_db', 'notes', 'paper_desc_index', 'janet_paper_desc_index')
|
|
#self.populate_index('paper_db', 'notes', 'paper_desc_index', 'janet_paper_desc_index')
|
|
|
|
if self.index['content_index'] is None:
|
|
self.create_index('content_db', 'content', 'content_index', 'janet_content_index')
|
|
#self.populate_index('content_db', 'content', 'content_index', 'janet_content_index')
|
|
|
|
if self.index['post_index'] is None:
|
|
self.create_index('post_db', 'content', 'post_index', 'janet_post_index')
|
|
#self.populate_index('post_db', 'content', 'post_index', 'janet_post_index')
|
|
|
|
def index_periodic_update(self):
|
|
if self.new_income:
|
|
if len(self.db['content_db'])%100 != 0:
|
|
self.create_index('content_db', 'content', 'content_index', 'janet_content_index')
|
|
#self.populate_index('content_db', 'content', 'content_index', 'janet_content_index')
|
|
if len(self.db['post_db'])%100 != 0:
|
|
self.create_index('post_db', 'content', 'post_index', 'janet_post_index')
|
|
#self.populate_index('post_db', 'content', 'post_index', 'janet_post_index')
|
|
if len(self.db['paper_db'])%100 != 0:
|
|
self.create_index('paper_db', 'title', 'paper_titles_index', 'janet_paper_titles_index')
|
|
self.create_index('paper_db', 'notes', 'paper_desc_index', 'janet_paper_desc_index')
|
|
#self.populate_index('paper_db', 'title', 'paper_titles_index', 'janet_paper_titles_index')
|
|
#self.populate_index('paper_db', 'notes', 'paper_desc_index', 'janet_paper_desc_index')
|
|
if len(self.db['dataset_db'])%100 != 0:
|
|
self.create_index('dataset_db', 'title', 'dataset_titles_index', 'janet_dataset_titles_index')
|
|
self.create_index('dataset_db', 'notes', 'dataset_desc_index', 'janet_dataset_desc_index')
|
|
#self.populate_index('dataset_db', 'title', 'dataset_titles_index', 'janet_dataset_titles_index')
|
|
#self.populate_index('dataset_db', 'notes', 'dataset_desc_index', 'janet_dataset_desc_index')
|
|
self.new_income = False
|
|
|
|
def create_index(self, db_type, attribute, index_type, filename):
|
|
filename = self.directory + filename
|
|
to_index = self.db[db_type][attribute]
|
|
|
|
dataset = Dataset.from_pandas(self.db[db_type])
|
|
embeddings_dataset = dataset.map(
|
|
lambda x: {"embeddings": self.retriever.encode([x[attribute]])[0]}
|
|
)
|
|
embeddings_dataset.save_to_disk(filename)
|
|
self.index[index_type] = embeddings_dataset
|
|
#faiss.write_index(self.index[index_type], filename)
|
|
|
|
def populate_index(self, db_type, attribute, index_type, filename):
|
|
filename = self.directory + filename
|
|
to_index = self.db[db_type][attribute]
|
|
for info in to_index:
|
|
sentence_embedding = np.array(self.retriever.encode([info]))
|
|
self.index[index_type].add(sentence_embedding)
|
|
faiss.write_index(self.index[index_type], filename)
|
|
|
|
    def get_content(self):
        """First-run harvest: pull all social posts and catalogue items of the
        VRE, rebuild the paper/dataset/post/content tables from scratch, and
        persist all four as JSON under self.directory.
        """
        h = html2text.HTML2Text()
        h.ignore_links = True  # keep plain text only when converting post HTML
        #posts
        posts = requests.get(self.socialnetwork_url, headers=self.headers)
        posts = posts.json()['result']
        post_df = pd.DataFrame(columns=['id', 'author', 'content', 'time', 'tags'])
        for post in posts:
            author = post['full_name'].lower()
            # strip HTML, flatten newlines, normalize case
            content = h.handle(post['description']).replace('\n', ' ').lower()
            date = post['time']
            # hashtags found in the body double as tags
            tags = []
            for word in content.split():
                if word[0] == '#':
                    tags.append(word[1:])
            # advance the post high-water mark (assumes post['time'] is directly
            # comparable to postlastupdate — units not verifiable here; TODO confirm)
            if date > self.postlastupdate:
                self.postlastupdate = date
            self.post_counter += 1
            post_df.loc[str(self.post_counter)] = [self.post_counter, author, content, date, tags]
        #catalog
        response = requests.get(self.catalogue_url, headers=self.headers)
        items = response.json()
        items_data = []
        # one extra request per item to fetch its full metadata record
        for item in items:
            api_url = self.catalogue_url + item + '/'
            response = requests.get(api_url, headers=self.headers)
            items_data.append(response.json())
        keys = ['type', 'resources', 'tags', 'title', 'author', 'notes', 'metadata_created']  # NOTE(review): unused
        paper_df = pd.DataFrame(columns=['id', 'type', 'resources', 'tags', 'title', 'author', 'notes', 'metadata_created', 'url'])
        dataset_df = pd.DataFrame(columns=['id', 'type', 'resources', 'tags', 'title', 'author', 'notes', 'metadata_created', 'url'])
        content_df = pd.DataFrame(columns=['id', 'paperid', 'content'])
        # seed the content table with the VRE description from info.txt (paperid -6)
        content_df = self.get_vre_info(content_df)
        for item in items_data:
            # NOTE(review): rsrc/url are only (re)bound when the matching keys
            # appear in 'extras'; a malformed item would reuse values from the
            # previous iteration (or raise NameError on the first) — verify.
            for el in item['extras']:
                if el['key'] == 'system:type':
                    rsrc = el['value']
                if el['key'] == 'Item URL':
                    url = el['value']
            resources = []
            for resource in item['resources']:
                resources.append(
                    {'name': resource['name'].lower(), 'url': resource['url'], 'description': resource['description'].lower()})
            tags = []
            for tag in item['tags']:
                tags.append(tag['name'].lower())
            title = item['title'].lower()
            author = item['author'].lower()
            notes = item['notes'].lower()
            date = datetime.strptime(item['metadata_created'], '%Y-%m-%dT%H:%M:%S.%f').timestamp()
            # advance the catalogue high-water mark
            if date > self.lastupdatetime:
                self.lastupdatetime = date
            if rsrc == 'Paper':
                self.paper_counter += 1
                paper_df.loc[str(self.paper_counter)] = [self.paper_counter, rsrc, resources, tags, title, author, notes, date, url]
                # papers also contribute their full text to the content table
                content_df = self.get_pdf_content(item, content_df)
                content_df = self.get_txt_content(item, content_df)
            if rsrc == 'Dataset':
                self.dataset_counter += 1
                dataset_df.loc[str(self.dataset_counter)] = [self.dataset_counter, rsrc, resources, tags, title, author, notes, date, url]
        self.db['paper_db'] = paper_df.sort_values(by='metadata_created', ascending=True)
        self.db['dataset_db'] = dataset_df.sort_values(by='metadata_created', ascending=True)
        self.db['post_db'] = post_df.sort_values(by='time', ascending=True)
        #other_content_df = pd.DataFrame(columns=['id', 'paperid', 'content'])
        # user posts (author != "catalogue") become content rows with paperid -1
        for i, post in post_df.iterrows():
            if post['author'] != "catalogue":
                self.content_counter+=1
                content_df.loc[str(self.content_counter)] = [self.content_counter, -1, post['author'] + ' posted: ' + post['content'] + ' It is about ' + ', '.join(post['tags'])]
        """
        for i, description in dataset_df.iterrows():
            self.content_counter+=1
            other_content_df.loc[str(self.content_counter)] = [self.content_counter, -2, description['title'] + ' is a dataset. ' + description['notes'] + ' It is about ' + ', '.join(description['tags']) ]
        for i, description in paper_df.iterrows():
            self.content_counter+=1
            other_content_df.loc[str(self.content_counter)] = [self.content_counter, -3, description['title'] + ' is a paper. ' + description['notes'] + ' It is about ' + ', '.join(description['tags']) ]
        """
        self.db['content_db'] = content_df
        # persist everything so the next start can warm-load from disk
        self.db['paper_db'].to_json(self.directory + self.name + '_paper.json')
        self.db['dataset_db'].to_json(self.directory + self.name + '_dataset.json')
        self.db['content_db'].to_json(self.directory + self.name + '_content.json')
        self.db['post_db'].to_json(self.directory+self.name+'_post.json')
|
|
|
|
# modify query
|
|
    def get_vre_update(self):
        """Incremental harvest: fetch only posts and catalogue items newer than
        the stored high-water marks, append them to the in-memory tables,
        re-persist the JSON files, and set new_income if anything arrived.
        """
        print("Getting new items")
        h = html2text.HTML2Text()
        h.ignore_links = True  # plain text only when converting post HTML
        #posts
        posts = requests.get(self.socialnetwork_url, headers=self.headers)
        posts = posts.json()['result']
        # keep only posts newer than the last seen timestamp
        new_posts = []
        for post in posts:
            if post['time'] > self.postlastupdate:
                new_posts.append(post)
        post_df = pd.DataFrame(columns=['id', 'author', 'content', 'time', 'tags'])
        for post in new_posts:
            author = post['full_name'].lower()
            content = h.handle(post['description']).replace('\n', ' ').lower()
            date = post['time']
            # hashtags in the body double as tags
            tags = []
            for word in content.split():
                if word[0] == '#':
                    tags.append(word[1:])
            # advance the post high-water mark
            if date > self.postlastupdate:
                self.postlastupdate = date
            self.post_counter += 1
            post_df.loc[str(self.post_counter)] = [self.post_counter, author, content, date, tags]
        #catalog
        response = requests.get(self.catalogue_url, headers=self.headers)
        items = response.json()
        items_data = []
        for item in items:
            api_url = self.catalogue_url + item + '/'
            response = requests.get(api_url, headers=self.headers)
            # NOTE(review): response.json() is parsed twice for accepted items
            if datetime.strptime(response.json()['metadata_created'],'%Y-%m-%dT%H:%M:%S.%f').timestamp() > self.lastupdatetime:
                items_data.append(response.json())
        keys = ['type', 'resources', 'tags', 'title', 'author', 'notes', 'metadata_created']  # NOTE(review): unused
        paper_df = pd.DataFrame(columns=['id', 'type', 'resources', 'tags', 'title', 'author', 'notes', 'metadata_created', 'url'])
        dataset_df = pd.DataFrame(columns=['id', 'type', 'resources', 'tags', 'title', 'author', 'notes', 'metadata_created', 'url'])
        content_df = pd.DataFrame(columns=['id', 'paperid', 'content'])
        for item in items_data:
            # NOTE(review): rsrc/url are only (re)bound when the matching keys
            # appear in 'extras'; a malformed item would reuse values from the
            # previous iteration (or raise NameError on the first) — verify.
            for el in item['extras']:
                if el['key'] == 'system:type':
                    rsrc = el['value']
                if el['key'] == 'Item URL':
                    url = el['value']
            resources = []
            for resource in item['resources']:
                resources.append(
                    {'name': resource['name'].lower(), 'url': resource['url'], 'description': resource['description'].lower()})
            tags = []
            for tag in item['tags']:
                tags.append(tag['name'].lower())
            title = item['title'].lower()
            author = item['author'].lower()
            notes = item['notes'].lower()
            date = datetime.strptime(item['metadata_created'], '%Y-%m-%dT%H:%M:%S.%f').timestamp()
            # advance the catalogue high-water mark
            if date > self.lastupdatetime:
                self.lastupdatetime = date
            if rsrc == 'Paper':
                self.paper_counter += 1
                paper_df.loc[str(self.paper_counter)] = [self.paper_counter, rsrc, resources, tags, title, author, notes, date, url]
                # papers also contribute their full text to the content table
                content_df = self.get_pdf_content(item, content_df)
                content_df = self.get_txt_content(item, content_df)
            if rsrc == 'Dataset':
                self.dataset_counter += 1
                dataset_df.loc[str(self.dataset_counter)] = [self.dataset_counter, rsrc, resources, tags, title, author, notes, date, url]
        # append the new rows to the existing tables
        self.db['paper_db'] = pd.concat([self.db['paper_db'], paper_df.sort_values(by='metadata_created', ascending=True)])
        self.db['dataset_db'] = pd.concat([self.db['dataset_db'], dataset_df.sort_values(by='metadata_created', ascending=True)])
        self.db['post_db'] = pd.concat([self.db['post_db'], post_df.sort_values(by='time', ascending=True)])
        self.db['post_db'].to_json(self.directory+self.name+'_post.json')
        # user posts (author != "catalogue") become content rows with paperid -1
        for i, post in post_df.iterrows():
            if post['author'] != "catalogue":
                self.content_counter+=1
                content_df.loc[str(self.content_counter)] = [self.content_counter, -1, post['author'] + ' posted: ' + post['content'] + ' It is about ' + ', '.join(post['tags'])]
        """
        for i, description in dataset_df.iterrows():
            self.content_counter+=1
            other_content_df.loc[str(self.content_counter)] = [self.content_counter, -2, description['title'] + ' is a dataset. ' + description['notes'] + ' It is about ' + ', '.join(description['tags']) ]
        for i, description in paper_df.iterrows():
            self.content_counter+=1
            other_content_df.loc[str(self.content_counter)] = [self.content_counter, -3, description['title'] + ' is a paper. ' + description['notes'] + ' It is about ' + ', '.join(description['tags']) ]
        """
        # re-persist the grown tables
        self.db['paper_db'].to_json(self.directory + self.name + '_paper.json')
        self.db['dataset_db'].to_json(self.directory + self.name + '_dataset.json')
        self.db['content_db'] = pd.concat([self.db['content_db'], content_df])
        self.db['content_db'].to_json(self.directory + self.name + '_content.json')
        # flag the indexer that fresh material is available
        if not paper_df.empty or not dataset_df.empty or not content_df.empty or not post_df.empty:
            self.new_income = True
|
|
|
|
def remove_suffix(self, input_string, suffix):
|
|
if suffix and input_string.endswith(suffix):
|
|
return input_string[:-len(suffix)]
|
|
return input_string
|
|
|
|
def remove_useless_dots(self, line):
|
|
modline = ''
|
|
for i in range(0, len(line)):
|
|
if line[i] != '.':
|
|
modline+=line[i]
|
|
if line[i] == '.':
|
|
if line[i-2] == ' ' or line[i-2] in string.punctuation:
|
|
continue
|
|
if line[i-1] == '.':
|
|
continue
|
|
if line[i-3] == ' ' and line[i-2] == 'a' and line[i-1] == 'l':
|
|
continue
|
|
modline+=line[i]
|
|
modline = re.sub(r'\.+', ".", modline)
|
|
modline = re.sub("\[.*?\]","", modline)
|
|
return modline
|
|
|
|
def check_if_sentence(self, sentence):
|
|
if (len(sentence.split())) > 9 or '.' in sentence:
|
|
return True
|
|
return False
|
|
|
|
def get_abstract(self, text):
|
|
abstract_start = 0
|
|
abstract_end = len(text)
|
|
for i in range(0, len(text)):
|
|
if len(text[i].split()) > 0:
|
|
words = text[i].split()
|
|
if words[0].lower() == 'abstract':
|
|
abstract_start = i
|
|
for j in range(i+1, len(text)):
|
|
if len(text[j]) == 0 and j > i+5:
|
|
abstract_end = j
|
|
break
|
|
break
|
|
return abstract_start, abstract_end
|
|
|
|
def useful_index(self, text):
|
|
start = 0
|
|
end = len(text)
|
|
for i in range(0, len(text)):
|
|
if len(text[i].split()) > 0:
|
|
words = text[i].split()
|
|
if words[0].lower() in ['bibliography','references']:
|
|
if i < end:
|
|
end = i
|
|
if words[0].lower() in ['introduction', '1 introduction', '1. introduction', '1.introduction']:
|
|
start = i
|
|
if words[0].lower() in ['acknowledgement', 'acknowledgements']:
|
|
if i < end:
|
|
end = i
|
|
return start, end
|
|
|
|
def get_line_sentences(self, text, i):
|
|
mytext = self.remove_useless_dots(text[i])
|
|
if self.check_if_sentence(mytext):
|
|
splits = mytext.split('.')
|
|
for j in range(len(splits)):
|
|
if j+1 < len(splits):
|
|
splits[j] = splits[j]+'. '
|
|
if j == len(splits)-1:
|
|
splits[j] = self.remove_suffix(splits[j], '-') #splits[j].removesuffix('-')
|
|
return splits, i+1
|
|
else:
|
|
return [], i+1
|
|
|
|
def parts_to_sentences(self, parts):
|
|
sentences = []
|
|
sentence = ''
|
|
for part in parts:
|
|
sentence += part
|
|
if '.' in sentence:
|
|
sentences.append(sentence)
|
|
sentence = ''
|
|
return sentences
|
|
|
|
def get_pdf_content(self, item, df):
|
|
for rsrc in tqdm(item['resources']):
|
|
response = requests.get(rsrc['url'])
|
|
if 'application/pdf' in response.headers.get('content-type'):
|
|
urllib.request.urlretrieve(rsrc['url'], self.directory + "janet.pdf")
|
|
pdf = pdfquery.PDFQuery(self.directory + "janet.pdf")
|
|
pdf.load()
|
|
#pages = pdf.pq('LTPage')
|
|
text = []
|
|
|
|
for i, el in enumerate(pdf.tree.getiterator()):
|
|
if el.tag == 'LTTextLineHorizontal' or el.tag == 'LTTextBoxHorizontal':
|
|
text.append(el.text)
|
|
|
|
paragraphs = []
|
|
parts = []
|
|
i, end = self.useful_index(text)
|
|
while i < end:
|
|
sent, i = self.get_line_sentences(text, i)
|
|
for part in sent:
|
|
if part!='':
|
|
x = part
|
|
if len(part) > 1 and part[0] == ' ':
|
|
x = part[1:]
|
|
if len(part) > 2 and part[1] == ' ':
|
|
x = part[2:]
|
|
parts.append(x)
|
|
sentences = self.parts_to_sentences(parts)
|
|
for i in range(0, len(sentences)-4, 5):
|
|
paragraph = sentences[i] + sentences[i+1] + sentences[i+2] + sentences[i+3] + sentences[i+4]
|
|
paragraphs.append(paragraph)
|
|
for paragraph in tqdm(paragraphs):
|
|
self.content_counter += 1
|
|
df.loc[str(self.content_counter)] = [self.content_counter, self.paper_counter, paragraph]
|
|
|
|
start, end = self.get_abstract(text)
|
|
abstract = ''
|
|
for i in range(start, end):
|
|
abstract += text[i]
|
|
self.content_counter += 1
|
|
df.loc[str(self.content_counter)] = [self.content_counter, self.paper_counter, abstract]
|
|
return df
|
|
|
|
def get_vre_info(self, df):
|
|
with open('info.txt', 'r') as file:
|
|
content = file.read().replace('\n', ' ')
|
|
content = self.remove_useless_dots(content)
|
|
self.content_counter += 1
|
|
df.loc[str(self.content_counter)] = [self.content_counter, -6, content]
|
|
return df
|
|
|
|
def get_txt_content(self, item, df):
|
|
for rsrc in tqdm(item['resources']):
|
|
response = requests.get(rsrc['url'])
|
|
if 'text/plain' in response.headers.get('content-type'):
|
|
content = response.text
|
|
content = self.remove_useless_dots(content)
|
|
sentences = content.split('.')
|
|
paragraphs = []
|
|
for i in range(0, len(sentences)-4, 5):
|
|
paragraph = sentences[i] + '. ' + sentences[i+1]+ '. ' + sentences[i+2]+ '. ' + sentences[i+3] + '. ' + sentences[i+4]+ '. '
|
|
paragraphs.append(paragraph)
|
|
for paragraph in tqdm(paragraphs):
|
|
self.content_counter += 1
|
|
df.loc[str(self.content_counter)] = [self.content_counter, self.paper_counter, paragraph]
|
|
return df
|
|
def get_db(self):
|
|
return self.db
|
|
def get_index(self):
|
|
return self.index
|