// AriadnePlus/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/graphdb/GraphDBClient.java

package eu.dnetlib.ariadneplus.graphdb;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.*;
import com.google.common.base.Splitter;
import eu.dnetlib.ariadneplus.elasticsearch.BulkUpload;
import eu.dnetlib.ariadneplus.reader.ResourceManager;
import eu.dnetlib.ariadneplus.reader.RunSPARQLQueryService;
import eu.dnetlib.ariadneplus.reader.json.ParseRDFJSON;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.rdf4j.RDF4JException;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.query.*;
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.manager.RemoteRepositoryManager;
import org.eclipse.rdf4j.repository.util.Repositories;
import org.eclipse.rdf4j.rio.RDFFormat;
import eu.dnetlib.ariadneplus.publisher.AriadnePlusPublisherException;
import eu.dnetlib.ariadneplus.rdf.RecordParserHelper;
import net.sf.saxon.s9api.SaxonApiException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;
/**
* @author enrico.ottonello
*
*/
@Component
public class GraphDBClient {

    private static final Log log = LogFactory.getLog(GraphDBClient.class);

    /** Namespace of the provenance predicates written by {@link #feedProvenance(String, String)}. */
    public static final String PROVENANCE_NS = "http://www.d-net.research-infrastructures.eu/provenance/";

    @Autowired
    private RunSPARQLQueryService runSPQRLQuery;
    @Autowired
    private ParseRDFJSON parseRDFJSON;
    @Autowired
    private ResourceManager resourceManager;
    @Autowired
    private BulkUpload bulkUpload;

    private RecordParserHelper recordParserHelper;
    private String graphDBServerUrl;
    private String graphDBBaseURI;
    private String writerUser;
    private String writerPwd;
    private String repository;

    /**
     * Initializes the client configuration.
     *
     * @param recordParserHelper helper used to extract identifiers and RDF payloads from records
     * @param graphDBServerUrl   base URL of the GraphDB server
     * @param graphDBBaseURI     base URI used to mint named-graph IRIs
     * @param writerUser         GraphDB user with write permission
     * @param writerPwd          password of {@code writerUser}
     * @param repository         GraphDB repository id
     */
    protected void setup(final RecordParserHelper recordParserHelper,
            final String graphDBServerUrl, final String graphDBBaseURI,
            final String writerUser, final String writerPwd, final String repository) {
        this.recordParserHelper = recordParserHelper;
        this.graphDBServerUrl = graphDBServerUrl;
        this.graphDBBaseURI = graphDBBaseURI;
        this.writerUser = writerUser;
        this.writerPwd = writerPwd;
        this.repository = repository;
    }

    /** Creates an initialized, authenticated repository manager for the configured server. */
    private RemoteRepositoryManager newManager() {
        RemoteRepositoryManager manager = new RemoteRepositoryManager(this.graphDBServerUrl);
        manager.init();
        manager.setUsernameAndPassword(getWriterUser(), getWriterPwd());
        return manager;
    }

    /** Best-effort shutdown of repository and manager: cleanup failures are logged, never thrown. */
    private static void shutDownQuietly(final Repository repository, final RemoteRepositoryManager manager) {
        try {
            if (repository != null) {
                repository.shutDown();
            }
        } catch (RuntimeException e) {
            log.warn("error shutting down repository", e);
        }
        try {
            if (manager != null) {
                manager.shutDown();
            }
        } catch (RuntimeException e) {
            log.warn("error shutting down repository manager", e);
        }
    }

    /** Reads a SPARQL template from the classpath as UTF-8 text. */
    private static String loadSparqlTemplate(final String classpathLocation) throws IOException {
        final ClassPathResource resource = new ClassPathResource(classpathLocation);
        return IOUtils.toString(resource.getInputStream(), StandardCharsets.UTF_8.name());
    }

