AriadnePlus/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/PublishGraphDBJobNode.java

138 lines
4.6 KiB
Java

package eu.dnetlib.ariadneplus.workflows.nodes;
import java.io.IOException;
import java.net.ConnectException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
import eu.dnetlib.msro.workflows.graph.Arc;
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;
import eu.dnetlib.msro.workflows.procs.Env;
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
import eu.dnetlib.rmi.common.ResultSet;
import eu.dnetlib.rmi.manager.MSROException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.HttpClient;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.springframework.beans.factory.annotation.Autowired;
public class PublishGraphDBJobNode extends AsyncJobNode {
private static final Log log = LogFactory.getLog(PublishGraphDBJobNode.class);
private String eprParam;
@Autowired
private ResultSetClient resultSetClient;
private String publisherEndpoint;
//for parallel requests to the publisher endpoint
private int nThreads = 5;
private ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
private List<Future<Integer>> resList = Lists.newArrayList();
@Override
protected String execute(final Env env) throws Exception {
final ResultSet<?> rsIn = env.getAttribute(getEprParam(), ResultSet.class);
if ((rsIn == null)) { throw new MSROException("EprParam (" + getEprParam() + ") not found in ENV"); }
int countAll = 0;
int countOk = 0;
Map<Integer, Integer> errors = Maps.newHashMap();
log.info("Publisher endpoint: " + getPublisherEndpoint());
for (String record : getResultSetClient().iter(rsIn, String.class)) {
countAll++;
Future<Integer> res = executorService.submit( () -> {
try {
HttpPost post = new HttpPost(getPublisherEndpoint());
List<NameValuePair> params = Lists.newArrayList();
params.add(new BasicNameValuePair("record", record));
UrlEncodedFormEntity ent = new UrlEncodedFormEntity(params, "UTF-8");
post.setEntity(ent);
HttpClient client = HttpClients.createDefault();
HttpResponse responsePOST = client.execute(post);
int statusCode = responsePOST.getStatusLine().getStatusCode();
switch (statusCode) {
case 200:
return statusCode;
default:
log.error(responsePOST.getStatusLine().getStatusCode() + ": " + responsePOST.getStatusLine().getReasonPhrase());
log.error("Source record causing error: " + record);
errors.merge(statusCode, 1, Integer::sum);
return statusCode;
}
} catch (ConnectException ce) {
throw new MSROException("unable to connect to Publisher endpoint" + getPublisherEndpoint());
}
catch (IOException e) {
e.printStackTrace();
errors.merge(-1, 1, Integer::sum);
}
return -1;
});
resList.add(res);
}
executorService.shutdown();
//now let's wait for the results. We can block ourselves here: we have nothing else to do
log.info("Waiting for responses");
for(Future<Integer> res : resList){
if(res.get() == 200) countOk++;
}
log.info(String.format("Got all responses. Ok %s/%s", countOk, countAll));
env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "countOk", countOk);
env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "countAll", countAll);
env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "errorsMap", new Gson().toJson(errors));
log.info("publishing completed");
if (!errors.isEmpty()) {
log.warn("Problems in publishing: "+countOk+"/"+countAll+" see error maps for details");
}
if(countAll == 0) log.warn("0 resources to publish");
return Arc.DEFAULT_ARC;
}
public String getPublisherEndpoint() {
return publisherEndpoint;
}
public void setPublisherEndpoint(final String publisherEndpoint) {
this.publisherEndpoint = publisherEndpoint;
}
public ResultSetClient getResultSetClient() {
return resultSetClient;
}
public void setResultSetClient(final ResultSetClient resultSetClient) {
this.resultSetClient = resultSetClient;
}
public String getEprParam() {
return eprParam;
}
public void setEprParam(String eprParam) {
this.eprParam = eprParam;
}
}