from datetime import datetime
import os
import re
import string
import urllib.request

import faiss
import html2text
import numpy as np
import pandas as pd
import pdfquery
import requests
from datasets import Dataset
from tqdm.auto import tqdm


class VRE:
    def __init__(self, name, token, retriever, directory='/app/'):
        self.name = name
        self.token = token
        self.catalogue_url = 'https://api.d4science.org/catalogue/items/'
        self.socialnetwork_url = 'https://api.d4science.org/rest/2/posts/get-posts-vre/'
        self.headers = {"gcube-token": self.token, "Accept": "application/json"}
        self.lastupdatetime = datetime.strptime('2021-01-01T00:00:00.000000', '%Y-%m-%dT%H:%M:%S.%f').timestamp()
        self.postlastupdate = datetime.strptime('2021-01-01T00:00:00.000000', '%Y-%m-%dT%H:%M:%S.%f').timestamp()
        self.retriever = retriever
        self.directory = directory
        self.post_counter = 0
        self.paper_counter = 0
        self.dataset_counter = 0
        self.content_counter = 0
        # Load persisted databases if present, otherwise start from empty frames.
        item_columns = ['id', 'type', 'resources', 'tags', 'title', 'author', 'notes', 'metadata_created', 'url']
        self.db = {
            'paper_db': pd.read_json(self.directory + self.name + '_paper.json')
            if os.path.isfile(self.directory + self.name + '_paper.json')
            else pd.DataFrame(columns=item_columns),
            'dataset_db': pd.read_json(self.directory + self.name + '_dataset.json')
            if os.path.isfile(self.directory + self.name + '_dataset.json')
            else pd.DataFrame(columns=item_columns),
            'content_db': pd.read_json(self.directory + self.name + '_content.json')
            if os.path.isfile(self.directory + self.name + '_content.json')
            else pd.DataFrame(columns=['id', 'paperid', 'content']),
            'post_db': pd.read_json(self.directory + self.name + '_post.json')
            if os.path.isfile(self.directory + self.name + '_post.json')
            else pd.DataFrame(columns=['id', 'author', 'content', 'time', 'tags']),
        }
        # Load persisted embedding indices if present. Dataset.save_to_disk
        # writes a directory, so the existence check must be isdir, not isfile.
        self.index = {}
        for index_type, filename in [('dataset_titles_index', 'janet_dataset_titles_index'),
                                     ('paper_titles_index', 'janet_paper_titles_index'),
                                     ('dataset_desc_index', 'janet_dataset_desc_index'),
                                     ('paper_desc_index', 'janet_paper_desc_index'),
                                     ('content_index', 'janet_content_index'),
                                     ('post_index', 'janet_post_index')]:
            path = self.directory + filename
            self.index[index_type] = Dataset.load_from_disk(path) if os.path.isdir(path) else None
        self.new_income = False

    def init(self):
        # First run: fetch all content, then build any index missing on disk.
        self.get_content()
        if self.index['dataset_titles_index'] is None:
            self.create_index('dataset_db', 'title', 'dataset_titles_index', 'janet_dataset_titles_index')
        if self.index['dataset_desc_index'] is None:
            self.create_index('dataset_db', 'notes', 'dataset_desc_index', 'janet_dataset_desc_index')
        if self.index['paper_titles_index'] is None:
            self.create_index('paper_db', 'title', 'paper_titles_index', 'janet_paper_titles_index')
        if self.index['paper_desc_index'] is None:
            self.create_index('paper_db', 'notes', 'paper_desc_index', 'janet_paper_desc_index')
        if self.index['content_index'] is None:
            self.create_index('content_db', 'content', 'content_index', 'janet_content_index')
        if self.index['post_index'] is None:
            self.create_index('post_db', 'content', 'post_index', 'janet_post_index')
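    # `retriever` contract: the only call made on it is encode(list[str]) ->
    # sequence of embedding vectors (see create_index/populate_index below),
    # so any sentence-transformers-style encoder can be plugged in; a concrete
    # construction sketch is given at the bottom of the file.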
    def index_periodic_update(self):
        # Rebuild the indices for every table that changed since the last run.
        # The modulo test triggers a rebuild whenever the row count is not an
        # exact multiple of 100.
        if self.new_income:
            if len(self.db['content_db']) % 100 != 0:
                self.create_index('content_db', 'content', 'content_index', 'janet_content_index')
            if len(self.db['post_db']) % 100 != 0:
                self.create_index('post_db', 'content', 'post_index', 'janet_post_index')
            if len(self.db['paper_db']) % 100 != 0:
                self.create_index('paper_db', 'title', 'paper_titles_index', 'janet_paper_titles_index')
                self.create_index('paper_db', 'notes', 'paper_desc_index', 'janet_paper_desc_index')
            if len(self.db['dataset_db']) % 100 != 0:
                self.create_index('dataset_db', 'title', 'dataset_titles_index', 'janet_dataset_titles_index')
                self.create_index('dataset_db', 'notes', 'dataset_desc_index', 'janet_dataset_desc_index')
            self.new_income = False

    def create_index(self, db_type, attribute, index_type, filename):
        # Embed one column of a database and persist the result as a Hugging
        # Face dataset on disk (save_to_disk writes a directory).
        filename = self.directory + filename
        dataset = Dataset.from_pandas(self.db[db_type])
        embeddings_dataset = dataset.map(
            lambda x: {"embeddings": self.retriever.encode([x[attribute]])[0]}
        )
        embeddings_dataset.save_to_disk(filename)
        self.index[index_type] = embeddings_dataset

    def populate_index(self, db_type, attribute, index_type, filename):
        # Incrementally add embeddings to a raw FAISS index and persist it.
        # Note: this expects self.index[index_type] to be a faiss.Index, not
        # the Dataset stored by create_index; it is unused by init() and
        # index_periodic_update(), which rebuild from scratch instead.
        filename = self.directory + filename
        to_index = self.db[db_type][attribute]
        for info in to_index:
            sentence_embedding = np.array(self.retriever.encode([info]))
            self.index[index_type].add(sentence_embedding)
        faiss.write_index(self.index[index_type], filename)
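    # Querying sketch: the datasets library can attach a FAISS index to the
    # "embeddings" column that create_index saves. A minimal, hedged example
    # (the query string is illustrative, not from this project):
    #
    #     ds = self.index['paper_titles_index']
    #     ds.add_faiss_index(column='embeddings')
    #     q = np.asarray(self.retriever.encode(['coastal biodiversity'])[0], dtype='float32')
    #     scores, hits = ds.get_nearest_examples('embeddings', q, k=5)
    #     # hits['title'] now holds the five closest paper titles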
    def get_content(self):
        h = html2text.HTML2Text()
        h.ignore_links = True
        # Social-network posts
        posts = requests.get(self.socialnetwork_url, headers=self.headers)
        posts = posts.json()['result']
        post_df = pd.DataFrame(columns=['id', 'author', 'content', 'time', 'tags'])
        for post in posts:
            author = post['full_name'].lower()
            content = h.handle(post['description']).replace('\n', ' ').lower()
            date = post['time']
            tags = [word[1:] for word in content.split() if word.startswith('#')]
            if date > self.postlastupdate:
                self.postlastupdate = date
            self.post_counter += 1
            post_df.loc[str(self.post_counter)] = [self.post_counter, author, content, date, tags]
        # Catalogue items
        response = requests.get(self.catalogue_url, headers=self.headers)
        items = response.json()
        items_data = []
        for item in items:
            api_url = self.catalogue_url + item + '/'
            response = requests.get(api_url, headers=self.headers)
            items_data.append(response.json())
        item_columns = ['id', 'type', 'resources', 'tags', 'title', 'author', 'notes', 'metadata_created', 'url']
        paper_df = pd.DataFrame(columns=item_columns)
        dataset_df = pd.DataFrame(columns=item_columns)
        content_df = pd.DataFrame(columns=['id', 'paperid', 'content'])
        content_df = self.get_vre_info(content_df)
        for item in items_data:
            rsrc, url = None, None  # guard against items missing these extras
            for el in item['extras']:
                if el['key'] == 'system:type':
                    rsrc = el['value']
                if el['key'] == 'Item URL':
                    url = el['value']
            resources = [{'name': r['name'].lower(), 'url': r['url'], 'description': r['description'].lower()}
                         for r in item['resources']]
            tags = [tag['name'].lower() for tag in item['tags']]
            title = item['title'].lower()
            author = item['author'].lower()
            notes = item['notes'].lower()
            date = datetime.strptime(item['metadata_created'], '%Y-%m-%dT%H:%M:%S.%f').timestamp()
            if date > self.lastupdatetime:
                self.lastupdatetime = date
            if rsrc == 'Paper':
                self.paper_counter += 1
                paper_df.loc[str(self.paper_counter)] = [self.paper_counter, rsrc, resources, tags, title,
                                                         author, notes, date, url]
                content_df = self.get_pdf_content(item, content_df)
                content_df = self.get_txt_content(item, content_df)
            if rsrc == 'Dataset':
                self.dataset_counter += 1
                dataset_df.loc[str(self.dataset_counter)] = [self.dataset_counter, rsrc, resources, tags, title,
                                                             author, notes, date, url]
        self.db['paper_db'] = paper_df.sort_values(by='metadata_created', ascending=True)
        self.db['dataset_db'] = dataset_df.sort_values(by='metadata_created', ascending=True)
        self.db['post_db'] = post_df.sort_values(by='time', ascending=True)
        # Index non-catalogue posts as free-text content.
        for i, post in post_df.iterrows():
            if post['author'] != "catalogue":
                self.content_counter += 1
                content_df.loc[str(self.content_counter)] = [
                    self.content_counter, -1,
                    post['author'] + ' posted: ' + post['content'] + ' It is about ' + ', '.join(post['tags'])]
        # Disabled: also index dataset and paper descriptions into content_db.
        # other_content_df = pd.DataFrame(columns=['id', 'paperid', 'content'])
        # for i, description in dataset_df.iterrows():
        #     self.content_counter += 1
        #     other_content_df.loc[str(self.content_counter)] = [
        #         self.content_counter, -2,
        #         description['title'] + ' is a dataset. ' + description['notes']
        #         + ' It is about ' + ', '.join(description['tags'])]
        # for i, description in paper_df.iterrows():
        #     self.content_counter += 1
        #     other_content_df.loc[str(self.content_counter)] = [
        #         self.content_counter, -3,
        #         description['title'] + ' is a paper. ' + description['notes']
        #         + ' It is about ' + ', '.join(description['tags'])]
        self.db['content_db'] = content_df
        self.db['paper_db'].to_json(self.directory + self.name + '_paper.json')
        self.db['dataset_db'].to_json(self.directory + self.name + '_dataset.json')
        self.db['content_db'].to_json(self.directory + self.name + '_content.json')
        self.db['post_db'].to_json(self.directory + self.name + '_post.json')
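    # For reference, the two fetch methods assume each catalogue item looks
    # roughly like the following (an illustrative record inferred from the
    # parsing code, not a documented schema):
    #
    #     {
    #       "title": "...", "author": "...", "notes": "...",
    #       "metadata_created": "2021-06-01T12:00:00.000000",
    #       "extras": [{"key": "system:type", "value": "Paper"},
    #                  {"key": "Item URL", "value": "https://..."}],
    #       "resources": [{"name": "...", "url": "...", "description": "..."}],
    #       "tags": [{"name": "..."}]
    #     }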
    def get_vre_update(self):
        # Fetch only posts and catalogue items created after the last update.
        print("Getting new items")
        h = html2text.HTML2Text()
        h.ignore_links = True
        # New social-network posts
        posts = requests.get(self.socialnetwork_url, headers=self.headers)
        posts = posts.json()['result']
        new_posts = [post for post in posts if post['time'] > self.postlastupdate]
        post_df = pd.DataFrame(columns=['id', 'author', 'content', 'time', 'tags'])
        for post in new_posts:
            author = post['full_name'].lower()
            content = h.handle(post['description']).replace('\n', ' ').lower()
            date = post['time']
            tags = [word[1:] for word in content.split() if word.startswith('#')]
            if date > self.postlastupdate:
                self.postlastupdate = date
            self.post_counter += 1
            post_df.loc[str(self.post_counter)] = [self.post_counter, author, content, date, tags]
        # New catalogue items
        response = requests.get(self.catalogue_url, headers=self.headers)
        items = response.json()
        items_data = []
        for item in items:
            api_url = self.catalogue_url + item + '/'
            response = requests.get(api_url, headers=self.headers)
            if datetime.strptime(response.json()['metadata_created'],
                                 '%Y-%m-%dT%H:%M:%S.%f').timestamp() > self.lastupdatetime:
                items_data.append(response.json())
        item_columns = ['id', 'type', 'resources', 'tags', 'title', 'author', 'notes', 'metadata_created', 'url']
        paper_df = pd.DataFrame(columns=item_columns)
        dataset_df = pd.DataFrame(columns=item_columns)
        content_df = pd.DataFrame(columns=['id', 'paperid', 'content'])
        for item in items_data:
            rsrc, url = None, None
            for el in item['extras']:
                if el['key'] == 'system:type':
                    rsrc = el['value']
                if el['key'] == 'Item URL':
                    url = el['value']
            resources = [{'name': r['name'].lower(), 'url': r['url'], 'description': r['description'].lower()}
                         for r in item['resources']]
            tags = [tag['name'].lower() for tag in item['tags']]
            title = item['title'].lower()
            author = item['author'].lower()
            notes = item['notes'].lower()
            date = datetime.strptime(item['metadata_created'], '%Y-%m-%dT%H:%M:%S.%f').timestamp()
            if date > self.lastupdatetime:
                self.lastupdatetime = date
            if rsrc == 'Paper':
                self.paper_counter += 1
                paper_df.loc[str(self.paper_counter)] = [self.paper_counter, rsrc, resources, tags, title,
                                                         author, notes, date, url]
                content_df = self.get_pdf_content(item, content_df)
                content_df = self.get_txt_content(item, content_df)
            if rsrc == 'Dataset':
                self.dataset_counter += 1
                dataset_df.loc[str(self.dataset_counter)] = [self.dataset_counter, rsrc, resources, tags, title,
                                                             author, notes, date, url]
        self.db['paper_db'] = pd.concat([self.db['paper_db'], paper_df.sort_values(by='metadata_created', ascending=True)])
        self.db['dataset_db'] = pd.concat([self.db['dataset_db'], dataset_df.sort_values(by='metadata_created', ascending=True)])
        self.db['post_db'] = pd.concat([self.db['post_db'], post_df.sort_values(by='time', ascending=True)])
        self.db['post_db'].to_json(self.directory + self.name + '_post.json')
        for i, post in post_df.iterrows():
            if post['author'] != "catalogue":
                self.content_counter += 1
                content_df.loc[str(self.content_counter)] = [
                    self.content_counter, -1,
                    post['author'] + ' posted: ' + post['content'] + ' It is about ' + ', '.join(post['tags'])]
        self.db['paper_db'].to_json(self.directory + self.name + '_paper.json')
        self.db['dataset_db'].to_json(self.directory + self.name + '_dataset.json')
        self.db['content_db'] = pd.concat([self.db['content_db'], content_df])
        self.db['content_db'].to_json(self.directory + self.name + '_content.json')
        if not paper_df.empty or not dataset_df.empty or not content_df.empty or not post_df.empty:
            self.new_income = True
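    # The next helper mirrors the built-in str.removesuffix (Python 3.9+);
    # e.g. remove_suffix('hyphen-', '-') -> 'hyphen'.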
    def remove_suffix(self, input_string, suffix):
        if suffix and input_string.endswith(suffix):
            return input_string[:-len(suffix)]
        return input_string

    def remove_useless_dots(self, line):
        # Drop dots that do not end a sentence (dots after single-character
        # tokens, repeated dots, and 'et al.'), then strip bracketed citations.
        # Index guards prevent negative indices from wrapping around the string.
        modline = ''
        for i in range(len(line)):
            if line[i] != '.':
                modline += line[i]
            else:
                if i >= 2 and (line[i - 2] == ' ' or line[i - 2] in string.punctuation):
                    continue
                if i >= 1 and line[i - 1] == '.':
                    continue
                if i >= 3 and line[i - 3] == ' ' and line[i - 2] == 'a' and line[i - 1] == 'l':
                    continue
                modline += line[i]
        modline = re.sub(r'\.+', ".", modline)
        modline = re.sub(r"\[.*?\]", "", modline)
        return modline

    def check_if_sentence(self, sentence):
        return len(sentence.split()) > 9 or '.' in sentence

    def get_abstract(self, text):
        # Return the [start, end) line range of the abstract: from the line
        # starting with 'abstract' to the first empty line at least five
        # lines later.
        abstract_start = 0
        abstract_end = len(text)
        for i in range(len(text)):
            words = text[i].split()
            if words and words[0].lower() == 'abstract':
                abstract_start = i
                for j in range(i + 1, len(text)):
                    if len(text[j]) == 0 and j > i + 5:
                        abstract_end = j
                        break
                break
        return abstract_start, abstract_end

    def useful_index(self, text):
        # Return the [start, end) line range of the paper body: from the
        # introduction heading to the references/acknowledgements section.
        start = 0
        end = len(text)
        for i in range(len(text)):
            words = text[i].split()
            if not words:
                continue
            first = words[0].lower()
            # Two-word headings like '1 introduction' span two tokens, so a
            # single-token comparison can never match them.
            first_two = ' '.join(words[:2]).lower()
            if first in ('bibliography', 'references') and i < end:
                end = i
            if first in ('introduction', '1.introduction') or first_two in ('1 introduction', '1. introduction'):
                start = i
            if first in ('acknowledgement', 'acknowledgements') and i < end:
                end = i
        return start, end

    def get_line_sentences(self, text, i):
        # Split line i into sentence fragments; returns the fragments and the
        # index of the next line.
        mytext = self.remove_useless_dots(text[i])
        if self.check_if_sentence(mytext):
            splits = mytext.split('.')
            for j in range(len(splits)):
                if j + 1 < len(splits):
                    splits[j] = splits[j] + '. '
                if j == len(splits) - 1:
                    splits[j] = self.remove_suffix(splits[j], '-')
            return splits, i + 1
        return [], i + 1

    def parts_to_sentences(self, parts):
        # Merge fragments until each accumulated chunk contains a full stop.
        sentences = []
        sentence = ''
        for part in parts:
            sentence += part
            if '.' in sentence:
                sentences.append(sentence)
                sentence = ''
        return sentences
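    # Worked example of the text-cleaning helpers above, traced by hand:
    #
    #     self.remove_useless_dots('see smith et al. [12]. we extend it.')
    #     # -> 'see smith et al . we extend it.'
    #     #    (the 'et al.' dot and the bracketed citation are dropped)
    #     self.parts_to_sentences(['first half ', 'of a sentence. ', 'next. '])
    #     # -> ['first half of a sentence. ', 'next. ']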
    def get_pdf_content(self, item, df):
        # Download each PDF resource, extract its text lines, and store
        # five-sentence paragraphs plus the abstract in the content frame.
        for rsrc in tqdm(item['resources']):
            response = requests.get(rsrc['url'])
            if 'application/pdf' in response.headers.get('content-type', ''):
                urllib.request.urlretrieve(rsrc['url'], self.directory + "janet.pdf")
                pdf = pdfquery.PDFQuery(self.directory + "janet.pdf")
                pdf.load()
                text = []
                for el in pdf.tree.iter():
                    if el.tag in ('LTTextLineHorizontal', 'LTTextBoxHorizontal') and el.text is not None:
                        text.append(el.text)
                paragraphs = []
                parts = []
                i, end = self.useful_index(text)
                while i < end:
                    sent, i = self.get_line_sentences(text, i)
                    for part in sent:
                        if part != '':
                            x = part
                            if len(part) > 1 and part[0] == ' ':
                                x = part[1:]
                            if len(part) > 2 and part[1] == ' ':
                                x = part[2:]
                            parts.append(x)
                sentences = self.parts_to_sentences(parts)
                # Group sentences into paragraphs of five.
                for i in range(0, len(sentences) - 4, 5):
                    paragraphs.append(''.join(sentences[i:i + 5]))
                for paragraph in tqdm(paragraphs):
                    self.content_counter += 1
                    df.loc[str(self.content_counter)] = [self.content_counter, self.paper_counter, paragraph]
                start, end = self.get_abstract(text)
                abstract = ''.join(text[start:end])
                self.content_counter += 1
                df.loc[str(self.content_counter)] = [self.content_counter, self.paper_counter, abstract]
        return df

    def get_vre_info(self, df):
        # Seed the content frame with the VRE description from info.txt.
        with open('info.txt', 'r') as file:
            content = file.read().replace('\n', ' ')
        content = self.remove_useless_dots(content)
        self.content_counter += 1
        df.loc[str(self.content_counter)] = [self.content_counter, -6, content]
        return df

    def get_txt_content(self, item, df):
        # Download each plain-text resource and store five-sentence paragraphs.
        for rsrc in tqdm(item['resources']):
            response = requests.get(rsrc['url'])
            if 'text/plain' in response.headers.get('content-type', ''):
                content = self.remove_useless_dots(response.text)
                sentences = content.split('.')
                paragraphs = []
                for i in range(0, len(sentences) - 4, 5):
                    paragraphs.append('. '.join(sentences[i:i + 5]) + '. ')
                for paragraph in tqdm(paragraphs):
                    self.content_counter += 1
                    df.loc[str(self.content_counter)] = [self.content_counter, self.paper_counter, paragraph]
        return df

    def get_db(self):
        return self.db

    def get_index(self):
        return self.index
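# A minimal end-to-end sketch, assuming a sentence-transformers encoder as the
# retriever and a valid gcube token; the model name, token, and directory are
# placeholders, not values from this project.
if __name__ == '__main__':
    from sentence_transformers import SentenceTransformer

    retriever = SentenceTransformer('all-MiniLM-L6-v2')  # any .encode() model works
    vre = VRE('janet', '<gcube-token>', retriever, directory='./')
    vre.init()                   # first run: fetch everything, build all indices
    vre.get_vre_update()         # later: pull only items newer than the last run
    vre.index_periodic_update()  # re-embed the tables that received new rows
    print(len(vre.get_db()['content_db']), 'content rows indexed')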