239 lines
11 KiB
Python
239 lines
11 KiB
Python
import os
|
|
import re
|
|
import warnings
|
|
import faiss
|
|
import torch
|
|
from flask import Flask, render_template, request, jsonify
|
|
from flask_cors import CORS, cross_origin
|
|
import psycopg2
|
|
import spacy
|
|
import requests
|
|
import spacy_transformers
|
|
import torch
|
|
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
|
|
from User import User
|
|
from VRE import VRE
|
|
from NLU import NLU
|
|
from DM import DM
|
|
from Recommender import Recommender
|
|
from ResponseGenerator import ResponseGenerator
|
|
import pandas as pd
|
|
import time
|
|
import threading
|
|
from sentence_transformers import SentenceTransformer
|
|
|
|
|
|
|
|
app = Flask(__name__)
|
|
url = os.getenv("FRONTEND_URL_WITH_PORT")
|
|
cors = CORS(app, resources={r"/api/predict": {"origins": url},
|
|
r"/api/feedback": {"origins": url},
|
|
r"/api/dm": {"origins": url},
|
|
r"/health": {"origins": "*"}
|
|
})
|
|
users = {}
|
|
|
|
def vre_fetch():
|
|
while True:
|
|
time.sleep(1000)
|
|
print('getting new material')
|
|
#users[token]['args']['vre'].get_vre_update()
|
|
#users[token]['args']['vre'].index_periodic_update()
|
|
#users[token]['args']['rg'].update_index(vre.get_index())
|
|
#users[token]['args']['rg'].update_db(vre.get_db())
|
|
vre.get_vre_update()
|
|
vre.index_periodic_update()
|
|
rg.update_index(vre.get_index())
|
|
rg.update_db(vre.get_db())
|
|
|
|
|
|
def user_interest_decay(token):
|
|
while True:
|
|
if token in users:
|
|
print("decaying interests after 3 minutes for " + users[token]['username'])
|
|
time.sleep(180)
|
|
users[token]['user'].decay_interests()
|
|
else:
|
|
break
|
|
|
|
|
|
def clear_inactive():
|
|
while True:
|
|
time.sleep(1)
|
|
for username in users:
|
|
if users[username]['activity'] > 3600:
|
|
del users[username]
|
|
users[username]['activity'] += 1
|
|
|
|
@app.route("/health", methods=['GET'])
|
|
def health():
|
|
return "Success", 200
|
|
|
|
@app.route("/api/dm", methods=['POST'])
|
|
def init_dm():
|
|
token = request.get_json().get("token")
|
|
status = request.get_json().get("stat")
|
|
if status == "start":
|
|
message = {"stat": "waiting"}
|
|
elif status == "set":
|
|
headers = {"gcube-token": token, "Accept": "application/json"}
|
|
if token not in users:
|
|
url = 'https://api.d4science.org/rest/2/people/profile'
|
|
response = requests.get(url, headers=headers)
|
|
if response.status_code == 200:
|
|
username = response.json()['result']['username']
|
|
name = response.json()['result']['fullname']
|
|
|
|
users[token] = {'username': username, 'name': name, 'dm': DM(), 'activity': 0, 'user': User(username, token)}
|
|
|
|
threading.Thread(target=user_interest_decay, args=(token,), name='decayinterest_'+users[token]['username']).start()
|
|
|
|
message = {"stat": "done"}
|
|
else:
|
|
message = {"stat": "rejected"}
|
|
else:
|
|
message = {"stat": "done"}
|
|
return message
|
|
|
|
@app.route("/api/predict", methods=['POST'])
|
|
def predict():
|
|
text = request.get_json().get("message")
|
|
token = request.get_json().get("token")
|
|
dm = users[token]['dm']
|
|
user = users[token]['user']
|
|
#rg = users[token]['args']['rg']
|
|
#vre = users[token]['args']['vre']
|
|
message = {}
|
|
try:
|
|
if text == "<HELP_ON_START>":
|
|
state = {'help': True, 'inactive': False, 'modified_query':"", 'intent':""}
|
|
dm.update(state)
|
|
action = dm.next_action()
|
|
response = rg.gen_response(action, vrename=vre.name, username=users[token]['username'], name=users[token]['name'].split()[0])
|
|
message = {"answer": response}
|
|
elif text == "<RECOMMEND_ON_IDLE>":
|
|
state = {'help': False, 'inactive': True, 'modified_query':"recommed: ", 'intent':""}
|
|
dm.update(state)
|
|
action = dm.next_action()
|
|
response = rg.gen_response(action, username=users[token]['username'],name=users[token]['name'].split()[0], vrename=vre.name)
|
|
message = {"answer": response}
|
|
new_state = {'modified_query': response}
|
|
dm.update(new_state)
|
|
else:
|
|
state = nlu.process_utterance(text, dm.get_consec_history(), dm.get_sep_history())
|
|
state['help'] = False
|
|
state['inactive'] = False
|
|
old_user_interests = user.get_user_interests()
|
|
old_vre_material = pd.concat([vre.db['paper_db'], vre.db['dataset_db']]).reset_index(drop=True)
|
|
user_interests = []
|
|
for entity in state['entities']:
|
|
if entity['entity'] == 'TOPIC':
|
|
user_interests.append(entity['value'])
|
|
user.update_interests(user_interests)
|
|
new_user_interests = user.get_user_interests()
|
|
new_vre_material = pd.concat([vre.db['paper_db'], vre.db['dataset_db']]).reset_index(drop=True)
|
|
if (new_user_interests != old_user_interests or len(old_vre_material) != len(new_vre_material)):
|
|
rec.generate_recommendations(users[token]['username'], new_user_interests, new_vre_material)
|
|
dm.update(state)
|
|
action = dm.next_action()
|
|
response = rg.gen_response(action=action, utterance=state['modified_query'], state=dm.get_recent_state(), consec_history=dm.get_consec_history(), chitchat_history=dm.get_chitchat_history(), vrename=vre.name, username=users[token]['username'], name=users[token]['name'].split()[0])
|
|
message = {"answer": response, "query": text, "cand": "candidate", "history": dm.get_consec_history(), "modQuery": state['modified_query']}
|
|
if state['intent'] == "QA":
|
|
split_response = response.split("_______ \n ")
|
|
if len(split_response) > 1:
|
|
response = split_response[1]
|
|
new_state = {'modified_query': response, 'intent': state['intent']}
|
|
dm.update(new_state)
|
|
reply = jsonify(message)
|
|
users[token]['dm'] = dm
|
|
users[token]['user'] = user
|
|
users[token]['activity'] = 0
|
|
#users[token]['args']['vre'] = vre
|
|
#users[token]['args']['rg'] = rg
|
|
return reply
|
|
except Exception as e:
|
|
message = {"answer": str(e), "query": "", "cand": "candidate", "history": "", "modQuery": ""}
|
|
return jsonify(message)
|
|
|
|
|
|
|
|
@app.route('/api/feedback', methods = ['POST'])
|
|
def feedback():
|
|
data = request.get_json().get("feedback")
|
|
print(data)
|
|
|
|
try:
|
|
conn = psycopg2.connect(host="janet-pg", database=os.getenv("POSTGRES_DB"), user=os.getenv("POSTGRES_USER"), password=os.getenv("POSTGRES_PASSWORD"))
|
|
cur = conn.cursor()
|
|
cur.execute('INSERT INTO feedback_experimental (query, history, janet_modified_query, is_modified_query_correct, user_modified_query, evidence_useful, response, preferred_response, response_length_feedback, response_fluency_feedback, response_truth_feedback, response_useful_feedback, response_time_feedback, response_intent) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)',
|
|
(data['query'], data['history'], data['modQuery'],
|
|
data['queryModCorrect'], data['correctQuery'], data['evidence'], data['janetResponse'], data['preferredResponse'], data['length'],
|
|
data['fluency'], data['truthfulness'], data['usefulness'],
|
|
data['speed'], data['intent']))
|
|
conn.commit()
|
|
cur.close()
|
|
|
|
reply = jsonify({"status": "done"})
|
|
return reply
|
|
|
|
except Exception as e:
|
|
return jsonify({"status": str(e)})
|
|
|
|
if __name__ == "__main__":
|
|
warnings.filterwarnings("ignore")
|
|
device = "cuda" if torch.cuda.is_available() else "cpu"
|
|
device_flag = torch.cuda.current_device() if torch.cuda.is_available() else -1
|
|
|
|
query_rewriter = pipeline("text2text-generation", model="castorini/t5-base-canard")
|
|
intent_classifier = pipeline("sentiment-analysis", model='/models/intent_classifier', device=device_flag)
|
|
entity_extractor = spacy.load("/models/entity_extractor")
|
|
offensive_classifier = pipeline("sentiment-analysis", model='/models/offensive_classifier', device=device_flag)
|
|
ambig_classifier = pipeline("sentiment-analysis", model='/models/ambig_classifier', device=device_flag)
|
|
coref_resolver = spacy.load("en_coreference_web_trf")
|
|
|
|
nlu = NLU(query_rewriter, coref_resolver, intent_classifier, offensive_classifier, entity_extractor, ambig_classifier)
|
|
|
|
#load retriever and generator
|
|
retriever = SentenceTransformer('/models/retriever/').to(device)
|
|
qa_generator = pipeline("text2text-generation", model="/models/train_qa", device=device_flag)
|
|
summ_generator = pipeline("text2text-generation", model="/models/train_summ", device=device_flag)
|
|
chat_generator = pipeline("text2text-generation", model="/models/train_chat", device=device_flag)
|
|
amb_generator = pipeline("text2text-generation", model="/models/train_amb_gen", device=device_flag)
|
|
generators = {'qa': qa_generator,
|
|
'chat': chat_generator,
|
|
'amb': amb_generator,
|
|
'summ': summ_generator}
|
|
rec = Recommender(retriever)
|
|
vre = VRE("assistedlab", '2c1e8f88-461c-42c0-8cc1-b7660771c9a3-843339462', retriever)
|
|
vre.init()
|
|
index = vre.get_index()
|
|
db = vre.get_db()
|
|
rg = ResponseGenerator(index,db, rec, generators, retriever)
|
|
|
|
threading.Thread(target=vre_fetch, name='updatevre').start()
|
|
threading.Thread(target=clear_inactive, name='clear').start()
|
|
|
|
conn = psycopg2.connect(host="janet-pg", database=os.getenv("POSTGRES_DB"), user=os.getenv("POSTGRES_USER"), password=os.getenv("POSTGRES_PASSWORD"))
|
|
|
|
cur = conn.cursor()
|
|
|
|
cur.execute('CREATE TABLE IF NOT EXISTS feedback_experimental (id serial PRIMARY KEY,'
|
|
'query text NOT NULL,'
|
|
'history text NOT NULL,'
|
|
'janet_modified_query text NOT NULL,'
|
|
'is_modified_query_correct text NOT NULL,'
|
|
'user_modified_query text, evidence_useful text NOT NULL,'
|
|
'response text NOT NULL,'
|
|
'preferred_response text,'
|
|
'response_length_feedback text NOT NULL,'
|
|
'response_fluency_feedback text NOT NULL,'
|
|
'response_truth_feedback text NOT NULL,'
|
|
'response_useful_feedback text NOT NULL,'
|
|
'response_time_feedback text NOT NULL,'
|
|
'response_intent text NOT NULL);'
|
|
)
|
|
conn.commit()
|
|
cur.close()
|
|
|
|
app.run(host='0.0.0.0')
|