new class to handle publish on graphdb operation, transaction throws OOM so currently one write operation at time is executed without transaction
This commit is contained in:
parent
405f623a4c
commit
42eb9604a5
|
@ -1,6 +1,5 @@
|
||||||
package eu.dnetlib.ariadneplus.graphdb;
|
package eu.dnetlib.ariadneplus.graphdb;
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.MalformedURLException;
|
import java.net.MalformedURLException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
@ -12,8 +11,8 @@ import com.google.common.base.Splitter;
|
||||||
import eu.dnetlib.ariadneplus.elasticsearch.BulkUpload;
|
import eu.dnetlib.ariadneplus.elasticsearch.BulkUpload;
|
||||||
import eu.dnetlib.ariadneplus.reader.ResourceManager;
|
import eu.dnetlib.ariadneplus.reader.ResourceManager;
|
||||||
import eu.dnetlib.ariadneplus.reader.RunSPARQLQueryService;
|
import eu.dnetlib.ariadneplus.reader.RunSPARQLQueryService;
|
||||||
|
import eu.dnetlib.ariadneplus.reader.RunSPARQLWriteService;
|
||||||
import eu.dnetlib.ariadneplus.reader.json.ParseRDFJSON;
|
import eu.dnetlib.ariadneplus.reader.json.ParseRDFJSON;
|
||||||
import org.apache.commons.io.FileUtils;
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -51,6 +50,8 @@ public class GraphDBClient {
|
||||||
@Autowired
|
@Autowired
|
||||||
private RunSPARQLQueryService runSPQRLQuery;
|
private RunSPARQLQueryService runSPQRLQuery;
|
||||||
@Autowired
|
@Autowired
|
||||||
|
private RunSPARQLWriteService runSPQRLWrite;
|
||||||
|
@Autowired
|
||||||
private ParseRDFJSON parseRDFJSON;
|
private ParseRDFJSON parseRDFJSON;
|
||||||
@Autowired
|
@Autowired
|
||||||
private ResourceManager resourceManager;
|
private ResourceManager resourceManager;
|
||||||
|
@ -434,4 +435,29 @@ public class GraphDBClient {
|
||||||
}
|
}
|
||||||
return "Resources: ".concat(report);
|
return "Resources: ".concat(report);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public RunSPARQLWriteService getRunSPQRLWrite() {
|
||||||
|
return runSPQRLWrite;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRunSPQRLWrite(RunSPARQLWriteService runSPQRLWrite) {
|
||||||
|
this.runSPQRLWrite = runSPQRLWrite;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long callFeedService(final String record) throws AriadnePlusPublisherException{
|
||||||
|
try {
|
||||||
|
String objIdentifier = recordParserHelper.getObjIdentifier(record);
|
||||||
|
if (StringUtils.isBlank(objIdentifier)) {
|
||||||
|
log.warn("Got record with no objIdentifier -- skipping");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
String datasourceApi = recordParserHelper.getDatasourceApi(record);
|
||||||
|
String recordURI = getRecordURI(objIdentifier, datasourceApi);
|
||||||
|
runSPQRLWrite.setupConnection( getWriterUser(), getWriterPwd(), this.graphDBServerUrl, getRepository());
|
||||||
|
return runSPQRLWrite.writeRecord(IOUtils.toInputStream(getRDFBlock(record), "UTF-8"), recordURI, datasourceApi, getGraphDBBaseURI());
|
||||||
|
}catch(Throwable e){
|
||||||
|
log.error(e);
|
||||||
|
throw new AriadnePlusPublisherException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -103,7 +103,7 @@ public class AriadnePlusPublisherHelper {
|
||||||
|
|
||||||
private void publishGraphDB(final String record) throws AriadnePlusPublisherException {
|
private void publishGraphDB(final String record) throws AriadnePlusPublisherException {
|
||||||
GraphDBClient graphDBClient = this.graphdbClientFactory.getGraphDBClient();
|
GraphDBClient graphDBClient = this.graphdbClientFactory.getGraphDBClient();
|
||||||
graphDBClient.feed(record);
|
graphDBClient.callFeedService(record);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void feedProvenance(final String datasource, final String datasourceApi) throws AriadnePlusPublisherException {
|
private void feedProvenance(final String datasource, final String datasourceApi) throws AriadnePlusPublisherException {
|
||||||
|
|
|
@ -0,0 +1,118 @@
|
||||||
|
package eu.dnetlib.ariadneplus.reader;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.eclipse.rdf4j.model.ValueFactory;
|
||||||
|
import org.eclipse.rdf4j.repository.Repository;
|
||||||
|
import org.eclipse.rdf4j.repository.RepositoryConnection;
|
||||||
|
import org.eclipse.rdf4j.repository.manager.RemoteRepositoryManager;
|
||||||
|
import org.eclipse.rdf4j.rio.RDFFormat;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import javax.annotation.PreDestroy;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
public class RunSPARQLWriteService {
|
||||||
|
|
||||||
|
private static final Log log = LogFactory.getLog(RunSPARQLWriteService.class);
|
||||||
|
|
||||||
|
private RemoteRepositoryManager manager;
|
||||||
|
private Repository repository;
|
||||||
|
private ValueFactory valueFactory;
|
||||||
|
|
||||||
|
private static String username = null;
|
||||||
|
private static String pwd = null;
|
||||||
|
private static String graphDBUrl = null;
|
||||||
|
private static String graphDBRepository = null;
|
||||||
|
|
||||||
|
@PreDestroy
|
||||||
|
public void preDestroy() {
|
||||||
|
shutDownRepository();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setupConnection(String username, String pwd, String graphDbUrl, String graphDbRepository) {
|
||||||
|
setUsername(username);
|
||||||
|
setPwd(pwd);
|
||||||
|
setGraphDBUrl(graphDbUrl);
|
||||||
|
setGraphDBRepository(graphDbRepository);
|
||||||
|
initRepository();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initRepository(){
|
||||||
|
if (manager==null) {
|
||||||
|
manager = new RemoteRepositoryManager(getGraphDBUrl());
|
||||||
|
manager.init();
|
||||||
|
manager.setUsernameAndPassword(getUsername(), getPwd());
|
||||||
|
repository = manager.getRepository(getGraphDBRepository());
|
||||||
|
valueFactory = repository.getValueFactory();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void shutDownRepository(){
|
||||||
|
if (repository!=null) {
|
||||||
|
repository.shutDown();
|
||||||
|
}
|
||||||
|
if (manager!=null) {
|
||||||
|
manager.shutDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String getUsername() {
|
||||||
|
return username;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String getPwd() {
|
||||||
|
return pwd;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String getGraphDBUrl() {
|
||||||
|
return graphDBUrl;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String getGraphDBRepository() {
|
||||||
|
return graphDBRepository;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void setUsername(String username) {
|
||||||
|
RunSPARQLWriteService.username = username;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void setPwd(String pwd) {
|
||||||
|
RunSPARQLWriteService.pwd = pwd;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void setGraphDBUrl(String graphDBUrl) {
|
||||||
|
RunSPARQLWriteService.graphDBUrl = graphDBUrl;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void setGraphDBRepository(String graphDBRepository) {
|
||||||
|
RunSPARQLWriteService.graphDBRepository = graphDBRepository;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long writeRecord(java.io.InputStream rdfxmlRecord, String recordURI, String datasourceApi, String graphDBBaseURI){
|
||||||
|
RepositoryConnection connection = repository.getConnection();
|
||||||
|
try {
|
||||||
|
// connection.begin(); currently with rdf4j 3.6.3 the transaction throws OOM
|
||||||
|
connection.add(
|
||||||
|
rdfxmlRecord,
|
||||||
|
recordURI,
|
||||||
|
RDFFormat.RDFXML,
|
||||||
|
valueFactory.createIRI(graphDBBaseURI, datasourceApi));
|
||||||
|
// connection.commit();
|
||||||
|
} catch(Exception e){
|
||||||
|
log.error(e);
|
||||||
|
return -1;
|
||||||
|
} finally{
|
||||||
|
if (connection!=null && connection.isOpen()) {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
rdfxmlRecord.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
log.error(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue