package eu.dnetlib.ariadneplus.graphdb;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.*;

import com.google.common.base.Splitter;
import eu.dnetlib.ariadneplus.elasticsearch.BulkUpload;
import eu.dnetlib.ariadneplus.reader.ResourceManager;
import eu.dnetlib.ariadneplus.reader.RunSPARQLQueryService;
import eu.dnetlib.ariadneplus.reader.json.ParseRDFJSON;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.rdf4j.RDF4JException;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.query.*;
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.manager.RemoteRepositoryManager;
import org.eclipse.rdf4j.repository.util.Repositories;
import org.eclipse.rdf4j.rio.RDFFormat;
import eu.dnetlib.ariadneplus.publisher.AriadnePlusPublisherException;
import eu.dnetlib.ariadneplus.rdf.RecordParserHelper;
import net.sf.saxon.s9api.SaxonApiException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;

/**
 * Client for a remote GraphDB server: feeds RDF/XML records and provenance
 * statements into per-datasource named graphs, drops named graphs, executes
 * SPARQL updates, and drives Elasticsearch indexing through
 * {@link RunSPARQLQueryService}.
 *
 * <p>NOTE(review): each public operation creates and shuts down its own
 * {@link RemoteRepositoryManager}; the shutDown calls are not in finally
 * blocks, so a failure inside a try leaves them to the catch path — kept
 * as-is to preserve the original control flow.
 *
 * @author enrico.ottonello
 */
@Component
public class GraphDBClient {

    private static final Log log = LogFactory.getLog(GraphDBClient.class);

    /** Namespace used for provenance predicates (isApiOf, insertedInDate). */
    public static final String PROVENANCE_NS = "http://www.d-net.research-infrastructures.eu/provenance/";

    @Autowired
    private RunSPARQLQueryService runSPQRLQuery;
    @Autowired
    private ParseRDFJSON parseRDFJSON;
    @Autowired
    private ResourceManager resourceManager;
    @Autowired
    private BulkUpload bulkUpload;

    private RecordParserHelper recordParserHelper;
    // GraphDB connection settings, injected via setup(...)
    private String graphDBServerUrl;
    private String graphDBBaseURI;
    private String writerUser;
    private String writerPwd;
    private String repository;

    /**
     * Configures the client; called by the owning factory/configuration.
     *
     * @param recordParserHelper helper that extracts identifiers and the rdf:RDF block from records
     * @param graphDBServerUrl   base URL of the GraphDB server
     * @param graphDBBaseURI     base URI used to mint graph/resource IRIs
     * @param writerUser         GraphDB user with write permission
     * @param writerPwd          password for {@code writerUser}
     * @param repository         GraphDB repository id
     */
    protected void setup(final RecordParserHelper recordParserHelper,
            final String graphDBServerUrl, final String graphDBBaseURI,
            final String writerUser, final String writerPwd, final String repository) {
        this.recordParserHelper = recordParserHelper;
        this.graphDBServerUrl = graphDBServerUrl;
        this.graphDBBaseURI = graphDBBaseURI;
        this.writerUser = writerUser;
        this.writerPwd = writerPwd;
        this.repository = repository;
    }

    /**
     * Inserts the rdf:RDF block of one record into the named graph of its
     * datasource api.
     *
     * @param record XML record carrying an objIdentifier, a datasource api and an rdf:RDF block
     * @return 1 when the record was handed to GraphDB, 0 when it had no objIdentifier and was skipped
     * @throws AriadnePlusPublisherException wrapping any unexpected failure
     */
    public long feed(final String record) throws AriadnePlusPublisherException {
        try {
            String objIdentifier = recordParserHelper.getObjIdentifier(record);
            if (StringUtils.isBlank(objIdentifier)) {
                log.warn("Got record with no objIdentifier -- skipping");
                return 0;
            }
            RemoteRepositoryManager manager = new RemoteRepositoryManager(this.graphDBServerUrl);
            manager.init();
            manager.setUsernameAndPassword(getWriterUser(), getWriterPwd());
            Repository repository = manager.getRepository(getRepository());
            ValueFactory factory = repository.getValueFactory();
            String datasourceApi = recordParserHelper.getDatasourceApi(record);
            IRI graph = factory.createIRI(getGraphDBBaseURI(), datasourceApi);
            try (RepositoryConnection con = repository.getConnection()) {
                con.begin();
                String recordURI = getRecordURI(objIdentifier, datasourceApi);
                con.add(IOUtils.toInputStream(getRDFBlock(record), "UTF-8"), recordURI, RDFFormat.RDFXML, graph);
                con.commit();
            } catch (RDF4JException e) {
                // insertion failure is logged but the method still reports success (original behavior)
                log.error("error adding statement ...", e);
            }
            repository.shutDown();
            manager.shutDown();
            return 1;
        } catch (Throwable e) {
            log.error(e);
            throw new AriadnePlusPublisherException(e);
        }
    }

