package eu.dnetlib.ariadneplus.reader; import eu.dnetlib.ariadneplus.elasticsearch.BulkUpload; import eu.dnetlib.ariadneplus.reader.json.ParseRDFJSON; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.eclipse.rdf4j.model.Model; import org.eclipse.rdf4j.query.*; import org.eclipse.rdf4j.repository.Repository; import org.eclipse.rdf4j.repository.RepositoryConnection; import org.eclipse.rdf4j.repository.manager.RemoteRepositoryManager; import org.eclipse.rdf4j.rio.RDFFormat; import org.eclipse.rdf4j.rio.RDFWriter; import org.eclipse.rdf4j.rio.Rio; import org.springframework.stereotype.Service; import java.io.StringWriter; import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; import java.util.List; @Service public class RunSPARQLQueryService { private static final Log log = LogFactory.getLog(RunSPARQLQueryService.class); private RepositoryConnection connection; private RemoteRepositoryManager manager; private Repository repository; private ParseRDFJSON parser; private ResourceManager resourceManager; private BulkUpload bulkUpload; private static String username = null; private static String pwd = null; private static String graphDBUrl = null; private static String graphDBRepository = null; public void setupConnection(String username, String pwd, String graphDbUrl, String graphDbRepository) { setUsername(username); setPwd(pwd); setGraphDBUrl(graphDbUrl); setGraphDBRepository(graphDbRepository); } private void openConnection(){ manager = new RemoteRepositoryManager(getGraphDBUrl()); manager.init(); manager.setUsernameAndPassword(getUsername(), getPwd()); repository = manager.getRepository(getGraphDBRepository()); connection = repository.getConnection(); } private void closeConnection(){ connection.close(); repository.shutDown(); manager.shutDown(); } public String executeMultipleQueryGraph(String queryTemplate, List recordIds, String datasource, String collectionId, boolean isCollection){ if (queryTemplate==null) return null; final String selectQueryTemplate = queryTemplate.replaceAll("%datasource", datasource).replaceAll("%collectionId", collectionId); log.info("Start indexing "+ recordIds.size()+ " records ..."); final List errorCodesCount = Arrays.asList(new Integer(0)); final List successCodesCount = Arrays.asList(new Integer(0)); final List counter = Arrays.asList(new Integer(0)); recordIds.forEach(recordId -> { log.info(recordId+" >"); int operationResult = executeQueryGraph(selectQueryTemplate, recordId, isCollection); log.info(" "+operationResult); if (operationResult!=200) { log.error(recordId + " error_code: "+ Integer.toString(operationResult)); int currentErrorsCount = errorCodesCount.get(0).intValue(); currentErrorsCount+=1; errorCodesCount.set(0, new Integer(currentErrorsCount)); } else { int currentSuccessCount = successCodesCount.get(0).intValue(); currentSuccessCount+=1; successCodesCount.set(0, new Integer(currentSuccessCount)); } int counterValue = counter.get(0).intValue(); String curReport = null; if ((counterValue % 1000) == 0) { curReport = "Current analyzed records: "+counterValue+" Current indexed records: "+ successCodesCount.get(0).intValue() + " , " + "Current errors: "+ errorCodesCount.get(0).intValue(); log.info(curReport); } counterValue+=1; counter.set(0, new Integer(counterValue)); }); String report = "Total indexed records: "+ successCodesCount.get(0).intValue() + " , " + "Total errors: "+ errorCodesCount.get(0).intValue(); log.info(report); return report; } private int executeQueryGraph(String selectQueryTemplate, String recordId, boolean isCollection){ // decrease queries execution rate to avoid heap overload on graphdb try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } // log.info("Retrieving "+recordId+" - isCollection:"+isCollection ); String query = selectQueryTemplate.replaceAll("%record", "<"+recordId+">"); openConnection(); StringWriter recordWriter = null; Model resultsModel = null; try { // log.debug("Started at: "+Calendar.getInstance().getTime().toString()); GraphQuery graphQuery = connection.prepareGraphQuery(QueryLanguage.SPARQL, query); GraphQueryResult graphQueryResult = graphQuery.evaluate(); resultsModel = QueryResults.asModel(graphQueryResult); graphQueryResult.close(); // log.debug("Finished at: "+Calendar.getInstance().getTime().toString()); // log.info("Statements retrieved: " + resultsModel.size()); if (resultsModel.size()==0) { return -2; } recordWriter = new StringWriter(); RDFWriter rdfRecordWriter = Rio.createWriter(RDFFormat.RDFJSON, recordWriter); Rio.write(resultsModel, rdfRecordWriter); parser.setCollection(isCollection); String bufferedRecord = recordWriter.toString(); int size = parser.parse(bufferedRecord); // log.debug("json elements: "+size); if (size==-1) { return -4; } resourceManager.manage(parser); return bulkUpload.index(resourceManager, isCollection); } catch(Exception e){ log.error(e); return -1; } finally{ closeConnection(); if (resultsModel!=null) { resultsModel.clear(); } } } public ParseRDFJSON getParser() { return parser; } public void setParser(ParseRDFJSON parser) { this.parser = parser; } public ResourceManager getResourceManager() { return resourceManager; } public void setResourceManager(ResourceManager resourceManager) { this.resourceManager = resourceManager; } public BulkUpload getBulkUpload() { return bulkUpload; } public void setBulkUpload(BulkUpload bulkUpload) { this.bulkUpload = bulkUpload; } public static String getUsername() { return username; } public static String getPwd() { return pwd; } public static String getGraphDBUrl() { return graphDBUrl; } public static String getGraphDBRepository() { return graphDBRepository; } public static void setUsername(String username) { RunSPARQLQueryService.username = username; } public static void setPwd(String pwd) { RunSPARQLQueryService.pwd = pwd; } public static void setGraphDBUrl(String graphDBUrl) { RunSPARQLQueryService.graphDBUrl = graphDBUrl; } public static void setGraphDBRepository(String graphDBRepository) { RunSPARQLQueryService.graphDBRepository = graphDBRepository; } public List selectRecordIds(String datasource, String collectionId){ log.debug("Retrieving record Ids from GraphDB ..."); String queryTemplate = "PREFIX rdf: \n" + "select * \n" + "from \n" + "where { \n" + "\t?recordId rdf:type .\n" + "} \n"; String query = queryTemplate.replaceAll("%datasource", datasource).replaceAll("%collectionId", collectionId); return executeSelect(query); } public List selectCollectionId(String datasource, String collectionId){ log.debug("Retrieving collection Id from GraphDB ..."); String queryTemplate = "PREFIX rdf: \n" + "select * \n" + "from \n" + "where { \n" + "\t?recordId rdf:type .\n" + "} \n"; String query = queryTemplate.replaceAll("%datasource", datasource).replaceAll("%collectionId", collectionId); return executeSelect(query); } private List executeSelect(String query){ openConnection(); String jsonRecord = null; List results = new ArrayList<>(); try { // log.debug("Started at: "+Calendar.getInstance().getTime().toString()); TupleQuery selectQuery = connection.prepareTupleQuery(QueryLanguage.SPARQL, query); TupleQueryResult selectQueryResult = selectQuery.evaluate(); int counter = 0; while (selectQueryResult.hasNext()) { BindingSet recordSet = selectQueryResult.next(); org.eclipse.rdf4j.model.Value recordIdValue = recordSet.getValue("recordId"); results.add(recordIdValue.stringValue()); counter++; } log.debug("Total records retrieved: "+counter); // log.debug("Finished at: "+Calendar.getInstance().getTime().toString()); } catch(Exception e){ e.printStackTrace(); } finally{ closeConnection(); } return results; } }