package eu.dnetlib.ariadneplus.reader;

import eu.dnetlib.ariadneplus.elasticsearch.BulkUpload;
import eu.dnetlib.ariadneplus.reader.json.ParseRDFJSON;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.rdf4j.model.Model;
import org.eclipse.rdf4j.query.*;
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.manager.RemoteRepositoryManager;
import org.eclipse.rdf4j.rio.RDFFormat;
import org.eclipse.rdf4j.rio.RDFWriter;
import org.eclipse.rdf4j.rio.Rio;
import org.springframework.stereotype.Service;

import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;

/**
 * Runs SPARQL queries against a remote GraphDB repository: graph (CONSTRUCT/DESCRIBE)
 * queries whose per-record results are serialized to RDF/JSON and pushed through the
 * {@link ParseRDFJSON} / {@link ResourceManager} / {@link BulkUpload} pipeline into
 * Elasticsearch, and SELECT queries used to enumerate record/collection ids.
 *
 * <p>NOTE(review): endpoint credentials and coordinates live in static mutable fields
 * exposed via static accessors. For a Spring {@code @Service} singleton that is fragile
 * under concurrent reconfiguration; the static API is preserved for backward
 * compatibility — confirm no concurrent callers rely on per-instance settings.
 */
@Service
public class RunSPARQLQueryService {

	private static final Log log = LogFactory.getLog(RunSPARQLQueryService.class);

	// Connection objects are recreated per query by openConnection()/closeConnection().
	private RepositoryConnection connection;
	private RemoteRepositoryManager manager;
	private Repository repository;

	// Collaborators of the indexing pipeline, injected via setters.
	private ParseRDFJSON parser;
	private ResourceManager resourceManager;
	private BulkUpload bulkUpload;

	private static String username = null;
	private static String pwd = null;
	private static String graphDBUrl = null;
	private static String graphDBRepository = null;

	/**
	 * Stores the GraphDB endpoint coordinates used by all subsequent queries.
	 * No connection is opened here; see {@link #openConnection()}.
	 *
	 * @param username          GraphDB user name
	 * @param pwd               GraphDB password
	 * @param graphDbUrl        GraphDB server URL
	 * @param graphDbRepository repository id on that server
	 */
	public void setupConnection(String username, String pwd, String graphDbUrl, String graphDbRepository) {
		setUsername(username);
		setPwd(pwd);
		setGraphDBUrl(graphDbUrl);
		setGraphDBRepository(graphDbRepository);
	}

	/** Opens a fresh manager/repository/connection triple towards GraphDB. */
	private void openConnection() {
		manager = new RemoteRepositoryManager(getGraphDBUrl());
		manager.init();
		manager.setUsernameAndPassword(getUsername(), getPwd());
		repository = manager.getRepository(getGraphDBRepository());
		connection = repository.getConnection();
	}

	/**
	 * Releases connection, repository and manager. Null-safe so it can run from a
	 * finally block even when {@link #openConnection()} failed part-way through.
	 */
	private void closeConnection() {
		if (connection != null) {
			connection.close();
			connection = null;
		}
		if (repository != null) {
			repository.shutDown();
			repository = null;
		}
		if (manager != null) {
			manager.shutDown();
			manager = null;
		}
	}

	/**
	 * Runs the record-level graph query for each record id and indexes the result.
	 * Failures on individual records are logged (via the per-record return code)
	 * and do not stop the iteration.
	 *
	 * @param queryTemplate SPARQL template containing the %datasource, %collectionId
	 *                      and %record placeholders
	 * @param recordIds     record URIs to process
	 * @param datasource    value substituted for %datasource
	 * @param collectionId  value substituted for %collectionId
	 * @param isCollection  true when the records are collection-level records
	 * @return "ok", or null when queryTemplate is null
	 */
	public String executeMultipleQueryGraph(String queryTemplate, List<String> recordIds, String datasource, String collectionId, boolean isCollection) {
		if (queryTemplate == null) {
			return null;
		}
		// String.replace, not replaceAll: the placeholders need no regex, and
		// replaceAll would misinterpret '$' or '\' in the substituted values.
		final String selectQueryTemplate = queryTemplate
				.replace("%datasource", datasource)
				.replace("%collectionId", collectionId);
		recordIds.forEach(recordId -> {
			int operationResult = executeQueryGraph(selectQueryTemplate, recordId, isCollection);
			log.info("Indexing final code: " + operationResult);
		});
		return "ok";
	}

