all records related to an api are published into the same graph
This commit is contained in:
parent
f12eb7eb52
commit
226cdde77d
|
@ -8,15 +8,10 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.eclipse.rdf4j.RDF4JException;
|
import org.eclipse.rdf4j.RDF4JException;
|
||||||
import org.eclipse.rdf4j.model.IRI;
|
import org.eclipse.rdf4j.model.IRI;
|
||||||
import org.eclipse.rdf4j.model.Literal;
|
|
||||||
import org.eclipse.rdf4j.model.Statement;
|
import org.eclipse.rdf4j.model.Statement;
|
||||||
import org.eclipse.rdf4j.model.ValueFactory;
|
import org.eclipse.rdf4j.model.ValueFactory;
|
||||||
import org.eclipse.rdf4j.query.QueryLanguage;
|
|
||||||
import org.eclipse.rdf4j.query.TupleQuery;
|
|
||||||
import org.eclipse.rdf4j.query.TupleQueryResult;
|
|
||||||
import org.eclipse.rdf4j.repository.Repository;
|
import org.eclipse.rdf4j.repository.Repository;
|
||||||
import org.eclipse.rdf4j.repository.RepositoryConnection;
|
import org.eclipse.rdf4j.repository.RepositoryConnection;
|
||||||
import org.eclipse.rdf4j.repository.RepositoryResult;
|
|
||||||
import org.eclipse.rdf4j.repository.manager.RemoteRepositoryManager;
|
import org.eclipse.rdf4j.repository.manager.RemoteRepositoryManager;
|
||||||
import org.eclipse.rdf4j.repository.util.Repositories;
|
import org.eclipse.rdf4j.repository.util.Repositories;
|
||||||
import org.eclipse.rdf4j.rio.RDFFormat;
|
import org.eclipse.rdf4j.rio.RDFFormat;
|
||||||
|
@ -65,130 +60,27 @@ public class GraphDBClient {
|
||||||
RemoteRepositoryManager manager = new RemoteRepositoryManager(this.graphDBServerUrl);
|
RemoteRepositoryManager manager = new RemoteRepositoryManager(this.graphDBServerUrl);
|
||||||
manager.init();
|
manager.init();
|
||||||
manager.setUsernameAndPassword(getWriterUser(), getWriterPwd());
|
manager.setUsernameAndPassword(getWriterUser(), getWriterPwd());
|
||||||
log.debug("get manager for GraphDB Repository " + getRepository());
|
log.debug("manager init");
|
||||||
Repository repository = manager.getRepository(getRepository());
|
Repository repository = manager.getRepository(getRepository());
|
||||||
ValueFactory factory = repository.getValueFactory();
|
ValueFactory factory = repository.getValueFactory();
|
||||||
String datasourceApi = recordParserHelper.getDatasourceApi(record);
|
String datasourceApi = recordParserHelper.getDatasourceApi(record);
|
||||||
IRI graph = factory.createIRI(datasourceApi);
|
IRI graph = factory.createIRI(getGraphDBBaseURI(), datasourceApi);
|
||||||
IRI rApi = factory.createIRI(getGraphDBBaseURI(), datasourceApi);
|
|
||||||
log.debug("query current num partitions for graph " + graph);
|
|
||||||
String graphName = graph.toString();
|
|
||||||
boolean hasPartition = false;
|
|
||||||
int numPartitions = 0;
|
|
||||||
try (RepositoryConnection con = repository.getConnection()) {
|
|
||||||
TupleQuery tupleQuery = con.prepareTupleQuery(QueryLanguage.SPARQL, "select ?num_partitions\n" +
|
|
||||||
"where {\n" +
|
|
||||||
" graph <https://ariadne-infrastructure.eu/datasourceApis> {\n" +
|
|
||||||
" <" + rApi + "> <http://www.d-net.research-infrastructures.eu/provenance/num_partitions> ?num_partitions\n" +
|
|
||||||
" }\n" +
|
|
||||||
"}");
|
|
||||||
TupleQueryResult tupleQueryResult = tupleQuery.evaluate();
|
|
||||||
if (tupleQueryResult.hasNext()) {
|
|
||||||
hasPartition = true;
|
|
||||||
numPartitions = Integer.parseInt(tupleQueryResult.next().getValue("num_partitions").stringValue());
|
|
||||||
log.debug("numPartitions: "+ numPartitions);
|
|
||||||
}
|
|
||||||
con.close();
|
|
||||||
}
|
|
||||||
catch (RDF4JException e) {
|
|
||||||
log.error("error counting partitions ...", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!hasPartition) {
|
|
||||||
graphName = graph.toString().concat("_partition_").concat("1");
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
if (numPartitions==0) {
|
|
||||||
log.debug("partition not already created, default numPartitions set to 1");
|
|
||||||
numPartitions+=1;
|
|
||||||
}
|
|
||||||
graphName = graph.toString().concat("_partition_").concat(Integer.toString(numPartitions));
|
|
||||||
}
|
|
||||||
|
|
||||||
log.debug("query current records count on graph " + graphName);
|
|
||||||
int currentRecordsCount = 0;
|
|
||||||
try (RepositoryConnection con = repository.getConnection()) {
|
|
||||||
TupleQuery tupleQuery = con.prepareTupleQuery(QueryLanguage.SPARQL, "select (COUNT(?has_identifier)/2 AS ?count)\n" +
|
|
||||||
"where {\n" +
|
|
||||||
" graph <" + graphName + "> {\n" +
|
|
||||||
" ?x <https://www.ariadne-infrastructure.eu/resource/ao/cat/1.1/has_identifier> ?has_identifier\n" +
|
|
||||||
" }\n" +
|
|
||||||
"}");
|
|
||||||
TupleQueryResult tupleQueryResult = tupleQuery.evaluate();
|
|
||||||
if (tupleQueryResult.hasNext()) {
|
|
||||||
currentRecordsCount = Integer.parseInt(tupleQueryResult.next().getValue("count").stringValue());
|
|
||||||
log.debug("currentRecordsCount: "+ currentRecordsCount);
|
|
||||||
}
|
|
||||||
con.close();
|
|
||||||
}
|
|
||||||
catch (RDF4JException e) {
|
|
||||||
log.error("error counting records ...", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
int origNumPartitions = numPartitions;
|
|
||||||
boolean numRecordsThresholdReached = false;
|
|
||||||
if (currentRecordsCount >= NUM_RECORDS_THRESHOLD) {
|
|
||||||
numRecordsThresholdReached = true;
|
|
||||||
numPartitions+=1;
|
|
||||||
graphName = graph.toString().concat("_partition_").concat(Integer.toString(numPartitions));
|
|
||||||
log.debug("threshold reached graphname is: " + graphName);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
log.debug("threshold not reached graphname is: " + graphName);
|
|
||||||
}
|
|
||||||
|
|
||||||
try (RepositoryConnection con = repository.getConnection()) {
|
try (RepositoryConnection con = repository.getConnection()) {
|
||||||
|
log.debug("connection established");
|
||||||
con.begin();
|
con.begin();
|
||||||
String recordURI = getRecordURI(objIdentifier, datasourceApi);
|
String recordURI = getRecordURI(objIdentifier, datasourceApi);
|
||||||
log.debug("Adding record " + recordURI + " into graph " + graphName);
|
log.debug("Trying to adding record with recordURI " + recordURI + " into graph " + graph);
|
||||||
con.add(IOUtils.toInputStream(getRDFBlock(record), "UTF-8"), recordURI, RDFFormat.RDFXML, factory.createIRI(graphName));
|
con.add(IOUtils.toInputStream(getRDFBlock(record), "UTF-8"), recordURI, RDFFormat.RDFXML, graph);
|
||||||
con.commit();
|
con.commit();
|
||||||
log.debug("statement added");
|
log.debug("statement added");
|
||||||
con.close();
|
con.close();
|
||||||
}
|
|
||||||
catch (RDF4JException e) {
|
|
||||||
log.error("error adding statement ...", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (numRecordsThresholdReached) {
|
|
||||||
log.debug("updating current numPartitionsStmt");
|
|
||||||
Statement numPartitionsStmt = null;
|
|
||||||
try (RepositoryConnection con = repository.getConnection()) {
|
|
||||||
IRI pred = factory.createIRI("http://www.d-net.research-infrastructures.eu/provenance/num_partitions");
|
|
||||||
Literal curValue = factory.createLiteral(origNumPartitions);
|
|
||||||
IRI datasourceApisGraph = factory.createIRI("https://ariadne-infrastructure.eu/datasourceApis");
|
|
||||||
RepositoryResult<Statement> numPartitionsStmts = con.getStatements(rApi, pred, curValue, false, datasourceApisGraph);
|
|
||||||
if (numPartitionsStmts.hasNext()) {
|
|
||||||
con.begin();
|
|
||||||
numPartitionsStmt = numPartitionsStmts.next();
|
|
||||||
log.debug("current numPartitionsStmt retrieved " + numPartitionsStmt.toString() + " inside " + datasourceApisGraph.toString());
|
|
||||||
con.remove(numPartitionsStmt, datasourceApisGraph);
|
|
||||||
log.debug("current numPartitionsStmt removed");
|
|
||||||
Statement numPartitionStmtUpdated = factory.createStatement(rApi, pred, factory.createLiteral(numPartitions));
|
|
||||||
con.add(numPartitionStmtUpdated, datasourceApisGraph);
|
|
||||||
log.debug("numPartitionsStmt updated");
|
|
||||||
con.commit();
|
|
||||||
con.close();
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
con.begin();
|
|
||||||
Statement numPartitionStmtUpdated = factory.createStatement(rApi, pred, factory.createLiteral(numPartitions++));
|
|
||||||
con.add(numPartitionStmtUpdated, datasourceApisGraph);
|
|
||||||
log.debug("numPartitionsStmt updated");
|
|
||||||
con.commit();
|
|
||||||
con.close();
|
|
||||||
}
|
|
||||||
if (con.isActive()) {
|
|
||||||
con.close();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
catch (RDF4JException e) {
|
catch (RDF4JException e) {
|
||||||
log.error("error updating num partition statement ", e);
|
log.error("error adding statement ...", e);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
repository.shutDown();
|
repository.shutDown();
|
||||||
manager.shutDown();
|
manager.shutDown();
|
||||||
|
log.debug("manager shutDown");
|
||||||
return 1;
|
return 1;
|
||||||
}catch(Throwable e){
|
}catch(Throwable e){
|
||||||
log.error(e);
|
log.error(e);
|
||||||
|
@ -196,7 +88,6 @@ public class GraphDBClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public long feedProvenance(final String datasource, final String datasourceApi) throws AriadnePlusPublisherException {
|
public long feedProvenance(final String datasource, final String datasourceApi) throws AriadnePlusPublisherException {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -239,7 +130,7 @@ public class GraphDBClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public long dropDatasourceApisPartitionInfo(final String datasourceApi) throws AriadnePlusPublisherException {
|
public long dropDatasourceApiGraph(final String datasourceApi) throws AriadnePlusPublisherException {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
log.debug("init connection to graphDBServerUrl " + this.graphDBServerUrl);
|
log.debug("init connection to graphDBServerUrl " + this.graphDBServerUrl);
|
||||||
|
@ -251,29 +142,10 @@ public class GraphDBClient {
|
||||||
throw new AriadnePlusPublisherException("GraphDB repository not found");
|
throw new AriadnePlusPublisherException("GraphDB repository not found");
|
||||||
}
|
}
|
||||||
ValueFactory factory = repository.getValueFactory();
|
ValueFactory factory = repository.getValueFactory();
|
||||||
IRI HAS_NUM_PARTITIONS = factory.createIRI("http://www.d-net.research-infrastructures.eu/provenance/num_partitions");
|
|
||||||
IRI rApi = factory.createIRI(getGraphDBBaseURI(), datasourceApi);
|
IRI rApi = factory.createIRI(getGraphDBBaseURI(), datasourceApi);
|
||||||
IRI datasourceApisGraph = factory.createIRI(getGraphDBBaseURI(), "datasourceApis");
|
|
||||||
Statement numPartitionsStmt = null;
|
|
||||||
try (RepositoryConnection con = repository.getConnection()) {
|
try (RepositoryConnection con = repository.getConnection()) {
|
||||||
int numPartitions = 0;
|
log.debug("removing namedGraph: " + rApi);
|
||||||
log.debug("Removing datasourceApi partition info s:" + rApi.toString() + " p:" + HAS_NUM_PARTITIONS + " g:" + datasourceApisGraph );
|
Repositories.consume(repository, conn -> conn.clear(rApi));
|
||||||
RepositoryResult<Statement> numPartitionsStmts = con.getStatements(rApi, HAS_NUM_PARTITIONS, null, false, datasourceApisGraph);
|
|
||||||
if (numPartitionsStmts.hasNext()) {
|
|
||||||
con.begin();
|
|
||||||
numPartitionsStmt = numPartitionsStmts.next();
|
|
||||||
numPartitions = Integer.parseInt(numPartitionsStmt.getObject().stringValue());
|
|
||||||
log.debug(" old partitions count: " + numPartitions);
|
|
||||||
con.remove(rApi, HAS_NUM_PARTITIONS, factory.createLiteral(numPartitions), datasourceApisGraph);
|
|
||||||
con.commit();
|
|
||||||
con.close();
|
|
||||||
}
|
|
||||||
for (int numPartition=1; numPartition<=numPartitions; numPartition++) {
|
|
||||||
String namedGraph = String.format("api_________::ariadne_plus::ads::1_partition_%d", numPartition);
|
|
||||||
IRI graph = factory.createIRI(namedGraph);
|
|
||||||
log.debug("removing namedGraph: " + graph);
|
|
||||||
Repositories.consume(repository, conn -> conn.clear(graph));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
catch (RDF4JException e) {
|
catch (RDF4JException e) {
|
||||||
log.error("error removing datasourceApi partition info ", e);
|
log.error("error removing datasourceApi partition info ", e);
|
||||||
|
@ -289,10 +161,6 @@ public class GraphDBClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//CLEAR GRAPH <https://ariadne-infrastructure.eu/datasourceApis>
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private String getRecordURI(final String objIdentifier, final String datasourceApi) {
|
private String getRecordURI(final String objIdentifier, final String datasourceApi) {
|
||||||
return "/" + datasourceApi + "/" + objIdentifier;
|
return "/" + datasourceApi + "/" + objIdentifier;
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,12 +46,12 @@ public class AriadnePlusPublisherController {
|
||||||
getAriadnePlusPublisherHelper().feedProvenance(datasource, datasourceApi, getTarget(ariadneplusTarget));
|
getAriadnePlusPublisherHelper().feedProvenance(datasource, datasourceApi, getTarget(ariadneplusTarget));
|
||||||
}
|
}
|
||||||
|
|
||||||
@RequestMapping(value = "/dropDatasourceApisPartitionInfo", method = RequestMethod.POST)
|
@RequestMapping(value = "/dropDatasourceApiGraph", method = RequestMethod.POST)
|
||||||
public void dropDatasourceApisPartitionInfo(@RequestParam final String datasourceApi, @RequestParam(required = false) String ariadneplusTarget) throws AriadnePlusPublisherException {
|
public void dropDatasourceApisPartitionInfo(@RequestParam final String datasourceApi, @RequestParam(required = false) String ariadneplusTarget) throws AriadnePlusPublisherException {
|
||||||
if (ariadneplusTarget==null) {
|
if (ariadneplusTarget==null) {
|
||||||
ariadneplusTarget = DEFAULT_TARGET_ENDPOINT;
|
ariadneplusTarget = DEFAULT_TARGET_ENDPOINT;
|
||||||
}
|
}
|
||||||
getAriadnePlusPublisherHelper().dropDatasourceApisPartitionInfo(datasourceApi, getTarget(ariadneplusTarget));
|
getAriadnePlusPublisherHelper().dropDatasourceApiGraph(datasourceApi, getTarget(ariadneplusTarget));
|
||||||
}
|
}
|
||||||
|
|
||||||
@RequestMapping(value = "/unpublish", method = RequestMethod.GET)
|
@RequestMapping(value = "/unpublish", method = RequestMethod.GET)
|
||||||
|
|
|
@ -45,10 +45,10 @@ public class AriadnePlusPublisherHelper {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void dropDatasourceApisPartitionInfo(final String datasourceApi, final AriadnePlusTargets target) throws AriadnePlusPublisherException {
|
public void dropDatasourceApiGraph(final String datasourceApi, final AriadnePlusTargets target) throws AriadnePlusPublisherException {
|
||||||
switch(target){
|
switch(target){
|
||||||
case GRAPHDB:
|
case GRAPHDB:
|
||||||
dropDatasourceApisPartitionInfo(datasourceApi);
|
dropDatasourceApiGraph(datasourceApi);
|
||||||
break;
|
break;
|
||||||
default: throw new AriadnePlusPublisherException("Target "+target+" not supported yet");
|
default: throw new AriadnePlusPublisherException("Target "+target+" not supported yet");
|
||||||
}
|
}
|
||||||
|
@ -78,10 +78,10 @@ public class AriadnePlusPublisherHelper {
|
||||||
graphDBClient.feedProvenance(datasource, datasourceApi);
|
graphDBClient.feedProvenance(datasource, datasourceApi);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void dropDatasourceApisPartitionInfo(final String datasourceApi) throws AriadnePlusPublisherException {
|
private void dropDatasourceApiGraph(final String datasourceApi) throws AriadnePlusPublisherException {
|
||||||
log.debug("Drop DatasourceApis Partition Info " + datasourceApi);
|
log.debug("Drop DatasourceApis Partition Info " + datasourceApi);
|
||||||
GraphDBClient graphDBClient = this.graphdbClientFactory.getGraphDBClient();
|
GraphDBClient graphDBClient = this.graphdbClientFactory.getGraphDBClient();
|
||||||
graphDBClient.dropDatasourceApisPartitionInfo(datasourceApi);
|
graphDBClient.dropDatasourceApiGraph(datasourceApi);
|
||||||
}
|
}
|
||||||
|
|
||||||
private long unpublishGraphDB(final String datasourceInterface) {
|
private long unpublishGraphDB(final String datasourceInterface) {
|
||||||
|
|
|
@ -223,7 +223,7 @@ public class PublishGraphDBJobNode extends AsyncJobNode {
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getDropDatasourceApisPartitionInfoEndpoint() {
|
private String getDropDatasourceApisPartitionInfoEndpoint() {
|
||||||
return publisherEndpoint.concat("/dropDatasourceApisPartitionInfo");
|
return publisherEndpoint.concat("/dropDatasourceApiGraph");
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setPublisherEndpoint(final String publisherEndpoint) {
|
public void setPublisherEndpoint(final String publisherEndpoint) {
|
||||||
|
|
Loading…
Reference in New Issue