    /**
     * Stores the rdf:RDF payload of the given record into the named graph of its datasource api.
     *
     * @param record source record, as handled by {@link RecordParserHelper}
     * @return 1 if the record was added, 0 if it has no objIdentifier and was skipped
     * @throws AriadnePlusPublisherException if parsing or the repository update fails
     */
    public long feed(final String record) throws AriadnePlusPublisherException {
        RemoteRepositoryManager manager = null;
        Repository repository = null;
        try {
            String objIdentifier = recordParserHelper.getObjIdentifier(record);
            if (StringUtils.isBlank(objIdentifier)) {
                log.warn("Got record with no objIdentifier -- skipping");
                return 0;
            }
            manager = newManager();
            repository = manager.getRepository(getRepository());
            ValueFactory factory = repository.getValueFactory();
            String datasourceApi = recordParserHelper.getDatasourceApi(record);
            IRI graph = factory.createIRI(getGraphDBBaseURI(), datasourceApi);
            try (RepositoryConnection con = repository.getConnection()) {
                con.begin();
                String recordURI = getRecordURI(objIdentifier, datasourceApi);
                con.add(IOUtils.toInputStream(getRDFBlock(record), "UTF-8"), recordURI, RDFFormat.RDFXML, graph);
                con.commit();
            } catch (RDF4JException e) {
                // BUGFIX: this was previously only logged, so a failed insert still
                // reported success (returned 1) to callers; now it is propagated,
                // consistently with feedProvenance/dropDatasourceApiGraph
                log.error("error adding statement ...", e);
                throw new AriadnePlusPublisherException(e);
            }
            return 1;
        } catch (AriadnePlusPublisherException e) {
            log.error(e);
            throw e;
        } catch (Throwable e) {
            log.error(e);
            throw new AriadnePlusPublisherException(e);
        } finally {
            // always release connections, also on failure (previously leaked on exception)
            shutDownQuietly(repository, manager);
        }
    }

    /**
     * Records provenance for a datasource api: an isApiOf statement plus a refreshed
     * insertedInDate timestamp, both in the "datasourceApis" named graph.
     *
     * @param datasource    datasource identifier the api belongs to
     * @param datasourceApi datasource api identifier (used as IRI local name)
     * @return 200 on success
     * @throws AriadnePlusPublisherException if the repository update fails
     */
    public long feedProvenance(final String datasource, final String datasourceApi) throws AriadnePlusPublisherException {
        RemoteRepositoryManager manager = null;
        Repository repository = null;
        try {
            manager = newManager();
            repository = manager.getRepository(getRepository());
            ValueFactory factory = repository.getValueFactory();
            IRI isApiOf = factory.createIRI(PROVENANCE_NS, "isApiOf");
            IRI insertedInDate = factory.createIRI(PROVENANCE_NS, "insertedInDate");
            IRI rApi = factory.createIRI(getGraphDBBaseURI(), datasourceApi);
            Statement stmApi = factory.createStatement(rApi, isApiOf, factory.createLiteral(datasource));
            Statement stmInsertedDate = factory.createStatement(rApi, insertedInDate,
                    factory.createLiteral(LocalDateTime.now().toString()));
            IRI datasourceApisGraph = factory.createIRI(getGraphDBBaseURI(), "datasourceApis");
            try (RepositoryConnection con = repository.getConnection()) {
                con.begin();
                // drop any previous timestamp so only the latest insertion date is kept
                con.remove(rApi, insertedInDate, null, datasourceApisGraph);
                con.add(stmApi, datasourceApisGraph);
                con.add(stmInsertedDate, datasourceApisGraph);
                con.commit();
                // NOTE: explicit con.close() removed — the try-with-resources already closes it
            } catch (RDF4JException e) {
                log.error("error adding statement ...", e);
                throw new AriadnePlusPublisherException(e);
            }
            return 200;
        } catch (AriadnePlusPublisherException e) {
            log.error(e);
            throw e;
        } catch (Throwable e) {
            log.error(e);
            throw new AriadnePlusPublisherException(e);
        } finally {
            shutDownQuietly(repository, manager);
        }
    }