	/**
	 * Retrieves one record's RDF graph, serializes it to RDF/JSON and indexes it.
	 *
	 * @param selectQueryTemplate query with only the %record placeholder left
	 * @param recordId            record URI, wrapped in angle brackets on substitution
	 * @param isCollection        whether to parse/index in collection mode
	 * @return the {@link BulkUpload#index} result code, -2 when the query returned
	 *         no statements, -1 on any error (including interruption)
	 */
	private int executeQueryGraph(String selectQueryTemplate, String recordId, boolean isCollection) {
		// Small pause between queries to avoid heap overload on GraphDB.
		try {
			Thread.sleep(100);
		} catch (InterruptedException e) {
			// Restore the interrupt flag instead of swallowing it, and bail out.
			Thread.currentThread().interrupt();
			log.warn("Interrupted while throttling GraphDB queries", e);
			return -1;
		}
		log.info("Retrieving " + recordId + " - isCollection:" + isCollection);
		String query = selectQueryTemplate.replace("%record", "<" + recordId + ">");
		Model resultsModel = null;
		try {
			// Inside the try so a connect failure yields -1 instead of aborting the
			// whole record iteration; closeConnection() is null-safe for this case.
			openConnection();
			GraphQuery graphQuery = connection.prepareGraphQuery(QueryLanguage.SPARQL, query);
			// try-with-resources: the cursor is closed even when materializing fails.
			try (GraphQueryResult graphQueryResult = graphQuery.evaluate()) {
				resultsModel = QueryResults.asModel(graphQueryResult);
			}
			log.info("Statements retrieved: " + resultsModel.size());
			if (resultsModel.isEmpty()) {
				return -2;
			}
			StringWriter recordWriter = new StringWriter();
			RDFWriter rdfRecordWriter = Rio.createWriter(RDFFormat.RDFJSON, recordWriter);
			Rio.write(resultsModel, rdfRecordWriter);
			// NOTE(review): the flag is only ever set to true, never reset, so a
			// collection run leaves the shared parser in collection mode for later
			// record runs — confirm the parser is reconfigured elsewhere.
			if (isCollection) {
				parser.setCollection(true);
			}
			parser.parse(recordWriter.toString());
			resourceManager.manage(parser);
			return bulkUpload.index(resourceManager, isCollection);
		} catch (Exception e) {
			log.error("Error indexing record " + recordId, e);
			return -1;
		} finally {
			closeConnection();
			// Free the statement collection eagerly; models can be large.
			if (resultsModel != null) {
				resultsModel.clear();
			}
		}
	}

	public ParseRDFJSON getParser() {
		return parser;
	}

	public void setParser(ParseRDFJSON parser) {
		this.parser = parser;
	}

	public ResourceManager getResourceManager() {
		return resourceManager;
	}

	public void setResourceManager(ResourceManager resourceManager) {
		this.resourceManager = resourceManager;
	}

	public BulkUpload getBulkUpload() {
		return bulkUpload;
	}

	public void setBulkUpload(BulkUpload bulkUpload) {
		this.bulkUpload = bulkUpload;
	}

	public static String getUsername() {
		return username;
	}

	public static String getPwd() {
		return pwd;
	}

	public static String getGraphDBUrl() {
		return graphDBUrl;
	}

	public static String getGraphDBRepository() {
		return graphDBRepository;
	}

	public static void setUsername(String username) {
		RunSPARQLQueryService.username = username;
	}

	public static void setPwd(String pwd) {
		RunSPARQLQueryService.pwd = pwd;
	}

	public static void setGraphDBUrl(String graphDBUrl) {
		RunSPARQLQueryService.graphDBUrl = graphDBUrl;
	}

	public static void setGraphDBRepository(String graphDBRepository) {
		RunSPARQLQueryService.graphDBRepository = graphDBRepository;
	}

	/**
	 * Retrieves from GraphDB the ids of all records of the given datasource/collection.
	 *
	 * <p>NOTE(review): the SPARQL template below appears truncated — the IRIs after
	 * "PREFIX rdf:", "from" and "rdf:type" are missing, and the %datasource /
	 * %collectionId placeholders it substitutes do not occur in it. The literal is
	 * reproduced unchanged here; restore the original template from VCS history.
	 *
	 * @return the record ids found (empty on error)
	 */
	public List<String> selectRecordIds(String datasource, String collectionId) {
		log.debug("Retrieving record Ids from GraphDB ...");
		String queryTemplate = "PREFIX rdf: \n" +
				"select * \n" +
				"from \n" +
				"where { \n" +
				"\t?recordId rdf:type .\n" +
				"} \n";
		String query = queryTemplate.replace("%datasource", datasource).replace("%collectionId", collectionId);
		return executeSelect(query);
	}

	/**
	 * Retrieves from GraphDB the collection id for the given datasource/collection.
	 *
	 * <p>NOTE(review): as in {@link #selectRecordIds}, the template appears truncated
	 * and contains none of the substituted placeholders; literal kept unchanged.
	 *
	 * @return the matching ids (empty on error)
	 */
	public List<String> selectCollectionId(String datasource, String collectionId) {
		log.debug("Retrieving collection Id from GraphDB ...");
		String queryTemplate = "PREFIX rdf: \n" +
				"select * \n" +
				"from \n" +
				"where { \n" +
				"\t?recordId rdf:type .\n" +
				"} \n";
		String query = queryTemplate.replace("%datasource", datasource).replace("%collectionId", collectionId);
		return executeSelect(query);
	}

	/**
	 * Runs a SELECT query and collects the string value of the {@code ?recordId}
	 * binding of every result row.
	 *
	 * @param query the complete SPARQL SELECT query
	 * @return the collected values; empty on error (errors are logged, not thrown)
	 */
	private List<String> executeSelect(String query) {
		List<String> results = new ArrayList<>();
		try {
			openConnection();
			log.debug("Started at: " + Calendar.getInstance().getTime());
			TupleQuery selectQuery = connection.prepareTupleQuery(QueryLanguage.SPARQL, query);
			int counter = 0;
			// try-with-resources: the original leaked the result cursor.
			try (TupleQueryResult selectQueryResult = selectQuery.evaluate()) {
				while (selectQueryResult.hasNext()) {
					BindingSet recordSet = selectQueryResult.next();
					org.eclipse.rdf4j.model.Value recordIdValue = recordSet.getValue("recordId");
					results.add(recordIdValue.stringValue());
					counter++;
				}
			}
			log.debug("Total records retrieved: " + counter);
			log.debug("Finished at: " + Calendar.getInstance().getTime());
		} catch (Exception e) {
			log.error("Error executing select query", e);
		} finally {
			closeConnection();
		}
		return results;
	}
}