after records publishing the datasourceApi informations are saved on GraphDB

This commit is contained in:
Enrico Ottonello 2019-12-13 14:55:51 +01:00
parent 3f8a4e9a46
commit 0f6f2e7b75
2 changed files with 69 additions and 15 deletions

View File

@ -8,16 +8,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
import eu.dnetlib.msro.workflows.graph.Arc;
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;
import eu.dnetlib.msro.workflows.procs.Env;
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
import eu.dnetlib.rmi.common.ResultSet;
import eu.dnetlib.rmi.manager.MSROException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpResponse;
@ -29,6 +19,18 @@ import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.springframework.beans.factory.annotation.Autowired;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
import eu.dnetlib.msro.workflows.graph.Arc;
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;
import eu.dnetlib.msro.workflows.procs.Env;
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
import eu.dnetlib.rmi.common.ResultSet;
import eu.dnetlib.rmi.manager.MSROException;
public class PublishGraphDBJobNode extends AsyncJobNode {
@ -40,6 +42,7 @@ public class PublishGraphDBJobNode extends AsyncJobNode {
private ResultSetClient resultSetClient;
private String publisherEndpoint;
private String datasourceInterface;
//for parallel requests to the publisher endpoint
private int nThreads = 5;
@ -55,13 +58,12 @@ public class PublishGraphDBJobNode extends AsyncJobNode {
int countAll = 0;
int countOk = 0;
Map<Integer, Integer> errors = Maps.newHashMap();
log.info("Publisher endpoint: " + getPublisherEndpoint());
log.info("Publish endpoint: " + getPublishEndpoint());
for (String record : getResultSetClient().iter(rsIn, String.class)) {
countAll++;
Future<Integer> res = executorService.submit( () -> {
try {
HttpPost post = new HttpPost(getPublisherEndpoint());
HttpPost post = new HttpPost(getPublishEndpoint());
List<NameValuePair> params = Lists.newArrayList();
params.add(new BasicNameValuePair("record", record));
UrlEncodedFormEntity ent = new UrlEncodedFormEntity(params, "UTF-8");
@ -79,7 +81,7 @@ public class PublishGraphDBJobNode extends AsyncJobNode {
return statusCode;
}
} catch (ConnectException ce) {
throw new MSROException("unable to connect to Publisher endpoint" + getPublisherEndpoint());
throw new MSROException("unable to connect to Publisher endpoint" + getPublishEndpoint());
}
catch (IOException e) {
e.printStackTrace();
@ -96,7 +98,7 @@ public class PublishGraphDBJobNode extends AsyncJobNode {
for(Future<Integer> res : resList){
if(res.get() == 200) countOk++;
}
log.info(String.format("Got all responses. Ok %s/%s", countOk, countAll));
log.info(String.format("Got all responses. Ok responses: %s/%s", countOk, countAll));
env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "countOk", countOk);
env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "countAll", countAll);
@ -107,6 +109,40 @@ public class PublishGraphDBJobNode extends AsyncJobNode {
log.warn("Problems in publishing: "+countOk+"/"+countAll+" see error maps for details");
}
if(countAll == 0) log.warn("0 resources to publish");
env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "countOk", countOk);
env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "countAll", countAll);
env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "errorsMap", new Gson().toJson(errors));
if (countOk > 0) {
log.info("Feed provenance endpoint: " + getProvenanceFeedEndpoint());
try {
HttpPost post = new HttpPost(getProvenanceFeedEndpoint());
List<NameValuePair> params = Lists.newArrayList();
String datasourceInterfaceValue = getDatasourceInterface();
log.info("feeding provenance for datasourceInterface " + datasourceInterfaceValue);
params.add(new BasicNameValuePair("datasourceApi", datasourceInterfaceValue));
UrlEncodedFormEntity ent = new UrlEncodedFormEntity(params, "UTF-8");
post.setEntity(ent);
HttpClient client = HttpClients.createDefault();
HttpResponse responsePOST = client.execute(post);
int statusCode = responsePOST.getStatusLine().getStatusCode();
switch (statusCode) {
case 200:
log.info("feed provenance completed");
break;
default:
log.error("error feeding provenance " + responsePOST.getStatusLine().getStatusCode() + ": " + responsePOST.getStatusLine().getReasonPhrase());
break;
}
} catch (ConnectException ce) {
throw new MSROException("unable to connect to Publisher endpoint" + getPublishEndpoint());
}
catch (IOException e) {
log.error("error feeding provenance ", e);
}
}
return Arc.DEFAULT_ARC;
}
@ -114,6 +150,14 @@ public class PublishGraphDBJobNode extends AsyncJobNode {
return publisherEndpoint;
}
private String getPublishEndpoint() {
return publisherEndpoint.concat("/publish");
}
private String getProvenanceFeedEndpoint() {
return publisherEndpoint.concat("/feedProvenance");
}
public void setPublisherEndpoint(final String publisherEndpoint) {
this.publisherEndpoint = publisherEndpoint;
}
@ -134,4 +178,13 @@ public class PublishGraphDBJobNode extends AsyncJobNode {
this.eprParam = eprParam;
}
public String getDatasourceInterface() {
return datasourceInterface;
}
public void setDatasourceInterface(String datasourceInterface) {
this.datasourceInterface = datasourceInterface;
}
}

View File

@ -30,6 +30,7 @@
<PARAMETERS>
<PARAM name="eprParam" value="clean_epr"/>
<PARAM name="publisherEndpoint" ref="publisherEndpoint"/>
<PARAM name="datasourceInterface" ref="interface"/>
</PARAMETERS>
<ARCS>
<ARC to="success"/>