From 226cdde77d3da35b78cafd2c3b657321c79c7fdc Mon Sep 17 00:00:00 2001
From: Enrico Ottonello
Date: Wed, 19 Feb 2020 14:33:54 +0100
Subject: [PATCH] all records related to an api are published into the same graph

---
 .../ariadneplus/graphdb/GraphDBClient.java    | 152 ++----------------
 .../AriadnePlusPublisherController.java       |   4 +-
 .../publisher/AriadnePlusPublisherHelper.java |   8 +-
 .../nodes/PublishGraphDBJobNode.java          |   2 +-
 4 files changed, 17 insertions(+), 149 deletions(-)

diff --git a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/graphdb/GraphDBClient.java b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/graphdb/GraphDBClient.java
index 16a83ba..d3487ce 100644
--- a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/graphdb/GraphDBClient.java
+++ b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/graphdb/GraphDBClient.java
@@ -8,15 +8,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.eclipse.rdf4j.RDF4JException;
 import org.eclipse.rdf4j.model.IRI;
-import org.eclipse.rdf4j.model.Literal;
 import org.eclipse.rdf4j.model.Statement;
 import org.eclipse.rdf4j.model.ValueFactory;
-import org.eclipse.rdf4j.query.QueryLanguage;
-import org.eclipse.rdf4j.query.TupleQuery;
-import org.eclipse.rdf4j.query.TupleQueryResult;
 import org.eclipse.rdf4j.repository.Repository;
 import org.eclipse.rdf4j.repository.RepositoryConnection;
-import org.eclipse.rdf4j.repository.RepositoryResult;
 import org.eclipse.rdf4j.repository.manager.RemoteRepositoryManager;
 import org.eclipse.rdf4j.repository.util.Repositories;
 import org.eclipse.rdf4j.rio.RDFFormat;
@@ -65,130 +60,27 @@ public class GraphDBClient {
             RemoteRepositoryManager manager = new RemoteRepositoryManager(this.graphDBServerUrl);
             manager.init();
             manager.setUsernameAndPassword(getWriterUser(), getWriterPwd());
-            log.debug("get manager for GraphDB Repository " + getRepository());
+            log.debug("manager init");
             Repository repository = manager.getRepository(getRepository());
             ValueFactory factory = repository.getValueFactory();
             String datasourceApi = recordParserHelper.getDatasourceApi(record);
-            IRI graph = factory.createIRI(datasourceApi);
-            IRI rApi = factory.createIRI(getGraphDBBaseURI(), datasourceApi);
-            log.debug("query current num partitions for graph " + graph);
-            String graphName = graph.toString();
-            boolean hasPartition = false;
-            int numPartitions = 0;
-            try (RepositoryConnection con = repository.getConnection()) {
-                TupleQuery tupleQuery = con.prepareTupleQuery(QueryLanguage.SPARQL, "select ?num_partitions\n" +
-                        "where {\n" +
-                        "    graph <https://ariadne-infrastructure.eu/datasourceApis> {\n" +
-                        "        <" + rApi + "> <http://www.d-net.research-infrastructures.eu/provenance/num_partitions> ?num_partitions\n" +
-                        "    }\n" +
-                        "}");
-                TupleQueryResult tupleQueryResult = tupleQuery.evaluate();
-                if (tupleQueryResult.hasNext()) {
-                    hasPartition = true;
-                    numPartitions = Integer.parseInt(tupleQueryResult.next().getValue("num_partitions").stringValue());
-                    log.debug("numPartitions: "+ numPartitions);
-                }
-                con.close();
-            }
-            catch (RDF4JException e) {
-                log.error("error counting partitions ...", e);
-            }
-
-            if (!hasPartition) {
-                graphName = graph.toString().concat("_partition_").concat("1");
-            }
-            else {
-                if (numPartitions==0) {
-                    log.debug("partition not already created, default numPartitions set to 1");
-                    numPartitions+=1;
-                }
-                graphName = graph.toString().concat("_partition_").concat(Integer.toString(numPartitions));
-            }
-
-            log.debug("query current records count on graph " + graphName);
-            int currentRecordsCount = 0;
-            try (RepositoryConnection con = repository.getConnection()) {
-                TupleQuery tupleQuery = con.prepareTupleQuery(QueryLanguage.SPARQL, "select (COUNT(?has_identifier)/2 AS ?count)\n" +
-                        "where {\n" +
-                        "    graph <" + graphName + "> {\n" +
-                        "        ?x ?has_identifier\n" +
-                        "    }\n" +
-                        "}");
-                TupleQueryResult tupleQueryResult = tupleQuery.evaluate();
-                if (tupleQueryResult.hasNext()) {
-                    currentRecordsCount = Integer.parseInt(tupleQueryResult.next().getValue("count").stringValue());
-                    log.debug("currentRecordsCount: "+ currentRecordsCount);
-                }
-                con.close();
-            }
-            catch (RDF4JException e) {
-                log.error("error counting records ...", e);
-            }
-
-            int origNumPartitions = numPartitions;
-            boolean numRecordsThresholdReached = false;
-            if (currentRecordsCount >= NUM_RECORDS_THRESHOLD) {
-                numRecordsThresholdReached = true;
-                numPartitions+=1;
-                graphName = graph.toString().concat("_partition_").concat(Integer.toString(numPartitions));
-                log.debug("threshold reached graphname is: " + graphName);
-            }
-            else {
-                log.debug("threshold not reached graphname is: " + graphName);
-            }
-
+            IRI graph = factory.createIRI(getGraphDBBaseURI(), datasourceApi);
             try (RepositoryConnection con = repository.getConnection()) {
+                log.debug("connection established");
                 con.begin();
                 String recordURI = getRecordURI(objIdentifier, datasourceApi);
-                log.debug("Adding record " + recordURI + " into graph " + graphName);
-                con.add(IOUtils.toInputStream(getRDFBlock(record), "UTF-8"), recordURI, RDFFormat.RDFXML, factory.createIRI(graphName));
+                log.debug("Trying to adding record with recordURI " + recordURI + " into graph " + graph);
+                con.add(IOUtils.toInputStream(getRDFBlock(record), "UTF-8"), recordURI, RDFFormat.RDFXML, graph);
                 con.commit();
                 log.debug("statement added");
                 con.close();
-            }
-            catch (RDF4JException e) {
-                log.error("error adding statement ...", e);
-            }
-
-            if (numRecordsThresholdReached) {
-                log.debug("updating current numPartitionsStmt");
-                Statement numPartitionsStmt = null;
-                try (RepositoryConnection con = repository.getConnection()) {
-                    IRI pred = factory.createIRI("http://www.d-net.research-infrastructures.eu/provenance/num_partitions");
-                    Literal curValue = factory.createLiteral(origNumPartitions);
-                    IRI datasourceApisGraph = factory.createIRI("https://ariadne-infrastructure.eu/datasourceApis");
-                    RepositoryResult<Statement> numPartitionsStmts = con.getStatements(rApi, pred, curValue, false, datasourceApisGraph);
-                    if (numPartitionsStmts.hasNext()) {
-                        con.begin();
-                        numPartitionsStmt = numPartitionsStmts.next();
-                        log.debug("current numPartitionsStmt retrieved " + numPartitionsStmt.toString() + " inside " + datasourceApisGraph.toString());
-                        con.remove(numPartitionsStmt, datasourceApisGraph);
-                        log.debug("current numPartitionsStmt removed");
-                        Statement numPartitionStmtUpdated = factory.createStatement(rApi, pred, factory.createLiteral(numPartitions));
-                        con.add(numPartitionStmtUpdated, datasourceApisGraph);
-                        log.debug("numPartitionsStmt updated");
-                        con.commit();
-                        con.close();
-                    }
-                    else {
-                        con.begin();
-                        Statement numPartitionStmtUpdated = factory.createStatement(rApi, pred, factory.createLiteral(numPartitions++));
-                        con.add(numPartitionStmtUpdated, datasourceApisGraph);
-                        log.debug("numPartitionsStmt updated");
-                        con.commit();
-                        con.close();
-                    }
-                    if (con.isActive()) {
-                        con.close();
-                    }
             }
             catch (RDF4JException e) {
-                log.error("error updating num partition statement ", e);
+                log.error("error adding statement ...", e);
             }
-            }
-
             repository.shutDown();
             manager.shutDown();
+            log.debug("manager shutDown");
             return 1;
         }catch(Throwable e){
             log.error(e);
@@ -196,7 +88,6 @@ public class GraphDBClient {
         }
     }
 
 
-
     public long feedProvenance(final String datasource, final String datasourceApi) throws AriadnePlusPublisherException {
         try {
@@ -239,7 +130,7 @@ public class GraphDBClient {
         }
     }
 
 
-    public long dropDatasourceApisPartitionInfo(final String datasourceApi) throws AriadnePlusPublisherException {
+    public long dropDatasourceApiGraph(final String datasourceApi) throws AriadnePlusPublisherException {
         try {
             log.debug("init connection to graphDBServerUrl " + this.graphDBServerUrl);
@@ -251,29 +142,10 @@ public class GraphDBClient {
                 throw new AriadnePlusPublisherException("GraphDB repository not found");
             }
             ValueFactory factory = repository.getValueFactory();
-            IRI HAS_NUM_PARTITIONS = factory.createIRI("http://www.d-net.research-infrastructures.eu/provenance/num_partitions");
             IRI rApi = factory.createIRI(getGraphDBBaseURI(), datasourceApi);
-            IRI datasourceApisGraph = factory.createIRI(getGraphDBBaseURI(), "datasourceApis");
-            Statement numPartitionsStmt = null;
             try (RepositoryConnection con = repository.getConnection()) {
-                int numPartitions = 0;
-                log.debug("Removing datasourceApi partition info s:" + rApi.toString() + " p:" + HAS_NUM_PARTITIONS + " g:" + datasourceApisGraph );
-                RepositoryResult<Statement> numPartitionsStmts = con.getStatements(rApi, HAS_NUM_PARTITIONS, null, false, datasourceApisGraph);
-                if (numPartitionsStmts.hasNext()) {
-                    con.begin();
-                    numPartitionsStmt = numPartitionsStmts.next();
-                    numPartitions = Integer.parseInt(numPartitionsStmt.getObject().stringValue());
-                    log.debug(" old partitions count: " + numPartitions);
-                    con.remove(rApi, HAS_NUM_PARTITIONS, factory.createLiteral(numPartitions), datasourceApisGraph);
-                    con.commit();
-                    con.close();
-                }
-                for (int numPartition=1; numPartition<=numPartitions; numPartition++) {
-                    String namedGraph = String.format("api_________::ariadne_plus::ads::1_partition_%d", numPartition);
-                    IRI graph = factory.createIRI(namedGraph);
-                    log.debug("removing namedGraph: " + graph);
-                    Repositories.consume(repository, conn -> conn.clear(graph));
-                }
+                log.debug("removing namedGraph: " + rApi);
+                Repositories.consume(repository, conn -> conn.clear(rApi));
             }
             catch (RDF4JException e) {
                 log.error("error removing datasourceApi partition info ", e);
@@ -289,10 +161,6 @@ public class GraphDBClient {
         }
     }
 
-    //CLEAR GRAPH
-
-
-
     private String getRecordURI(final String objIdentifier, final String datasourceApi) {
         return "/" + datasourceApi + "/" + objIdentifier;
     }
diff --git a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/publisher/AriadnePlusPublisherController.java b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/publisher/AriadnePlusPublisherController.java
index 283e862..9018c06 100644
--- a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/publisher/AriadnePlusPublisherController.java
+++ b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/publisher/AriadnePlusPublisherController.java
@@ -46,12 +46,12 @@ public class AriadnePlusPublisherController {
         getAriadnePlusPublisherHelper().feedProvenance(datasource, datasourceApi, getTarget(ariadneplusTarget));
     }
 
-    @RequestMapping(value = "/dropDatasourceApisPartitionInfo", method = RequestMethod.POST)
+    @RequestMapping(value = "/dropDatasourceApiGraph", method = RequestMethod.POST)
     public void dropDatasourceApisPartitionInfo(@RequestParam final String datasourceApi, @RequestParam(required = false) String ariadneplusTarget) throws AriadnePlusPublisherException {
         if (ariadneplusTarget==null) {
             ariadneplusTarget = DEFAULT_TARGET_ENDPOINT;
         }
-        getAriadnePlusPublisherHelper().dropDatasourceApisPartitionInfo(datasourceApi, getTarget(ariadneplusTarget));
+        getAriadnePlusPublisherHelper().dropDatasourceApiGraph(datasourceApi, getTarget(ariadneplusTarget));
     }
 
     @RequestMapping(value = "/unpublish", method = RequestMethod.GET)
diff --git a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/publisher/AriadnePlusPublisherHelper.java b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/publisher/AriadnePlusPublisherHelper.java
index 3f71692..ec242a9 100644
--- a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/publisher/AriadnePlusPublisherHelper.java
+++ b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/publisher/AriadnePlusPublisherHelper.java
@@ -45,10 +45,10 @@ public class AriadnePlusPublisherHelper {
     }
 
 
-    public void dropDatasourceApisPartitionInfo(final String datasourceApi, final AriadnePlusTargets target) throws AriadnePlusPublisherException {
+    public void dropDatasourceApiGraph(final String datasourceApi, final AriadnePlusTargets target) throws AriadnePlusPublisherException {
         switch(target){
             case GRAPHDB:
-                dropDatasourceApisPartitionInfo(datasourceApi);
+                dropDatasourceApiGraph(datasourceApi);
                 break;
             default: throw new AriadnePlusPublisherException("Target "+target+" not supported yet");
         }
@@ -78,10 +78,10 @@ public class AriadnePlusPublisherHelper {
         graphDBClient.feedProvenance(datasource, datasourceApi);
     }
 
-    private void dropDatasourceApisPartitionInfo(final String datasourceApi) throws AriadnePlusPublisherException {
+    private void dropDatasourceApiGraph(final String datasourceApi) throws AriadnePlusPublisherException {
         log.debug("Drop DatasourceApis Partition Info " + datasourceApi);
         GraphDBClient graphDBClient = this.graphdbClientFactory.getGraphDBClient();
-        graphDBClient.dropDatasourceApisPartitionInfo(datasourceApi);
+        graphDBClient.dropDatasourceApiGraph(datasourceApi);
     }
 
     private long unpublishGraphDB(final String datasourceInterface) {
diff --git a/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/PublishGraphDBJobNode.java b/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/PublishGraphDBJobNode.java
index 7904184..fce2722 100644
--- a/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/PublishGraphDBJobNode.java
+++ b/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/PublishGraphDBJobNode.java
@@ -223,7 +223,7 @@ public class PublishGraphDBJobNode extends AsyncJobNode {
     }
 
     private String getDropDatasourceApisPartitionInfoEndpoint() {
-        return publisherEndpoint.concat("/dropDatasourceApisPartitionInfo");
+        return publisherEndpoint.concat("/dropDatasourceApiGraph");
     }
 
     public void setPublisherEndpoint(final String publisherEndpoint) {
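
With this change each datasource API maps to exactly one named graph, so clearing an API's data is a single clear of that graph through the renamed /dropDatasourceApiGraph endpoint; in the workflow, PublishGraphDBJobNode builds the target URL via getDropDatasourceApisPartitionInfoEndpoint(). Below is a minimal sketch of calling the endpoint directly with the JDK HTTP client; the publisher base URL and the datasourceApi value are illustrative assumptions, not values defined by this patch.

import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class DropDatasourceApiGraphCall {
    public static void main(String[] args) throws Exception {
        // Assumed publisher base URL; in the workflow this comes from publisherEndpoint.
        String publisherEndpoint = "http://localhost:8080/ariadneplus/publish";
        // Assumed datasourceApi identifier, used here only for illustration.
        String datasourceApi = "api_________::ariadne_plus::ads::1";

        // POST /dropDatasourceApiGraph?datasourceApi=... ; the optional ariadneplusTarget
        // parameter is omitted, so the controller falls back to DEFAULT_TARGET_ENDPOINT.
        URI uri = URI.create(publisherEndpoint + "/dropDatasourceApiGraph?datasourceApi="
                + URLEncoder.encode(datasourceApi, StandardCharsets.UTF_8));
        HttpRequest request = HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();

        // Send the request and report the HTTP status returned by the publisher.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("HTTP " + response.statusCode());
    }
}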