424 lines
15 KiB
Java
424 lines
15 KiB
Java
package eu.dnetlib.ariadneplus.graphdb;
|
|
|
|
import java.time.LocalDateTime;
|
|
|
|
import org.apache.commons.io.IOUtils;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.eclipse.rdf4j.RDF4JException;
|
|
import org.eclipse.rdf4j.model.IRI;
|
|
import org.eclipse.rdf4j.model.Literal;
|
|
import org.eclipse.rdf4j.model.Statement;
|
|
import org.eclipse.rdf4j.model.ValueFactory;
|
|
import org.eclipse.rdf4j.query.QueryLanguage;
|
|
import org.eclipse.rdf4j.query.TupleQuery;
|
|
import org.eclipse.rdf4j.query.TupleQueryResult;
|
|
import org.eclipse.rdf4j.repository.Repository;
|
|
import org.eclipse.rdf4j.repository.RepositoryConnection;
|
|
import org.eclipse.rdf4j.repository.RepositoryResult;
|
|
import org.eclipse.rdf4j.repository.manager.RemoteRepositoryManager;
|
|
import org.eclipse.rdf4j.repository.util.Repositories;
|
|
import org.eclipse.rdf4j.rio.RDFFormat;
|
|
|
|
import eu.dnetlib.ariadneplus.publisher.AriadnePlusPublisherException;
|
|
import eu.dnetlib.ariadneplus.rdf.RecordParserHelper;
|
|
import net.sf.saxon.s9api.SaxonApiException;
|
|
|
|
/**
|
|
* @author enrico.ottonello
|
|
*
|
|
*/
|
|
|
|
public class GraphDBClient {
|
|
|
|
private static final Log log = LogFactory.getLog(GraphDBClient.class);
|
|
|
|
public static final String PROVENANCE_NS = "http://www.d-net.research-infrastructures.eu/provenance/";
|
|
public static final int NUM_RECORDS_THRESHOLD = 10;
|
|
|
|
private RecordParserHelper recordParserHelper;
|
|
private String graphDBServerUrl;
|
|
private String graphDBBaseURI;
|
|
private String writerUser;
|
|
private String writerPwd;
|
|
private String repository;
|
|
|
|
protected GraphDBClient(final RecordParserHelper recordParserHelper,
|
|
final String graphDBServerUrl, final String graphDBBaseURI, final String writerUser, final String writerPwd, final String repository) {
|
|
this.recordParserHelper = recordParserHelper;
|
|
this.graphDBServerUrl = graphDBServerUrl;
|
|
this.graphDBBaseURI = graphDBBaseURI;
|
|
this.writerUser = writerUser;
|
|
this.writerPwd = writerPwd;
|
|
this.repository = repository;
|
|
}
|
|
|
|
public long feed(final String record) throws AriadnePlusPublisherException{
|
|
try {
|
|
String objIdentifier = recordParserHelper.getObjIdentifier(record);
|
|
if (StringUtils.isBlank(objIdentifier)) {
|
|
log.warn("Got record with no objIdentifier -- skipping");
|
|
return 0;
|
|
}
|
|
log.debug("init connection to graphDBServerUrl " + this.graphDBServerUrl);
|
|
RemoteRepositoryManager manager = new RemoteRepositoryManager(this.graphDBServerUrl);
|
|
manager.init();
|
|
manager.setUsernameAndPassword(getWriterUser(), getWriterPwd());
|
|
log.debug("get manager for GraphDB Repository " + getRepository());
|
|
Repository repository = manager.getRepository(getRepository());
|
|
ValueFactory factory = repository.getValueFactory();
|
|
String datasourceApi = recordParserHelper.getDatasourceApi(record);
|
|
IRI graph = factory.createIRI(datasourceApi);
|
|
IRI rApi = factory.createIRI(getGraphDBBaseURI(), datasourceApi);
|
|
log.debug("query current num partitions for graph " + graph);
|
|
String graphName = graph.toString();
|
|
boolean hasPartition = false;
|
|
int numPartitions = 0;
|
|
try (RepositoryConnection con = repository.getConnection()) {
|
|
TupleQuery tupleQuery = con.prepareTupleQuery(QueryLanguage.SPARQL, "select ?num_partitions\n" +
|
|
"where {\n" +
|
|
" graph <https://ariadne-infrastructure.eu/datasourceApis> {\n" +
|
|
" <" + rApi + "> <http://www.d-net.research-infrastructures.eu/provenance/num_partitions> ?num_partitions\n" +
|
|
" }\n" +
|
|
"}");
|
|
TupleQueryResult tupleQueryResult = tupleQuery.evaluate();
|
|
if (tupleQueryResult.hasNext()) {
|
|
hasPartition = true;
|
|
numPartitions = Integer.parseInt(tupleQueryResult.next().getValue("num_partitions").stringValue());
|
|
log.debug("numPartitions: "+ numPartitions);
|
|
}
|
|
con.close();
|
|
}
|
|
catch (RDF4JException e) {
|
|
log.error("error counting partitions ...", e);
|
|
}
|
|
|
|
if (!hasPartition) {
|
|
graphName = graph.toString().concat("_partition_").concat("1");
|
|
}
|
|
else {
|
|
if (numPartitions==0) {
|
|
log.debug("partition not already created, default numPartitions set to 1");
|
|
numPartitions+=1;
|
|
}
|
|
graphName = graph.toString().concat("_partition_").concat(Integer.toString(numPartitions));
|
|
}
|
|
|
|
log.debug("query current records count on graph " + graphName);
|
|
int currentRecordsCount = 0;
|
|
try (RepositoryConnection con = repository.getConnection()) {
|
|
TupleQuery tupleQuery = con.prepareTupleQuery(QueryLanguage.SPARQL, "select (COUNT(?has_identifier)/2 AS ?count)\n" +
|
|
"where {\n" +
|
|
" graph <" + graphName + "> {\n" +
|
|
" ?x <https://www.ariadne-infrastructure.eu/resource/ao/cat/1.1/has_identifier> ?has_identifier\n" +
|
|
" }\n" +
|
|
"}");
|
|
TupleQueryResult tupleQueryResult = tupleQuery.evaluate();
|
|
if (tupleQueryResult.hasNext()) {
|
|
currentRecordsCount = Integer.parseInt(tupleQueryResult.next().getValue("count").stringValue());
|
|
log.debug("currentRecordsCount: "+ currentRecordsCount);
|
|
}
|
|
con.close();
|
|
}
|
|
catch (RDF4JException e) {
|
|
log.error("error counting records ...", e);
|
|
}
|
|
|
|
int origNumPartitions = numPartitions;
|
|
boolean numRecordsThresholdReached = false;
|
|
if (currentRecordsCount >= NUM_RECORDS_THRESHOLD) {
|
|
numRecordsThresholdReached = true;
|
|
numPartitions+=1;
|
|
graphName = graph.toString().concat("_partition_").concat(Integer.toString(numPartitions));
|
|
log.debug("threshold reached graphname is: " + graphName);
|
|
}
|
|
else {
|
|
log.debug("threshold not reached graphname is: " + graphName);
|
|
}
|
|
|
|
try (RepositoryConnection con = repository.getConnection()) {
|
|
con.begin();
|
|
String recordURI = getRecordURI(objIdentifier, datasourceApi);
|
|
log.debug("Adding record " + recordURI + " into graph " + graphName);
|
|
con.add(IOUtils.toInputStream(getRDFBlock(record), "UTF-8"), recordURI, RDFFormat.RDFXML, factory.createIRI(graphName));
|
|
con.commit();
|
|
log.debug("statement added");
|
|
con.close();
|
|
}
|
|
catch (RDF4JException e) {
|
|
log.error("error adding statement ...", e);
|
|
}
|
|
|
|
if (numRecordsThresholdReached) {
|
|
log.debug("updating current numPartitionsStmt");
|
|
Statement numPartitionsStmt = null;
|
|
try (RepositoryConnection con = repository.getConnection()) {
|
|
IRI pred = factory.createIRI("http://www.d-net.research-infrastructures.eu/provenance/num_partitions");
|
|
Literal curValue = factory.createLiteral(origNumPartitions);
|
|
IRI datasourceApisGraph = factory.createIRI("https://ariadne-infrastructure.eu/datasourceApis");
|
|
RepositoryResult<Statement> numPartitionsStmts = con.getStatements(rApi, pred, curValue, false, datasourceApisGraph);
|
|
if (numPartitionsStmts.hasNext()) {
|
|
con.begin();
|
|
numPartitionsStmt = numPartitionsStmts.next();
|
|
log.debug("current numPartitionsStmt retrieved " + numPartitionsStmt.toString() + " inside " + datasourceApisGraph.toString());
|
|
con.remove(numPartitionsStmt, datasourceApisGraph);
|
|
log.debug("current numPartitionsStmt removed");
|
|
Statement numPartitionStmtUpdated = factory.createStatement(rApi, pred, factory.createLiteral(numPartitions));
|
|
con.add(numPartitionStmtUpdated, datasourceApisGraph);
|
|
log.debug("numPartitionsStmt updated");
|
|
con.commit();
|
|
con.close();
|
|
}
|
|
else {
|
|
con.begin();
|
|
Statement numPartitionStmtUpdated = factory.createStatement(rApi, pred, factory.createLiteral(numPartitions++));
|
|
con.add(numPartitionStmtUpdated, datasourceApisGraph);
|
|
log.debug("numPartitionsStmt updated");
|
|
con.commit();
|
|
con.close();
|
|
}
|
|
if (con.isActive()) {
|
|
con.close();
|
|
}
|
|
}
|
|
catch (RDF4JException e) {
|
|
log.error("error updating num partition statement ", e);
|
|
}
|
|
}
|
|
|
|
repository.shutDown();
|
|
manager.shutDown();
|
|
return 1;
|
|
}catch(Throwable e){
|
|
log.error(e);
|
|
throw new AriadnePlusPublisherException(e);
|
|
}
|
|
}
|
|
|
|
|
|
public long feedProvenance(final String datasource, final String datasourceApi) throws AriadnePlusPublisherException {
|
|
|
|
try {
|
|
log.debug("init connection to graphDBServerUrl " + this.graphDBServerUrl);
|
|
RemoteRepositoryManager manager = new RemoteRepositoryManager(this.graphDBServerUrl);
|
|
manager.init();
|
|
manager.setUsernameAndPassword(getWriterUser(), getWriterPwd());
|
|
log.debug("manager init");
|
|
Repository repository = manager.getRepository(getRepository());
|
|
ValueFactory factory = repository.getValueFactory();
|
|
IRI IS_API_OF = factory.createIRI(PROVENANCE_NS, "isApiOf");
|
|
IRI INSERTED_IN_DATE = factory.createIRI(PROVENANCE_NS, "insertedInDate");
|
|
IRI rApi = factory.createIRI(getGraphDBBaseURI(), datasourceApi);
|
|
Statement stmApi = factory.createStatement(rApi, IS_API_OF, factory.createLiteral(datasource));
|
|
LocalDateTime now = LocalDateTime.now();
|
|
Statement stmInsertedDate = factory.createStatement(rApi, INSERTED_IN_DATE, factory.createLiteral(now.toString()));
|
|
IRI datasourceApisGraph = factory.createIRI(getGraphDBBaseURI(), "datasourceApis");
|
|
try (RepositoryConnection con = repository.getConnection()) {
|
|
con.begin();
|
|
log.debug("Adding stmt " + stmApi.toString() + " into graph " + datasourceApisGraph.toString());
|
|
con.remove(rApi, INSERTED_IN_DATE, null, datasourceApisGraph);
|
|
con.add(stmApi, datasourceApisGraph);
|
|
log.debug("Adding stmt " + stmInsertedDate.toString() + " into graph " + datasourceApisGraph.toString());
|
|
con.add(stmInsertedDate, datasourceApisGraph);
|
|
con.commit();
|
|
log.debug("statements added");
|
|
con.close();
|
|
}
|
|
catch (RDF4JException e) {
|
|
log.error("error adding statement ...", e);
|
|
throw new AriadnePlusPublisherException(e);
|
|
}
|
|
repository.shutDown();
|
|
manager.shutDown();
|
|
return 200;
|
|
}
|
|
catch(Throwable e){
|
|
log.error(e);
|
|
throw new AriadnePlusPublisherException(e);
|
|
}
|
|
}
|
|
|
|
public long dropDatasourceApisPartitionInfo(final String datasourceApi) throws AriadnePlusPublisherException {
|
|
|
|
try {
|
|
log.debug("init connection to graphDBServerUrl " + this.graphDBServerUrl);
|
|
RemoteRepositoryManager manager = new RemoteRepositoryManager(this.graphDBServerUrl);
|
|
manager.init();
|
|
manager.setUsernameAndPassword(getWriterUser(), getWriterPwd());
|
|
Repository repository = manager.getRepository(getRepository());
|
|
if (repository==null) {
|
|
throw new AriadnePlusPublisherException("GraphDB repository not found");
|
|
}
|
|
ValueFactory factory = repository.getValueFactory();
|
|
IRI HAS_NUM_PARTITIONS = factory.createIRI("http://www.d-net.research-infrastructures.eu/provenance/num_partitions");
|
|
IRI rApi = factory.createIRI(getGraphDBBaseURI(), datasourceApi);
|
|
IRI datasourceApisGraph = factory.createIRI(getGraphDBBaseURI(), "datasourceApis");
|
|
Statement numPartitionsStmt = null;
|
|
try (RepositoryConnection con = repository.getConnection()) {
|
|
int numPartitions = 0;
|
|
log.debug("Removing datasourceApi partition info s:" + rApi.toString() + " p:" + HAS_NUM_PARTITIONS + " g:" + datasourceApisGraph );
|
|
RepositoryResult<Statement> numPartitionsStmts = con.getStatements(rApi, HAS_NUM_PARTITIONS, null, false, datasourceApisGraph);
|
|
if (numPartitionsStmts.hasNext()) {
|
|
con.begin();
|
|
numPartitionsStmt = numPartitionsStmts.next();
|
|
numPartitions = Integer.parseInt(numPartitionsStmt.getObject().stringValue());
|
|
log.debug(" old partitions count: " + numPartitions);
|
|
con.remove(rApi, HAS_NUM_PARTITIONS, factory.createLiteral(numPartitions), datasourceApisGraph);
|
|
con.commit();
|
|
con.close();
|
|
}
|
|
for (int numPartition=1; numPartition<=numPartitions; numPartition++) {
|
|
String namedGraph = String.format("api_________::ariadne_plus::ads::1_partition_%d", numPartition);
|
|
IRI graph = factory.createIRI(namedGraph);
|
|
log.debug("removing namedGraph: " + graph);
|
|
Repositories.consume(repository, conn -> conn.clear(graph));
|
|
}
|
|
}
|
|
catch (RDF4JException e) {
|
|
log.error("error removing datasourceApi partition info ", e);
|
|
throw new AriadnePlusPublisherException(e);
|
|
}
|
|
repository.shutDown();
|
|
manager.shutDown();
|
|
return 200;
|
|
}
|
|
catch(Throwable e){
|
|
log.error("error removing datasourceApi partition info ", e);
|
|
throw new AriadnePlusPublisherException(e);
|
|
}
|
|
}
|
|
|
|
//CLEAR GRAPH <https://ariadne-infrastructure.eu/datasourceApis>
|
|
|
|
|
|
|
|
private String getRecordURI(final String objIdentifier, final String datasourceApi) {
|
|
return "/" + datasourceApi + "/" + objIdentifier;
|
|
}
|
|
|
|
public RecordParserHelper getRecordParserHelper() {
|
|
return recordParserHelper;
|
|
}
|
|
|
|
public void setRecordParserHelper(final RecordParserHelper recordParserHelper) {
|
|
this.recordParserHelper = recordParserHelper;
|
|
}
|
|
|
|
public void setDefaultBaseURI(final String defaultBaseURI) {
|
|
this.graphDBServerUrl = defaultBaseURI;
|
|
}
|
|
|
|
public String getRDFBlock(final String record) throws SaxonApiException{
|
|
recordParserHelper.init();
|
|
try {
|
|
if (StringUtils.isBlank(record)) {
|
|
log.warn("Got empty record");
|
|
return "";
|
|
}
|
|
String objIdentifier = recordParserHelper.getObjIdentifier(record);
|
|
if (StringUtils.isBlank(objIdentifier)) {
|
|
log.warn("Got record with no objIdentifier -- skipping");
|
|
return "";
|
|
}
|
|
String rdfBlock = recordParserHelper.getRDF(record);
|
|
if (StringUtils.isBlank(rdfBlock)) {
|
|
log.warn("Missing rdf:RDF in record with objIdentifier " + objIdentifier);
|
|
}
|
|
return rdfBlock;
|
|
}catch(Throwable e){
|
|
log.error(e);
|
|
throw e;
|
|
}
|
|
}
|
|
|
|
|
|
public String getGraphDBBaseURI() {
|
|
return graphDBBaseURI;
|
|
}
|
|
|
|
|
|
public void setGraphDBBaseURI(String graphDBBaseURI) {
|
|
this.graphDBBaseURI = graphDBBaseURI;
|
|
}
|
|
|
|
|
|
public String getWriterUser() {
|
|
return writerUser;
|
|
}
|
|
|
|
|
|
public void setWriterUser(String writerUser) {
|
|
this.writerUser = writerUser;
|
|
}
|
|
|
|
|
|
public String getWriterPwd() {
|
|
return writerPwd;
|
|
}
|
|
|
|
|
|
public void setWriterPwd(String writerPwd) {
|
|
this.writerPwd = writerPwd;
|
|
}
|
|
|
|
|
|
public String getRepository() {
|
|
return repository;
|
|
}
|
|
|
|
|
|
public void setRepository(String repository) {
|
|
this.repository = repository;
|
|
}
|
|
}
|
|
|
|
//
|
|
// strQuery =
|
|
// "SELECT ?name FROM DEFAULT WHERE {" +
|
|
// "?s <http://xmlns.com/foaf/0.1/name> ?name .}";
|
|
// }
|
|
|
|
|
|
//
|
|
// public void queryTest(){
|
|
// RemoteRepositoryManager manager = new RemoteRepositoryManager(GRAPHDB_SERVER_URL);
|
|
// manager.init();
|
|
// logger.debug("manager init");
|
|
// Repository repository = manager.getRepository("PersonData");
|
|
// try (RepositoryConnection con = repository.getConnection()) {
|
|
// logger.debug("connection established");
|
|
// query(con);
|
|
// logger.debug("query success");
|
|
// }
|
|
// catch (RDF4JException e) {
|
|
// logger.error("error adding statement ...", e);
|
|
// }
|
|
// manager.shutDown();
|
|
// logger.debug("manager shutDown");
|
|
// }
|
|
//
|
|
// private void query(RepositoryConnection repositoryConnection) {
|
|
// TupleQuery tupleQuery = repositoryConnection.prepareTupleQuery(QueryLanguage.SPARQL, strQuery);
|
|
// TupleQueryResult result = null;
|
|
// try {
|
|
// result = tupleQuery.evaluate();
|
|
// int count = 0;
|
|
// while (result.hasNext()) {
|
|
// BindingSet bindingSet = result.next();
|
|
//
|
|
// SimpleLiteral name = (SimpleLiteral)bindingSet.getValue("name");
|
|
// logger.info("name = " + name.stringValue());
|
|
// count++;
|
|
// }
|
|
// logger.info("Entries found: ", count);
|
|
// }
|
|
// catch (QueryEvaluationException qee) {
|
|
// logger.error(WTF_MARKER, qee.getStackTrace().toString(), qee);
|
|
// } finally {
|
|
// result.close();
|
|
// }
|
|
// }
|
|
|
|
|