    /**
     * Clears the named graph associated to the given datasource api.
     *
     * @param datasourceApi datasource api identifier (used as IRI local name)
     * @return 200 on success
     * @throws AriadnePlusPublisherException if the repository is missing or the clear fails
     */
    public long dropDatasourceApiGraph(final String datasourceApi) throws AriadnePlusPublisherException {
        RemoteRepositoryManager manager = null;
        Repository repository = null;
        try {
            manager = newManager();
            repository = manager.getRepository(getRepository());
            if (repository == null) {
                throw new AriadnePlusPublisherException("GraphDB repository not found");
            }
            IRI rApi = repository.getValueFactory().createIRI(getGraphDBBaseURI(), datasourceApi);
            try {
                log.debug("removing namedGraph: " + rApi);
                // Repositories.consume manages its own connection; the previously opened
                // (and unused) extra connection has been removed
                Repositories.consume(repository, conn -> conn.clear(rApi));
            } catch (RDF4JException e) {
                log.error("error removing datasourceApi partition info ", e);
                throw new AriadnePlusPublisherException(e);
            }
            return 200;
        } catch (AriadnePlusPublisherException e) {
            log.error("error removing datasourceApi partition info ", e);
            throw e;
        } catch (Throwable e) {
            log.error("error removing datasourceApi partition info ", e);
            throw new AriadnePlusPublisherException(e);
        } finally {
            shutDownQuietly(repository, manager);
        }
    }

    /** Builds the per-record base URI used when parsing the RDF/XML payload. */
    private String getRecordURI(final String objIdentifier, final String datasourceApi) {
        return "/" + datasourceApi + "/" + objIdentifier;
    }

    public RecordParserHelper getRecordParserHelper() {
        return recordParserHelper;
    }

    public void setRecordParserHelper(final RecordParserHelper recordParserHelper) {
        this.recordParserHelper = recordParserHelper;
    }

    /**
     * NOTE(review): despite the name, this sets the GraphDB server URL, not the base URI
     * used for IRIs — kept as-is for backward compatibility with existing callers.
     */
    public void setDefaultBaseURI(final String defaultBaseURI) {
        this.graphDBServerUrl = defaultBaseURI;
    }

    /**
     * Extracts the rdf:RDF payload from the given record via {@link RecordParserHelper}.
     *
     * @param record source record
     * @return the RDF payload, or an empty string if the record or its objIdentifier is blank
     * @throws SaxonApiException if the extraction fails
     */
    public String getRDFBlock(final String record) throws SaxonApiException {
        recordParserHelper.init();
        try {
            if (StringUtils.isBlank(record)) {
                log.warn("Got empty record");
                return "";
            }
            String objIdentifier = recordParserHelper.getObjIdentifier(record);
            if (StringUtils.isBlank(objIdentifier)) {
                log.warn("Got record with no objIdentifier -- skipping");
                return "";
            }
            String rdfBlock = recordParserHelper.getRDF(record);
            if (StringUtils.isBlank(rdfBlock)) {
                log.warn("Missing rdf:RDF in record with objIdentifier " + objIdentifier);
            }
            return rdfBlock;
        } catch (Throwable e) {
            log.error(e);
            throw e;
        }
    }

    public String getGraphDBBaseURI() {
        return graphDBBaseURI;
    }

    public void setGraphDBBaseURI(String graphDBBaseURI) {
        this.graphDBBaseURI = graphDBBaseURI;
    }

    public String getWriterUser() {
        return writerUser;
    }

    public void setWriterUser(String writerUser) {
        this.writerUser = writerUser;
    }

    public String getWriterPwd() {
        return writerPwd;
    }

    public void setWriterPwd(String writerPwd) {
        this.writerPwd = writerPwd;
    }

    public String getRepository() {
        return repository;
    }

    public void setRepository(String repository) {
        this.repository = repository;
    }