    /**
     * Records provenance for a datasource api in the "datasourceApis" graph:
     * an isApiOf statement and a (refreshed) insertedInDate timestamp.
     *
     * @param datasource    datasource identifier (literal object of isApiOf)
     * @param datasourceApi datasource api, used to mint the subject IRI
     * @return 200 on success
     * @throws AriadnePlusPublisherException wrapping any failure
     */
    public long feedProvenance(final String datasource, final String datasourceApi) throws AriadnePlusPublisherException {
        try {
            RemoteRepositoryManager manager = new RemoteRepositoryManager(this.graphDBServerUrl);
            manager.init();
            manager.setUsernameAndPassword(getWriterUser(), getWriterPwd());
            Repository repository = manager.getRepository(getRepository());
            ValueFactory factory = repository.getValueFactory();
            IRI IS_API_OF = factory.createIRI(PROVENANCE_NS, "isApiOf");
            IRI INSERTED_IN_DATE = factory.createIRI(PROVENANCE_NS, "insertedInDate");
            IRI rApi = factory.createIRI(getGraphDBBaseURI(), datasourceApi);
            Statement stmApi = factory.createStatement(rApi, IS_API_OF, factory.createLiteral(datasource));
            LocalDateTime now = LocalDateTime.now();
            Statement stmInsertedDate = factory.createStatement(rApi, INSERTED_IN_DATE, factory.createLiteral(now.toString()));
            IRI datasourceApisGraph = factory.createIRI(getGraphDBBaseURI(), "datasourceApis");
            try (RepositoryConnection con = repository.getConnection()) {
                con.begin();
                // drop any previous timestamp so only the latest insertedInDate survives
                con.remove(rApi, INSERTED_IN_DATE, null, datasourceApisGraph);
                con.add(stmApi, datasourceApisGraph);
                con.add(stmInsertedDate, datasourceApisGraph);
                con.commit();
                // fix: removed redundant explicit con.close() — try-with-resources already closes
            } catch (RDF4JException e) {
                log.error("error adding statement ...", e);
                throw new AriadnePlusPublisherException(e);
            }
            repository.shutDown();
            manager.shutDown();
            return 200;
        } catch (Throwable e) {
            log.error(e);
            throw new AriadnePlusPublisherException(e);
        }
    }

    /**
     * Clears the named graph associated with a datasource api.
     *
     * @param datasourceApi datasource api whose graph IRI is cleared
     * @return 200 on success
     * @throws AriadnePlusPublisherException if the repository is missing or the clear fails
     */
    public long dropDatasourceApiGraph(final String datasourceApi) throws AriadnePlusPublisherException {
        try {
            RemoteRepositoryManager manager = new RemoteRepositoryManager(this.graphDBServerUrl);
            manager.init();
            manager.setUsernameAndPassword(getWriterUser(), getWriterPwd());
            Repository repository = manager.getRepository(getRepository());
            if (repository == null) {
                throw new AriadnePlusPublisherException("GraphDB repository not found");
            }
            ValueFactory factory = repository.getValueFactory();
            IRI rApi = factory.createIRI(getGraphDBBaseURI(), datasourceApi);
            try {
                log.debug("removing namedGraph: " + rApi);
                // fix: dropped the unused try-with-resources connection — Repositories.consume
                // opens and manages its own connection internally
                Repositories.consume(repository, conn -> conn.clear(rApi));
            } catch (RDF4JException e) {
                log.error("error removing datasourceApi partition info ", e);
                throw new AriadnePlusPublisherException(e);
            }
            repository.shutDown();
            manager.shutDown();
            return 200;
        } catch (Throwable e) {
            log.error("error removing datasourceApi partition info ", e);
            throw new AriadnePlusPublisherException(e);
        }
    }

