diff --git a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/graphdb/GraphDBClient.java b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/graphdb/GraphDBClient.java index c587211..67dc991 100644 --- a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/graphdb/GraphDBClient.java +++ b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/graphdb/GraphDBClient.java @@ -1,6 +1,5 @@ package eu.dnetlib.ariadneplus.graphdb; -import java.io.File; import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; @@ -12,8 +11,8 @@ import com.google.common.base.Splitter; import eu.dnetlib.ariadneplus.elasticsearch.BulkUpload; import eu.dnetlib.ariadneplus.reader.ResourceManager; import eu.dnetlib.ariadneplus.reader.RunSPARQLQueryService; +import eu.dnetlib.ariadneplus.reader.RunSPARQLWriteService; import eu.dnetlib.ariadneplus.reader.json.ParseRDFJSON; -import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; @@ -51,6 +50,8 @@ public class GraphDBClient { @Autowired private RunSPARQLQueryService runSPQRLQuery; @Autowired + private RunSPARQLWriteService runSPQRLWrite; + @Autowired private ParseRDFJSON parseRDFJSON; @Autowired private ResourceManager resourceManager; @@ -434,4 +435,29 @@ public class GraphDBClient { } return "Resources: ".concat(report); } + + public RunSPARQLWriteService getRunSPQRLWrite() { + return runSPQRLWrite; + } + + public void setRunSPQRLWrite(RunSPARQLWriteService runSPQRLWrite) { + this.runSPQRLWrite = runSPQRLWrite; + } + + public long callFeedService(final String record) throws AriadnePlusPublisherException{ + try { + String objIdentifier = recordParserHelper.getObjIdentifier(record); + if (StringUtils.isBlank(objIdentifier)) { + log.warn("Got record with no objIdentifier -- skipping"); + return 0; + } + String datasourceApi = recordParserHelper.getDatasourceApi(record); + String recordURI = getRecordURI(objIdentifier, datasourceApi); + runSPQRLWrite.setupConnection( getWriterUser(), getWriterPwd(), this.graphDBServerUrl, getRepository()); + return runSPQRLWrite.writeRecord(IOUtils.toInputStream(getRDFBlock(record), "UTF-8"), recordURI, datasourceApi, getGraphDBBaseURI()); + }catch(Throwable e){ + log.error(e); + throw new AriadnePlusPublisherException(e); + } + } } diff --git a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/publisher/AriadnePlusPublisherHelper.java b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/publisher/AriadnePlusPublisherHelper.java index 368f18f..79b236e 100644 --- a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/publisher/AriadnePlusPublisherHelper.java +++ b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/publisher/AriadnePlusPublisherHelper.java @@ -103,7 +103,7 @@ public class AriadnePlusPublisherHelper { private void publishGraphDB(final String record) throws AriadnePlusPublisherException { GraphDBClient graphDBClient = this.graphdbClientFactory.getGraphDBClient(); - graphDBClient.feed(record); + graphDBClient.callFeedService(record); } private void feedProvenance(final String datasource, final String datasourceApi) throws AriadnePlusPublisherException { diff --git a/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/reader/RunSPARQLWriteService.java b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/reader/RunSPARQLWriteService.java new file mode 100644 index 0000000..d460d33 --- /dev/null +++ b/dnet-ariadneplus-graphdb-publisher/src/main/java/eu/dnetlib/ariadneplus/reader/RunSPARQLWriteService.java @@ -0,0 +1,118 @@ +package eu.dnetlib.ariadneplus.reader; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.eclipse.rdf4j.model.ValueFactory; +import org.eclipse.rdf4j.repository.Repository; +import org.eclipse.rdf4j.repository.RepositoryConnection; +import org.eclipse.rdf4j.repository.manager.RemoteRepositoryManager; +import org.eclipse.rdf4j.rio.RDFFormat; +import org.springframework.stereotype.Service; + +import javax.annotation.PreDestroy; +import java.io.IOException; + +@Service +public class RunSPARQLWriteService { + + private static final Log log = LogFactory.getLog(RunSPARQLWriteService.class); + + private RemoteRepositoryManager manager; + private Repository repository; + private ValueFactory valueFactory; + + private static String username = null; + private static String pwd = null; + private static String graphDBUrl = null; + private static String graphDBRepository = null; + + @PreDestroy + public void preDestroy() { + shutDownRepository(); + } + + public void setupConnection(String username, String pwd, String graphDbUrl, String graphDbRepository) { + setUsername(username); + setPwd(pwd); + setGraphDBUrl(graphDbUrl); + setGraphDBRepository(graphDbRepository); + initRepository(); + } + + private void initRepository(){ + if (manager==null) { + manager = new RemoteRepositoryManager(getGraphDBUrl()); + manager.init(); + manager.setUsernameAndPassword(getUsername(), getPwd()); + repository = manager.getRepository(getGraphDBRepository()); + valueFactory = repository.getValueFactory(); + } + } + + private void shutDownRepository(){ + if (repository!=null) { + repository.shutDown(); + } + if (manager!=null) { + manager.shutDown(); + } + } + + public static String getUsername() { + return username; + } + + public static String getPwd() { + return pwd; + } + + public static String getGraphDBUrl() { + return graphDBUrl; + } + + public static String getGraphDBRepository() { + return graphDBRepository; + } + + public static void setUsername(String username) { + RunSPARQLWriteService.username = username; + } + + public static void setPwd(String pwd) { + RunSPARQLWriteService.pwd = pwd; + } + + public static void setGraphDBUrl(String graphDBUrl) { + RunSPARQLWriteService.graphDBUrl = graphDBUrl; + } + + public static void setGraphDBRepository(String graphDBRepository) { + RunSPARQLWriteService.graphDBRepository = graphDBRepository; + } + + public long writeRecord(java.io.InputStream rdfxmlRecord, String recordURI, String datasourceApi, String graphDBBaseURI){ + RepositoryConnection connection = repository.getConnection(); + try { +// connection.begin(); currently with rdf4j 3.6.3 the transaction throws OOM + connection.add( + rdfxmlRecord, + recordURI, + RDFFormat.RDFXML, + valueFactory.createIRI(graphDBBaseURI, datasourceApi)); +// connection.commit(); + } catch(Exception e){ + log.error(e); + return -1; + } finally{ + if (connection!=null && connection.isOpen()) { + connection.close(); + } + try { + rdfxmlRecord.close(); + } catch (IOException e) { + log.error(e); + } + } + return 0; + } +}