2020-01-28 16:53:59 +01:00
|
|
|
package eu.dnetlib.ariadneplus.reader;
|
|
|
|
|
2020-06-10 19:39:53 +02:00
|
|
|
import eu.dnetlib.ariadneplus.elasticsearch.BulkUpload;
|
2020-09-24 12:35:59 +02:00
|
|
|
import eu.dnetlib.ariadneplus.publisher.AriadnePlusPublisherException;
|
2020-06-10 19:39:53 +02:00
|
|
|
import eu.dnetlib.ariadneplus.reader.json.ParseRDFJSON;
|
2020-06-12 18:14:41 +02:00
|
|
|
import org.apache.commons.logging.Log;
|
|
|
|
import org.apache.commons.logging.LogFactory;
|
2020-01-28 16:53:59 +01:00
|
|
|
import org.eclipse.rdf4j.model.Model;
|
2020-06-11 18:20:42 +02:00
|
|
|
import org.eclipse.rdf4j.query.*;
|
2020-10-10 00:55:26 +02:00
|
|
|
import org.eclipse.rdf4j.repository.http.HTTPQueryEvaluationException;
|
2020-01-28 16:53:59 +01:00
|
|
|
import org.eclipse.rdf4j.repository.Repository;
|
|
|
|
import org.eclipse.rdf4j.repository.RepositoryConnection;
|
|
|
|
import org.eclipse.rdf4j.repository.manager.RemoteRepositoryManager;
|
|
|
|
import org.eclipse.rdf4j.rio.RDFFormat;
|
|
|
|
import org.eclipse.rdf4j.rio.RDFWriter;
|
|
|
|
import org.eclipse.rdf4j.rio.Rio;
|
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
2020-06-10 19:39:53 +02:00
|
|
|
import java.io.StringWriter;
|
2020-06-11 18:20:42 +02:00
|
|
|
import java.util.ArrayList;
|
2020-07-24 12:34:06 +02:00
|
|
|
import java.util.Arrays;
|
2020-06-10 19:39:53 +02:00
|
|
|
import java.util.Calendar;
|
2020-06-11 18:20:42 +02:00
|
|
|
import java.util.List;
|
2020-01-28 16:53:59 +01:00
|
|
|
|
|
|
|
@Service
|
|
|
|
public class RunSPARQLQueryService {
|
|
|
|
|
2020-06-12 18:14:41 +02:00
|
|
|
private static final Log log = LogFactory.getLog(RunSPARQLQueryService.class);
|
2020-01-28 16:53:59 +01:00
|
|
|
|
|
|
|
private RepositoryConnection connection;
|
|
|
|
private RemoteRepositoryManager manager;
|
|
|
|
private Repository repository;
|
|
|
|
|
|
|
|
private ParseRDFJSON parser;
|
|
|
|
private ResourceManager resourceManager;
|
|
|
|
private BulkUpload bulkUpload;
|
|
|
|
|
2020-06-10 19:39:53 +02:00
|
|
|
private static String username = null;
|
|
|
|
private static String pwd = null;
|
|
|
|
private static String graphDBUrl = null;
|
|
|
|
private static String graphDBRepository = null;
|
|
|
|
|
|
|
|
public void setupConnection(String username, String pwd, String graphDbUrl, String graphDbRepository) {
|
|
|
|
setUsername(username);
|
|
|
|
setPwd(pwd);
|
|
|
|
setGraphDBUrl(graphDbUrl);
|
|
|
|
setGraphDBRepository(graphDbRepository);
|
|
|
|
}
|
|
|
|
|
|
|
|
private void openConnection(){
|
|
|
|
manager = new RemoteRepositoryManager(getGraphDBUrl());
|
2020-01-28 16:53:59 +01:00
|
|
|
manager.init();
|
2020-06-10 19:39:53 +02:00
|
|
|
manager.setUsernameAndPassword(getUsername(), getPwd());
|
|
|
|
repository = manager.getRepository(getGraphDBRepository());
|
2020-01-28 16:53:59 +01:00
|
|
|
connection = repository.getConnection();
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
private void closeConnection(){
|
|
|
|
connection.close();
|
|
|
|
repository.shutDown();
|
|
|
|
manager.shutDown();
|
|
|
|
}
|
|
|
|
|
2020-09-24 12:53:08 +02:00
|
|
|
public String executeMultipleQueryGraph(String queryTemplate, List<String> recordIds, String datasource, String collectionId, boolean isCollection) {
|
2020-06-11 18:20:42 +02:00
|
|
|
if (queryTemplate==null)
|
|
|
|
return null;
|
|
|
|
final String selectQueryTemplate = queryTemplate.replaceAll("%datasource", datasource).replaceAll("%collectionId", collectionId);
|
2020-07-23 23:12:07 +02:00
|
|
|
log.info("Start indexing "+ recordIds.size()+ " records ...");
|
2020-07-24 12:34:06 +02:00
|
|
|
final List<Integer> errorCodesCount = Arrays.asList(new Integer(0));
|
|
|
|
final List<Integer> successCodesCount = Arrays.asList(new Integer(0));
|
2020-08-12 15:47:55 +02:00
|
|
|
final List<Integer> counter = Arrays.asList(new Integer(0));
|
2020-06-11 18:20:42 +02:00
|
|
|
recordIds.forEach(recordId -> {
|
2020-10-10 00:55:26 +02:00
|
|
|
int waitAmount=1;
|
2020-10-10 01:03:41 +02:00
|
|
|
int retryResult = 0;
|
2020-07-22 23:13:47 +02:00
|
|
|
int operationResult = executeQueryGraph(selectQueryTemplate, recordId, isCollection);
|
2020-07-24 12:34:06 +02:00
|
|
|
if (operationResult!=200) {
|
2020-09-07 15:45:26 +02:00
|
|
|
log.error(recordId + " error_code: "+ operationResult);
|
2020-10-10 00:55:26 +02:00
|
|
|
if (operationResult==-5) {
|
|
|
|
do {
|
|
|
|
// let's wait if heap space decreases
|
|
|
|
try {
|
|
|
|
log.warn("Waiting more free space on heap for " + waitAmount + " seconds ...");
|
|
|
|
Thread.sleep(waitAmount * 1000);
|
|
|
|
waitAmount = waitAmount*2;
|
|
|
|
} catch (InterruptedException ie) {
|
|
|
|
log.error(ie);
|
|
|
|
}
|
|
|
|
retryResult = executeQueryGraph(selectQueryTemplate, recordId, isCollection);
|
|
|
|
log.debug("retryResult: " + retryResult);
|
|
|
|
} while (retryResult!=200);
|
2020-10-10 01:03:41 +02:00
|
|
|
operationResult = retryResult;
|
2020-10-10 00:55:26 +02:00
|
|
|
}
|
|
|
|
if (operationResult!=200) {
|
|
|
|
int currentErrorsCount = errorCodesCount.get(0).intValue();
|
|
|
|
currentErrorsCount += 1;
|
|
|
|
errorCodesCount.set(0, new Integer(currentErrorsCount));
|
|
|
|
}
|
|
|
|
else {
|
|
|
|
int currentSuccessCount = successCodesCount.get(0).intValue();
|
|
|
|
currentSuccessCount+=1;
|
|
|
|
successCodesCount.set(0, new Integer(currentSuccessCount));
|
2020-09-24 12:35:59 +02:00
|
|
|
}
|
2020-07-24 12:34:06 +02:00
|
|
|
}
|
|
|
|
else {
|
|
|
|
int currentSuccessCount = successCodesCount.get(0).intValue();
|
|
|
|
currentSuccessCount+=1;
|
|
|
|
successCodesCount.set(0, new Integer(currentSuccessCount));
|
|
|
|
}
|
2020-08-12 15:47:55 +02:00
|
|
|
int counterValue = counter.get(0).intValue();
|
|
|
|
String curReport = null;
|
2020-09-24 12:35:59 +02:00
|
|
|
if ((counterValue>0) && (counterValue % 1000) == 0) {
|
2020-08-12 15:47:55 +02:00
|
|
|
curReport = "Current analyzed records: "+counterValue+" Current indexed records: "+ successCodesCount.get(0).intValue() +
|
|
|
|
" , " + "Current errors: "+ errorCodesCount.get(0).intValue();
|
|
|
|
log.info(curReport);
|
|
|
|
}
|
|
|
|
counterValue+=1;
|
|
|
|
counter.set(0, new Integer(counterValue));
|
|
|
|
|
2020-06-11 18:20:42 +02:00
|
|
|
});
|
2020-08-12 15:47:55 +02:00
|
|
|
|
2020-07-24 12:34:06 +02:00
|
|
|
String report = "Total indexed records: "+ successCodesCount.get(0).intValue() +
|
|
|
|
" , " + "Total errors: "+ errorCodesCount.get(0).intValue();
|
|
|
|
log.info(report);
|
|
|
|
return report;
|
2020-06-11 18:20:42 +02:00
|
|
|
}
|
2020-06-10 19:39:53 +02:00
|
|
|
|
2020-07-22 23:13:47 +02:00
|
|
|
private int executeQueryGraph(String selectQueryTemplate, String recordId, boolean isCollection){
|
2020-07-23 23:12:07 +02:00
|
|
|
// decrease queries execution rate to avoid heap overload on graphdb
|
2020-07-22 23:47:08 +02:00
|
|
|
try {
|
2020-07-23 23:12:07 +02:00
|
|
|
Thread.sleep(50);
|
2020-07-22 23:47:08 +02:00
|
|
|
} catch (InterruptedException e) {
|
|
|
|
e.printStackTrace();
|
|
|
|
}
|
2020-10-12 18:15:15 +02:00
|
|
|
log.debug("Retrieving "+recordId+" - isCollection:"+isCollection );
|
2020-07-07 13:39:22 +02:00
|
|
|
String query = selectQueryTemplate.replaceAll("%record", "<"+recordId+">");
|
|
|
|
openConnection();
|
2020-01-28 16:53:59 +01:00
|
|
|
StringWriter recordWriter = null;
|
2020-06-10 19:39:53 +02:00
|
|
|
Model resultsModel = null;
|
2020-01-28 16:53:59 +01:00
|
|
|
try {
|
2020-10-12 18:15:15 +02:00
|
|
|
log.debug("Started at: "+Calendar.getInstance().getTime().toString());
|
2020-06-10 19:39:53 +02:00
|
|
|
GraphQuery graphQuery = connection.prepareGraphQuery(QueryLanguage.SPARQL, query);
|
2020-01-28 16:53:59 +01:00
|
|
|
GraphQueryResult graphQueryResult = graphQuery.evaluate();
|
2020-06-10 19:39:53 +02:00
|
|
|
resultsModel = QueryResults.asModel(graphQueryResult);
|
2020-01-28 16:53:59 +01:00
|
|
|
graphQueryResult.close();
|
2020-10-12 18:15:15 +02:00
|
|
|
log.debug("Finished at: "+Calendar.getInstance().getTime().toString());
|
|
|
|
log.debug("Statements retrieved: " + resultsModel.size());
|
2020-06-11 18:20:42 +02:00
|
|
|
if (resultsModel.size()==0) {
|
2020-07-22 23:13:47 +02:00
|
|
|
return -2;
|
2020-06-11 18:20:42 +02:00
|
|
|
}
|
|
|
|
recordWriter = new StringWriter();
|
2020-06-10 19:39:53 +02:00
|
|
|
RDFWriter rdfRecordWriter = Rio.createWriter(RDFFormat.RDFJSON, recordWriter);
|
|
|
|
Rio.write(resultsModel, rdfRecordWriter);
|
2020-08-07 16:40:21 +02:00
|
|
|
parser.setCollection(isCollection);
|
2020-07-24 00:32:49 +02:00
|
|
|
String bufferedRecord = recordWriter.toString();
|
2020-10-13 00:31:17 +02:00
|
|
|
// log.debug(bufferedRecord);
|
2020-07-24 00:32:49 +02:00
|
|
|
int size = parser.parse(bufferedRecord);
|
2020-10-12 18:15:15 +02:00
|
|
|
log.debug("json elements: "+size);
|
2020-07-24 00:32:49 +02:00
|
|
|
if (size==-1) {
|
|
|
|
return -4;
|
|
|
|
}
|
2020-06-10 19:39:53 +02:00
|
|
|
resourceManager.manage(parser);
|
2020-07-22 23:13:47 +02:00
|
|
|
return bulkUpload.index(resourceManager, isCollection);
|
2020-10-10 00:55:26 +02:00
|
|
|
} catch (HTTPQueryEvaluationException qe) {
|
|
|
|
log.error(qe);
|
|
|
|
return -5;
|
|
|
|
} catch(Exception e){
|
2020-07-22 23:13:47 +02:00
|
|
|
log.error(e);
|
|
|
|
return -1;
|
2020-06-10 19:39:53 +02:00
|
|
|
} finally{
|
2020-01-28 16:53:59 +01:00
|
|
|
closeConnection();
|
2020-06-10 19:39:53 +02:00
|
|
|
if (resultsModel!=null) {
|
|
|
|
resultsModel.clear();
|
2020-01-28 16:53:59 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
public ParseRDFJSON getParser() {
|
|
|
|
return parser;
|
|
|
|
}
|
|
|
|
|
|
|
|
public void setParser(ParseRDFJSON parser) {
|
|
|
|
this.parser = parser;
|
|
|
|
}
|
|
|
|
|
|
|
|
public ResourceManager getResourceManager() {
|
|
|
|
return resourceManager;
|
|
|
|
}
|
|
|
|
|
|
|
|
public void setResourceManager(ResourceManager resourceManager) {
|
|
|
|
this.resourceManager = resourceManager;
|
|
|
|
}
|
|
|
|
|
|
|
|
public BulkUpload getBulkUpload() {
|
|
|
|
return bulkUpload;
|
|
|
|
}
|
|
|
|
|
|
|
|
public void setBulkUpload(BulkUpload bulkUpload) {
|
|
|
|
this.bulkUpload = bulkUpload;
|
|
|
|
}
|
|
|
|
|
2020-06-10 19:39:53 +02:00
|
|
|
public static String getUsername() {
|
|
|
|
return username;
|
|
|
|
}
|
|
|
|
|
|
|
|
public static String getPwd() {
|
|
|
|
return pwd;
|
|
|
|
}
|
|
|
|
|
|
|
|
public static String getGraphDBUrl() {
|
|
|
|
return graphDBUrl;
|
|
|
|
}
|
|
|
|
|
|
|
|
public static String getGraphDBRepository() {
|
|
|
|
return graphDBRepository;
|
|
|
|
}
|
|
|
|
|
|
|
|
public static void setUsername(String username) {
|
|
|
|
RunSPARQLQueryService.username = username;
|
|
|
|
}
|
|
|
|
|
|
|
|
public static void setPwd(String pwd) {
|
|
|
|
RunSPARQLQueryService.pwd = pwd;
|
|
|
|
}
|
|
|
|
|
|
|
|
public static void setGraphDBUrl(String graphDBUrl) {
|
|
|
|
RunSPARQLQueryService.graphDBUrl = graphDBUrl;
|
|
|
|
}
|
|
|
|
|
|
|
|
public static void setGraphDBRepository(String graphDBRepository) {
|
|
|
|
RunSPARQLQueryService.graphDBRepository = graphDBRepository;
|
|
|
|
}
|
2020-06-11 18:20:42 +02:00
|
|
|
|
2020-06-12 18:14:41 +02:00
|
|
|
public List<String> selectRecordIds(String datasource, String collectionId){
|
2020-06-15 12:10:59 +02:00
|
|
|
log.debug("Retrieving record Ids from GraphDB ...");
|
2020-06-12 18:14:41 +02:00
|
|
|
String queryTemplate = "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>\n" +
|
2020-06-11 18:20:42 +02:00
|
|
|
"select * \n" +
|
2020-06-12 18:14:41 +02:00
|
|
|
"from <https://ariadne-infrastructure.eu/api_________::ariadne_plus::%datasource::%collectionId>\n" +
|
2020-06-11 18:20:42 +02:00
|
|
|
"where { \n" +
|
|
|
|
"\t?recordId rdf:type <https://www.ariadne-infrastructure.eu/resource/ao/cat/1.1/AO_Individual_Data_Resource> .\n" +
|
|
|
|
"} \n";
|
2020-06-12 18:14:41 +02:00
|
|
|
String query = queryTemplate.replaceAll("%datasource", datasource).replaceAll("%collectionId", collectionId);
|
2020-06-15 12:10:59 +02:00
|
|
|
return executeSelect(query);
|
|
|
|
}
|
|
|
|
|
2020-09-04 16:57:38 +02:00
|
|
|
public List<String> selectCollectionIds(String datasource, String collectionId){
|
|
|
|
log.debug("Retrieving collection Ids from GraphDB ...");
|
2020-06-15 12:10:59 +02:00
|
|
|
String queryTemplate = "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>\n" +
|
|
|
|
"select * \n" +
|
|
|
|
"from <https://ariadne-infrastructure.eu/api_________::ariadne_plus::%datasource::%collectionId>\n" +
|
|
|
|
"where { \n" +
|
|
|
|
"\t?recordId rdf:type <https://www.ariadne-infrastructure.eu/resource/ao/cat/1.1/AO_Collection> .\n" +
|
|
|
|
"} \n";
|
|
|
|
String query = queryTemplate.replaceAll("%datasource", datasource).replaceAll("%collectionId", collectionId);
|
|
|
|
return executeSelect(query);
|
|
|
|
}
|
|
|
|
|
|
|
|
private List<String> executeSelect(String query){
|
2020-06-11 18:20:42 +02:00
|
|
|
openConnection();
|
|
|
|
String jsonRecord = null;
|
2020-06-15 12:10:59 +02:00
|
|
|
List<String> results = new ArrayList<>();
|
2020-06-11 18:20:42 +02:00
|
|
|
try {
|
2020-07-24 00:32:49 +02:00
|
|
|
// log.debug("Started at: "+Calendar.getInstance().getTime().toString());
|
2020-06-11 18:20:42 +02:00
|
|
|
TupleQuery selectQuery = connection.prepareTupleQuery(QueryLanguage.SPARQL, query);
|
|
|
|
TupleQueryResult selectQueryResult = selectQuery.evaluate();
|
|
|
|
int counter = 0;
|
|
|
|
while (selectQueryResult.hasNext()) {
|
|
|
|
BindingSet recordSet = selectQueryResult.next();
|
|
|
|
org.eclipse.rdf4j.model.Value recordIdValue = recordSet.getValue("recordId");
|
2020-06-15 12:10:59 +02:00
|
|
|
results.add(recordIdValue.stringValue());
|
2020-06-11 18:20:42 +02:00
|
|
|
counter++;
|
|
|
|
}
|
2020-06-15 12:10:59 +02:00
|
|
|
log.debug("Total records retrieved: "+counter);
|
2020-07-24 00:32:49 +02:00
|
|
|
// log.debug("Finished at: "+Calendar.getInstance().getTime().toString());
|
2020-06-11 18:20:42 +02:00
|
|
|
} catch(Exception e){
|
|
|
|
e.printStackTrace();
|
|
|
|
} finally{
|
|
|
|
closeConnection();
|
|
|
|
}
|
2020-06-15 12:10:59 +02:00
|
|
|
return results;
|
2020-06-11 18:20:42 +02:00
|
|
|
}
|
2020-01-28 16:53:59 +01:00
|
|
|
}
|