From 8879704ba0e243bea120673c0d14ce76bb59afd0 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Tue, 5 Jan 2021 10:00:13 +0100 Subject: [PATCH] [DOIBoost] configurable ES server url and index name in crossref importer --- .../doiboost/crossref/CrossrefImporter.java | 46 ++++++++------ .../dnetlib/doiboost/crossref/ESClient.java | 63 ++++++++----------- .../dnetlib/dhp/doiboost/import_from_es.json | 8 ++- .../dhp/doiboost/oozie_app/workflow.xml | 35 +++++++---- 4 files changed, 81 insertions(+), 71 deletions(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefImporter.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefImporter.java index f69a05da1c..cda4983b72 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefImporter.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefImporter.java @@ -2,18 +2,16 @@ package eu.dnetlib.doiboost.crossref; import java.io.ByteArrayOutputStream; +import java.util.Optional; import java.util.zip.Inflater; import org.apache.commons.codec.binary.Base64; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; @@ -30,34 +28,45 @@ public class CrossrefImporter { parser.parseArgument(args); - final String hdfsuri = parser.get("namenode"); - System.out.println("HDFS URI" + hdfsuri); - Path hdfswritepath = new Path(parser.get("targetPath")); - System.out.println("TargetPath: " + hdfsuri); + final String namenode = parser.get("namenode"); + System.out.println("namenode: " + namenode); - final Long timestamp = StringUtils.isNotBlank(parser.get("timestamp")) - ? Long.parseLong(parser.get("timestamp")) - : -1; + Path targetPath = new Path(parser.get("targetPath")); + System.out.println("targetPath: " + targetPath); - if (timestamp > 0) - System.out.println("Timestamp added " + timestamp); + final Long timestamp = Optional + .ofNullable(parser.get("timestamp")) + .map(s -> { + try { + return Long.parseLong(s); + } catch (NumberFormatException e) { + return -1L; + } + }) + .orElse(-1L); + System.out.println("timestamp: " + timestamp); + + final String esServer = parser.get("esServer"); + System.out.println("esServer: " + esServer); + + final String esIndex = parser.get("esIndex"); + System.out.println("esIndex: " + esIndex); // ====== Init HDFS File System Object Configuration conf = new Configuration(); // Set FileSystem URI - conf.set("fs.defaultFS", hdfsuri); + conf.set("fs.defaultFS", namenode); // Because of Maven conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); - ESClient client = timestamp > 0 - ? new ESClient("ip-90-147-167-25.ct1.garrservices.it", "crossref", timestamp) - : new ESClient("ip-90-147-167-25.ct1.garrservices.it", "crossref"); + // "ip-90-147-167-25.ct1.garrservices.it", "crossref" + final ESClient client = new ESClient(esServer, esIndex, timestamp); try (SequenceFile.Writer writer = SequenceFile .createWriter( conf, - SequenceFile.Writer.file(hdfswritepath), + SequenceFile.Writer.file(targetPath), SequenceFile.Writer.keyClass(IntWritable.class), SequenceFile.Writer.valueClass(Text.class))) { @@ -74,8 +83,7 @@ public class CrossrefImporter { end = System.currentTimeMillis(); final float time = (end - start) / 1000.0F; System.out - .println( - String.format("Imported %d records last 100000 imported in %f seconds", i, time)); + .println(String.format("Imported %s records last 100000 imported in %s seconds", i, time)); start = System.currentTimeMillis(); } } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ESClient.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ESClient.java index e31ccf399b..dcebbbcac8 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ESClient.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ESClient.java @@ -1,11 +1,11 @@ package eu.dnetlib.doiboost.crossref; -import java.io.IOException; import java.util.Iterator; import java.util.List; import org.apache.commons.io.IOUtils; +import org.apache.http.HttpHeaders; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; @@ -17,13 +17,17 @@ import org.slf4j.LoggerFactory; import com.jayway.jsonpath.JsonPath; public class ESClient implements Iterator { - private static final Logger logger = LoggerFactory.getLogger(ESClient.class); - static final String blobPath = "$.hits[*].hits[*]._source.blob"; - static final String scrollIdPath = "$._scroll_id"; - static final String JSON_NO_TS = "{\"size\":1000}"; - static final String JSON_WITH_TS = "{\"size\":1000, \"query\":{\"range\":{\"timestamp\":{\"gte\":%d}}}}"; - static final String JSON_SCROLL = "{\"scroll_id\":\"%s\",\"scroll\" : \"1m\"}"; + private static final String BLOB_PATH = "$.hits.hits[*]._source.blob"; + private static final String SCROLL_ID_PATH = "$._scroll_id"; + private static final String JSON_NO_TS = "{\"size\":1000}"; + private static final String JSON_WITH_TS = "{\"size\":1000, \"query\":{\"range\":{\"timestamp\":{\"gte\":%d}}}}"; + private static final String JSON_SCROLL = "{\"scroll_id\":\"%s\",\"scroll\" : \"1m\"}"; + + public static final String APPLICATION_JSON = "application/json"; + + public static final String ES_SEARCH_URL = "http://%s:9200/%s/_search?scroll=1m"; + public static final String ES_SCROLL_URL = "http://%s:9200/_search/scroll"; private final String scrollId; @@ -31,47 +35,30 @@ public class ESClient implements Iterator { private final String esHost; - public ESClient(final String esHost, final String esIndex) throws IOException { - + public ESClient(final String esHost, final String esIndex, final long timestamp) { this.esHost = esHost; - final String body = getResponse( - String.format("http://%s:9200/%s/_search?scroll=1m", esHost, esIndex), JSON_NO_TS); - scrollId = getJPathString(scrollIdPath, body); - buffer = getBlobs(body); - } - public ESClient(final String esHost, final String esIndex, final long timestamp) - throws IOException { - this.esHost = esHost; - final String body = getResponse( - String.format("http://%s:9200/%s/_search?scroll=1m", esHost, esIndex), - String.format(JSON_WITH_TS, timestamp)); - scrollId = getJPathString(scrollIdPath, body); + final String body = timestamp > 0 + ? getResponse(String.format(ES_SEARCH_URL, esHost, esIndex), String.format(JSON_WITH_TS, timestamp)) + : getResponse(String.format(ES_SEARCH_URL, esHost, esIndex), JSON_NO_TS); + scrollId = getJPathString(SCROLL_ID_PATH, body); buffer = getBlobs(body); } private String getResponse(final String url, final String json) { - CloseableHttpClient client = HttpClients.createDefault(); - try { - + try (CloseableHttpClient client = HttpClients.createDefault()) { HttpPost httpPost = new HttpPost(url); if (json != null) { StringEntity entity = new StringEntity(json); httpPost.setEntity(entity); - httpPost.setHeader("Accept", "application/json"); - httpPost.setHeader("Content-type", "application/json"); + httpPost.setHeader(HttpHeaders.ACCEPT, APPLICATION_JSON); + httpPost.setHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON); + } + try (CloseableHttpResponse response = client.execute(httpPost)) { + return IOUtils.toString(response.getEntity().getContent()); } - CloseableHttpResponse response = client.execute(httpPost); - - return IOUtils.toString(response.getEntity().getContent()); } catch (Throwable e) { throw new RuntimeException("Error on executing request ", e); - } finally { - try { - client.close(); - } catch (IOException e) { - throw new RuntimeException("Unable to close client ", e); - } } } @@ -87,7 +74,7 @@ public class ESClient implements Iterator { } private List getBlobs(final String body) { - final List res = JsonPath.read(body, "$.hits.hits[*]._source.blob"); + final List res = JsonPath.read(body, BLOB_PATH); return res; } @@ -102,11 +89,11 @@ public class ESClient implements Iterator { if (buffer.isEmpty()) { final String json_param = String.format(JSON_SCROLL, scrollId); - final String body = getResponse(String.format("http://%s:9200/_search/scroll", esHost), json_param); + final String body = getResponse(String.format(ES_SCROLL_URL, esHost), json_param); try { buffer = getBlobs(body); } catch (Throwable e) { - logger.error("Error on get next page: body:" + body); + System.out.println("Error on get next page: body:" + body); } } return nextItem; diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/import_from_es.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/import_from_es.json index 87a138d525..0920d516a4 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/import_from_es.json +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/import_from_es.json @@ -1,5 +1,7 @@ [ - {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the sequencial file to write", "paramRequired": true}, - {"paramName":"n", "paramLongName":"namenode", "paramDescription": "the hive metastore uris", "paramRequired": true}, - {"paramName":"ts", "paramLongName":"timestamp", "paramDescription": "timestamp", "paramRequired": false} + {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the sequencial file to write", "paramRequired": true}, + {"paramName":"n", "paramLongName":"namenode", "paramDescription": "the hive metastore uris", "paramRequired": true}, + {"paramName":"ts", "paramLongName":"timestamp", "paramDescription": "timestamp", "paramRequired": false}, + {"paramName":"ess", "paramLongName":"esServer", "paramDescription": "elasticsearch server url", "paramRequired": true}, + {"paramName":"esi", "paramLongName":"esIndex", "paramDescription": "elasticsearch index name", "paramRequired": true} ] \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml index bc859d0e05..9464c1a615 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml @@ -36,7 +36,6 @@ - inputPathCrossref the Crossref input path @@ -45,6 +44,14 @@ crossrefTimestamp Timestamp for the Crossref incremental Harvesting + + esServer + elasticsearch server url for the Crossref Harvesting + + + esIndex + elasticsearch index name for the Crossref Harvesting + @@ -65,11 +72,19 @@ the ORCID working path - - - + + ${jobTracker} + ${nameNode} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + @@ -85,20 +100,18 @@ - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - ${jobTracker} - ${nameNode} eu.dnetlib.doiboost.crossref.CrossrefImporter - -t${inputPathCrossref}/index_update - -n${nameNode} - -ts${timestamp} + --targetPath${inputPathCrossref}/index_update + --namenode${nameNode} + --esServer${esServer} + --esIndex${esIndex} + --timestamp${crossrefTimestamp}