    /** Builds the base URI under which a record's statements are loaded. */
    private String getRecordURI(final String objIdentifier, final String datasourceApi) {
        return "/" + datasourceApi + "/" + objIdentifier;
    }

    public RecordParserHelper getRecordParserHelper() {
        return recordParserHelper;
    }

    public void setRecordParserHelper(final RecordParserHelper recordParserHelper) {
        this.recordParserHelper = recordParserHelper;
    }

    // NOTE(review): despite its name this overwrites the server URL, not the
    // base URI — kept as-is because external callers may rely on it.
    public void setDefaultBaseURI(final String defaultBaseURI) {
        this.graphDBServerUrl = defaultBaseURI;
    }

    /**
     * Extracts the rdf:RDF block from a record.
     *
     * @param record the full XML record
     * @return the rdf:RDF fragment, or "" when the record is blank or lacks an objIdentifier
     * @throws SaxonApiException if XML processing fails
     */
    public String getRDFBlock(final String record) throws SaxonApiException {
        recordParserHelper.init();
        try {
            if (StringUtils.isBlank(record)) {
                log.warn("Got empty record");
                return "";
            }
            String objIdentifier = recordParserHelper.getObjIdentifier(record);
            if (StringUtils.isBlank(objIdentifier)) {
                log.warn("Got record with no objIdentifier -- skipping");
                return "";
            }
            String rdfBlock = recordParserHelper.getRDF(record);
            if (StringUtils.isBlank(rdfBlock)) {
                log.warn("Missing rdf:RDF in record with objIdentifier " + objIdentifier);
            }
            return rdfBlock;
        } catch (Throwable e) {
            log.error(e);
            throw e;
        }
    }

    public String getGraphDBBaseURI() {
        return graphDBBaseURI;
    }

    public void setGraphDBBaseURI(String graphDBBaseURI) {
        this.graphDBBaseURI = graphDBBaseURI;
    }

    public String getWriterUser() {
        return writerUser;
    }

    public void setWriterUser(String writerUser) {
        this.writerUser = writerUser;
    }

    public String getWriterPwd() {
        return writerPwd;
    }

    public void setWriterPwd(String writerPwd) {
        this.writerPwd = writerPwd;
    }

    public String getRepository() {
        return repository;
    }

    public void setRepository(String repository) {
        this.repository = repository;
    }

    /**
     * Executes a (possibly compound) SPARQL update, one ';'-separated
     * statement per transaction.
     *
     * @param queryValue one or more SPARQL updates separated by ';'
     * @return an empty string on success
     * @throws AriadnePlusPublisherException if any statement cannot be prepared or executed
     */
    public String updateSparql(final String queryValue) throws AriadnePlusPublisherException {
        try {
            String result = "";
            RemoteRepositoryManager manager = new RemoteRepositoryManager(this.graphDBServerUrl);
            manager.init();
            manager.setUsernameAndPassword(getWriterUser(), getWriterPwd());
            Repository repository = manager.getRepository(getRepository());
            // NOTE: One query with multiple updates separated by ; is ok with GraphDB EE.
            // Using the free edition, we have to divide them in separate INSERT,
            // otherwise we cannot see the triples inserted by the previous updates.
            // see https://stackoverflow.com/questions/54428161/graphdb-read-check-and-update-in-a-transaction
            try (RepositoryConnection con = repository.getConnection()) {
                int countQueries = 0;
                int countSuccess = 0;
                for (String query : Splitter.on(";").split(queryValue)) {
                    countQueries++;
                    con.begin();
                    // fix: prepare the individual split statement, not the whole
                    // queryValue — the original re-executed the full input on every
                    // iteration, defeating the per-statement split described above
                    Update updateResultQuery = con.prepareUpdate(query);
                    if (updateResultQuery != null) {
                        updateResultQuery.execute();
                    } else {
                        throw new AriadnePlusPublisherException(String.format("Cannot generate Update statement from %s", query));
                    }
                    log.debug(String.format("Query %d executed: %s", countQueries, query));
                    con.commit();
                    countSuccess++;
                    log.debug(String.format("Query %d committed", countQueries));
                }
                log.info(String.format("Queries committed with success %d/%d", countSuccess, countQueries));
            } catch (RDF4JException e) {
                log.error("error executing query ...", e);
                throw new AriadnePlusPublisherException(e);
            }
            repository.shutDown();
            manager.shutDown();
            return result;
        } catch (Throwable e) {
            log.error(e);
            throw new AriadnePlusPublisherException(e);
        }
    }