    /**
     * Executes one or more SPARQL update statements, separated by ';', each in its own
     * transaction.
     * <p>
     * NOTE: one query with multiple updates separated by ';' is ok with GraphDB EE.
     * Using the free edition each update must run separately, otherwise the triples
     * inserted by the previous updates are not visible.
     * See https://stackoverflow.com/questions/54428161/graphdb-read-check-and-update-in-a-transaction
     *
     * @param queryValue one or more ';'-separated SPARQL update statements
     * @return an empty string (kept for interface compatibility)
     * @throws AriadnePlusPublisherException if any update fails
     */
    public String updateSparql(final String queryValue) throws AriadnePlusPublisherException {
        RemoteRepositoryManager manager = null;
        Repository repository = null;
        try {
            String result = "";
            manager = newManager();
            repository = manager.getRepository(getRepository());
            try (RepositoryConnection con = repository.getConnection()) {
                int countQueries = 0;
                int countSuccess = 0;
                for (String query : Splitter.on(";").split(queryValue)) {
                    countQueries++;
                    con.begin();
                    // BUGFIX: prepare the current segment only; the previous code called
                    // prepareUpdate(queryValue), re-executing the whole multi-statement
                    // input once per segment
                    Update updateResultQuery = con.prepareUpdate(query);
                    if (updateResultQuery != null) {
                        updateResultQuery.execute();
                    } else {
                        throw new AriadnePlusPublisherException(String.format("Cannot generate Update statement from %s", query));
                    }
                    log.debug(String.format("Query %d executed: %s", countQueries, query));
                    con.commit();
                    countSuccess++;
                    log.debug(String.format("Query %d committed", countQueries));
                }
                log.info(String.format("Queries committed with success %d/%d", countSuccess, countQueries));
            } catch (RDF4JException e) {
                log.error("error executing query ...", e);
                throw new AriadnePlusPublisherException(e);
            }
            return result;
        } catch (AriadnePlusPublisherException e) {
            log.error(e);
            throw e;
        } catch (Throwable e) {
            log.error(e);
            throw new AriadnePlusPublisherException(e);
        } finally {
            shutDownQuietly(repository, manager);
        }
    }

    /**
     * Loads TURTLE data from the given URL into the named graph identified by {@code context}.
     *
     * @param dataUrl URL of the TURTLE document to load
     * @param context local name of the target named graph
     * @return a short report of what was loaded
     * @throws AriadnePlusPublisherException if the download or the repository update fails
     */
    public String feedFromURL(final String dataUrl, final String context) throws AriadnePlusPublisherException {
        String result = "";
        RemoteRepositoryManager manager = newManager();
        Repository repository = manager.getRepository(getRepository());
        ValueFactory factory = repository.getValueFactory();
        try (RepositoryConnection con = repository.getConnection()) {
            con.begin();
            IRI contextIRI = factory.createIRI(getGraphDBBaseURI(), context);
            log.debug("adding data from IRI: " + contextIRI.toString());
            con.add(new URL(dataUrl), null, RDFFormat.TURTLE, contextIRI);
            // BUGFIX: the report was built with String.concat() and its return value
            // discarded (Strings are immutable), so the method always returned ""
            result = "data added from url: " + dataUrl + " into graph " + context;
            con.commit();
            log.debug("add data from Url executed");
        } catch (RDF4JException e) {
            log.error("error executing query ...", e);
            throw new AriadnePlusPublisherException(e);
        } catch (IOException e) {
            // includes MalformedURLException
            log.error("error executing query ...", e);
            throw new AriadnePlusPublisherException(e);
        } finally {
            shutDownQuietly(repository, manager);
        }
        return result;
    }

    public RunSPARQLQueryService getRunSPQRLQuery() {
        return runSPQRLQuery;
    }

    public void setRunSPQRLQuery(RunSPARQLQueryService runSPQRLQuery) {
        this.runSPQRLQuery = runSPQRLQuery;
    }

    /**
     * Indexes on ElasticSearch all collections and records of the given datasource/collection.
     *
     * @param datasource   datasource identifier
     * @param collectionId collection identifier
     * @return a combined report for records and collections
     * @throws AriadnePlusPublisherException if any query or upload step fails
     */
    public String indexOnES(String datasource, String collectionId) throws AriadnePlusPublisherException {
        String recordsIndexReport = "";
        String collectionIndexReport = "";
        try {
            log.info("Start indexing from " + datasource + " " + collectionId);
            runSPQRLQuery.setupConnection(getWriterUser(), getWriterPwd(), this.graphDBServerUrl, getRepository());
            runSPQRLQuery.setParser(parseRDFJSON);
            runSPQRLQuery.setResourceManager(resourceManager);
            runSPQRLQuery.setBulkUpload(bulkUpload);
            List<String> collectionResourceIds = runSPQRLQuery.selectCollectionIds(datasource, collectionId);
            log.info(String.format("Found %d collections to index for datasource %s - %s", collectionResourceIds.size(), datasource, collectionId));
            try {
                if (!collectionResourceIds.isEmpty()) {
                    String selectCollectionTemplate = loadSparqlTemplate("eu/dnetlib/ariadneplus/sparql/read_collection_data_template.sparql");
                    collectionIndexReport = runSPQRLQuery.executeMultipleQueryGraph(selectCollectionTemplate, collectionResourceIds, datasource, collectionId, true);
                }
            } catch (RuntimeException re) {
                throw new AriadnePlusPublisherException(re);
            }
            List<String> recordIds = runSPQRLQuery.selectRecordIds(datasource, collectionId);
            log.info(String.format("Found %d individual resources to index for datasource %s - %s", recordIds.size(), datasource, collectionId));
            if (!recordIds.isEmpty()) {
                String queryTemplate = loadSparqlTemplate("eu/dnetlib/ariadneplus/sparql/read_record_data_template.sparql");
                recordsIndexReport = runSPQRLQuery.executeMultipleQueryGraph(queryTemplate, recordIds, datasource, collectionId, false);
            }
        } catch (Throwable e) {
            log.error(e);
            throw new AriadnePlusPublisherException(e);
        }
        return "Records: ".concat(recordsIndexReport).concat(" Collection: ").concat(collectionIndexReport);
    }

    /**
     * Selects resource identifiers for the given datasource/collection.
     *
     * @param resourceType either "COLLECTION" or "RECORD"; any other value yields an empty list
     * @return the matching identifiers, possibly empty, never null
     * @throws AriadnePlusPublisherException if the selection query fails
     */
    public List<String> selectIdentifiers(String datasource, String collectionId, String resourceType) throws AriadnePlusPublisherException {
        List<String> identifiers = Collections.emptyList();
        try {
            log.info("Select " + resourceType + " Identifiers from " + datasource + " " + collectionId);
            runSPQRLQuery.setupConnection(getWriterUser(), getWriterPwd(), this.graphDBServerUrl, getRepository());
            if (resourceType.equals("COLLECTION")) {
                identifiers = runSPQRLQuery.selectCollectionIds(datasource, collectionId);
                log.info(String.format("Found %d collections for datasource %s - %s", identifiers.size(), datasource, collectionId));
            } else if (resourceType.equals("RECORD")) {
                identifiers = runSPQRLQuery.selectRecordIds(datasource, collectionId);
                log.info(String.format("Found %d records for datasource %s - %s", identifiers.size(), datasource, collectionId));
            }
        } catch (Throwable e) {
            log.error(e);
            throw new AriadnePlusPublisherException(e);
        }
        return identifiers;
    }

    /**
     * Indexes a single COLLECTION or RECORD resource on ElasticSearch.
     *
     * @param resourceType either "COLLECTION" or "RECORD"; any other value is a no-op
     * @param identifier   identifier of the resource to index
     * @return a short report, or "empty identifier" when the identifier is blank
     * @throws AriadnePlusPublisherException if the query or upload step fails
     */
    public String indexOnESByIdentifier(String datasource, String collectionId, String resourceType, String identifier) throws AriadnePlusPublisherException {
        String report = "";
        try {
            if (StringUtils.isBlank(identifier)) {
                return "empty identifier";
            }
            List<String> identifiers = Arrays.asList(identifier);
            log.info("Start indexing from " + datasource + " " + collectionId);
            runSPQRLQuery.setupConnection(getWriterUser(), getWriterPwd(), this.graphDBServerUrl, getRepository());
            runSPQRLQuery.setParser(parseRDFJSON);
            runSPQRLQuery.setResourceManager(resourceManager);
            runSPQRLQuery.setBulkUpload(bulkUpload);
            if (resourceType.equals("COLLECTION")) {
                // identifiers is a one-element list, so the former isEmpty() guard was redundant
                String selectCollectionTemplate = loadSparqlTemplate("eu/dnetlib/ariadneplus/sparql/read_collection_data_template.sparql");
                report = runSPQRLQuery.executeMultipleQueryGraph(selectCollectionTemplate, identifiers, datasource, collectionId, true);
            } else if (resourceType.equals("RECORD")) {
                String queryTemplate = loadSparqlTemplate("eu/dnetlib/ariadneplus/sparql/read_record_data_template.sparql");
                report = runSPQRLQuery.executeMultipleQueryGraph(queryTemplate, identifiers, datasource, collectionId, false);
            }
        } catch (RuntimeException | IOException re) {
            throw new AriadnePlusPublisherException(re);
        }
        return "Resources: ".concat(report);
    }
}