    /**
     * Loads Turtle data from a URL into a named graph.
     *
     * @param dataUrl URL of a Turtle document
     * @param context local name of the target named graph (resolved against the base URI)
     * @return a short report message on success
     * @throws AriadnePlusPublisherException wrapping any failure
     */
    public String feedFromURL(final String dataUrl, final String context) throws AriadnePlusPublisherException {
        try {
            String result = "";
            RemoteRepositoryManager manager = new RemoteRepositoryManager(this.graphDBServerUrl);
            manager.init();
            manager.setUsernameAndPassword(getWriterUser(), getWriterPwd());
            Repository repository = manager.getRepository(getRepository());
            ValueFactory factory = repository.getValueFactory();
            try (RepositoryConnection con = repository.getConnection()) {
                con.begin();
                IRI contextIRI = factory.createIRI(getGraphDBBaseURI(), context);
                con.add(new URL(dataUrl), null, RDFFormat.TURTLE, contextIRI);
                // fix: String.concat returns a new string — the original discarded
                // it and always returned "", so the report message was lost
                result = result.concat("data added from url: " + dataUrl + " into graph " + context);
                con.commit();
                log.debug("add data from Url executed");
            } catch (RDF4JException e) {
                log.error("error executing query ...", e);
            }
            repository.shutDown();
            manager.shutDown();
            return result;
        } catch (Throwable e) {
            log.error(e);
            throw new AriadnePlusPublisherException(e);
        }
    }

    public RunSPARQLQueryService getRunSPQRLQuery() {
        return runSPQRLQuery;
    }

    public void setRunSPQRLQuery(RunSPARQLQueryService runSPQRLQuery) {
        this.runSPQRLQuery = runSPQRLQuery;
    }

    /**
     * Indexes on Elasticsearch all collections and records of a datasource
     * collection, reading them back from GraphDB via SPARQL templates.
     *
     * @param datasource   datasource identifier
     * @param collectionId collection identifier
     * @return a combined report for records and collections
     * @throws AriadnePlusPublisherException wrapping any failure
     */
    public String indexOnES(String datasource, String collectionId) throws AriadnePlusPublisherException {
        String recordsIndexReport = "";
        String collectionIndexReport = "";
        try {
            log.info("Start indexing from " + datasource + " " + collectionId);
            runSPQRLQuery.setupConnection(getWriterUser(), getWriterPwd(), this.graphDBServerUrl, getRepository());
            runSPQRLQuery.setParser(parseRDFJSON);
            runSPQRLQuery.setResourceManager(resourceManager);
            runSPQRLQuery.setBulkUpload(bulkUpload);
            List<String> collectionResourceIds = runSPQRLQuery.selectCollectionIds(datasource, collectionId);
            log.info(String.format("Found %d collections to index for datasource %s - %s", collectionResourceIds.size(), datasource, collectionId));
            try {
                if (!collectionResourceIds.isEmpty()) {
                    final ClassPathResource selectCollectionTemplateRes = new ClassPathResource("eu/dnetlib/ariadneplus/sparql/read_collection_data_template.sparql");
                    String selectCollectionTemplate = IOUtils.toString(selectCollectionTemplateRes.getInputStream(), StandardCharsets.UTF_8.name());
                    collectionIndexReport = runSPQRLQuery.executeMultipleQueryGraph(selectCollectionTemplate, collectionResourceIds, datasource, collectionId, true);
                }
            } catch (RuntimeException re) {
                throw new AriadnePlusPublisherException(re);
            }
            List<String> recordIds = runSPQRLQuery.selectRecordIds(datasource, collectionId);
            log.info(String.format("Found %d individual resources to index for datasource %s - %s", recordIds.size(), datasource, collectionId));
            if (!recordIds.isEmpty()) {
                final ClassPathResource queryTemplateResource = new ClassPathResource("eu/dnetlib/ariadneplus/sparql/read_record_data_template.sparql");
                String queryTemplate = IOUtils.toString(queryTemplateResource.getInputStream(), StandardCharsets.UTF_8.name());
                recordsIndexReport = runSPQRLQuery.executeMultipleQueryGraph(queryTemplate, recordIds, datasource, collectionId, false);
            }
        } catch (Throwable e) {
            log.error(e);
            throw new AriadnePlusPublisherException(e);
        }
        return "Records: ".concat(recordsIndexReport).concat(" Collection: ").concat(collectionIndexReport);
    }

    /**
     * Selects the GraphDB identifiers of either collections or records for a
     * datasource collection.
     *
     * @param datasource   datasource identifier
     * @param collectionId collection identifier
     * @param resourceType "COLLECTION" or "RECORD"; any other value yields an empty list
     * @return the matching identifiers (possibly empty)
     * @throws AriadnePlusPublisherException wrapping any failure
     */
    public List<String> selectIdentifiers(String datasource, String collectionId, String resourceType) throws AriadnePlusPublisherException {
        List<String> identifiers = Collections.emptyList();
        try {
            log.info("Select " + resourceType + " Identifiers from " + datasource + " " + collectionId);
            runSPQRLQuery.setupConnection(getWriterUser(), getWriterPwd(), this.graphDBServerUrl, getRepository());
            if (resourceType.equals("COLLECTION")) {
                identifiers = runSPQRLQuery.selectCollectionIds(datasource, collectionId);
                log.info(String.format("Found %d collections for datasource %s - %s", identifiers.size(), datasource, collectionId));
            } else if (resourceType.equals("RECORD")) {
                identifiers = runSPQRLQuery.selectRecordIds(datasource, collectionId);
                log.info(String.format("Found %d records for datasource %s - %s", identifiers.size(), datasource, collectionId));
            }
        } catch (Throwable e) {
            log.error(e);
            throw new AriadnePlusPublisherException(e);
        }
        return identifiers;
    }

    /**
     * Indexes on Elasticsearch a single collection or record, identified
     * explicitly rather than discovered via SPARQL.
     *
     * @param datasource   datasource identifier
     * @param collectionId collection identifier
     * @param resourceType "COLLECTION" or "RECORD"; any other value indexes nothing
     * @param identifier   the resource identifier to index
     * @return a report string; "empty identifier" when the identifier is blank
     * @throws AriadnePlusPublisherException wrapping runtime or I/O failures
     */
    public String indexOnESByIdentifier(String datasource, String collectionId, String resourceType, String identifier) throws AriadnePlusPublisherException {
        String report = "";
        try {
            if (StringUtils.isBlank(identifier)) {
                return "empty identifier";
            }
            List<String> identifiers = Arrays.asList(identifier);
            log.info("Start indexing from " + datasource + " " + collectionId);
            runSPQRLQuery.setupConnection(getWriterUser(), getWriterPwd(), this.graphDBServerUrl, getRepository());
            runSPQRLQuery.setParser(parseRDFJSON);
            runSPQRLQuery.setResourceManager(resourceManager);
            runSPQRLQuery.setBulkUpload(bulkUpload);
            if (resourceType.equals("COLLECTION")) {
                if (!identifiers.isEmpty()) {
                    final ClassPathResource selectCollectionTemplateRes = new ClassPathResource("eu/dnetlib/ariadneplus/sparql/read_collection_data_template.sparql");
                    String selectCollectionTemplate = IOUtils.toString(selectCollectionTemplateRes.getInputStream(), StandardCharsets.UTF_8.name());
                    report = runSPQRLQuery.executeMultipleQueryGraph(selectCollectionTemplate, identifiers, datasource, collectionId, true);
                }
            } else if (resourceType.equals("RECORD")) {
                if (!identifiers.isEmpty()) {
                    final ClassPathResource queryTemplateResource = new ClassPathResource("eu/dnetlib/ariadneplus/sparql/read_record_data_template.sparql");
                    String queryTemplate = IOUtils.toString(queryTemplateResource.getInputStream(), StandardCharsets.UTF_8.name());
                    report = runSPQRLQuery.executeMultipleQueryGraph(queryTemplate, identifiers, datasource, collectionId, false);
                }
            }
        } catch (RuntimeException | IOException re) {
            throw new AriadnePlusPublisherException(re);
        }
        return "Resources: ".concat(report);
    }
}