[DOIBoost] configurable ES server url and index name in crossref importer

This commit is contained in:
Claudio Atzori 2021-01-05 10:00:13 +01:00
parent 26e9d55c13
commit 8879704ba0
4 changed files with 81 additions and 71 deletions

View File

@ -2,18 +2,16 @@
package eu.dnetlib.doiboost.crossref; package eu.dnetlib.doiboost.crossref;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.util.Optional;
import java.util.zip.Inflater; import java.util.zip.Inflater;
import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.binary.Base64;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
@ -30,34 +28,45 @@ public class CrossrefImporter {
parser.parseArgument(args); parser.parseArgument(args);
final String hdfsuri = parser.get("namenode"); final String namenode = parser.get("namenode");
System.out.println("HDFS URI" + hdfsuri); System.out.println("namenode: " + namenode);
Path hdfswritepath = new Path(parser.get("targetPath"));
System.out.println("TargetPath: " + hdfsuri);
final Long timestamp = StringUtils.isNotBlank(parser.get("timestamp")) Path targetPath = new Path(parser.get("targetPath"));
? Long.parseLong(parser.get("timestamp")) System.out.println("targetPath: " + targetPath);
: -1;
if (timestamp > 0) final Long timestamp = Optional
System.out.println("Timestamp added " + timestamp); .ofNullable(parser.get("timestamp"))
.map(s -> {
try {
return Long.parseLong(s);
} catch (NumberFormatException e) {
return -1L;
}
})
.orElse(-1L);
System.out.println("timestamp: " + timestamp);
final String esServer = parser.get("esServer");
System.out.println("esServer: " + esServer);
final String esIndex = parser.get("esIndex");
System.out.println("esIndex: " + esIndex);
// ====== Init HDFS File System Object // ====== Init HDFS File System Object
Configuration conf = new Configuration(); Configuration conf = new Configuration();
// Set FileSystem URI // Set FileSystem URI
conf.set("fs.defaultFS", hdfsuri); conf.set("fs.defaultFS", namenode);
// Because of Maven // Because of Maven
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
ESClient client = timestamp > 0 // "ip-90-147-167-25.ct1.garrservices.it", "crossref"
? new ESClient("ip-90-147-167-25.ct1.garrservices.it", "crossref", timestamp) final ESClient client = new ESClient(esServer, esIndex, timestamp);
: new ESClient("ip-90-147-167-25.ct1.garrservices.it", "crossref");
try (SequenceFile.Writer writer = SequenceFile try (SequenceFile.Writer writer = SequenceFile
.createWriter( .createWriter(
conf, conf,
SequenceFile.Writer.file(hdfswritepath), SequenceFile.Writer.file(targetPath),
SequenceFile.Writer.keyClass(IntWritable.class), SequenceFile.Writer.keyClass(IntWritable.class),
SequenceFile.Writer.valueClass(Text.class))) { SequenceFile.Writer.valueClass(Text.class))) {
@ -74,8 +83,7 @@ public class CrossrefImporter {
end = System.currentTimeMillis(); end = System.currentTimeMillis();
final float time = (end - start) / 1000.0F; final float time = (end - start) / 1000.0F;
System.out System.out
.println( .println(String.format("Imported %s records last 100000 imported in %s seconds", i, time));
String.format("Imported %d records last 100000 imported in %f seconds", i, time));
start = System.currentTimeMillis(); start = System.currentTimeMillis();
} }
} }

View File

@ -1,11 +1,11 @@
package eu.dnetlib.doiboost.crossref; package eu.dnetlib.doiboost.crossref;
import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity; import org.apache.http.entity.StringEntity;
@ -17,13 +17,17 @@ import org.slf4j.LoggerFactory;
import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.JsonPath;
public class ESClient implements Iterator<String> { public class ESClient implements Iterator<String> {
private static final Logger logger = LoggerFactory.getLogger(ESClient.class);
static final String blobPath = "$.hits[*].hits[*]._source.blob"; private static final String BLOB_PATH = "$.hits.hits[*]._source.blob";
static final String scrollIdPath = "$._scroll_id"; private static final String SCROLL_ID_PATH = "$._scroll_id";
static final String JSON_NO_TS = "{\"size\":1000}"; private static final String JSON_NO_TS = "{\"size\":1000}";
static final String JSON_WITH_TS = "{\"size\":1000, \"query\":{\"range\":{\"timestamp\":{\"gte\":%d}}}}"; private static final String JSON_WITH_TS = "{\"size\":1000, \"query\":{\"range\":{\"timestamp\":{\"gte\":%d}}}}";
static final String JSON_SCROLL = "{\"scroll_id\":\"%s\",\"scroll\" : \"1m\"}"; private static final String JSON_SCROLL = "{\"scroll_id\":\"%s\",\"scroll\" : \"1m\"}";
public static final String APPLICATION_JSON = "application/json";
public static final String ES_SEARCH_URL = "http://%s:9200/%s/_search?scroll=1m";
public static final String ES_SCROLL_URL = "http://%s:9200/_search/scroll";
private final String scrollId; private final String scrollId;
@ -31,47 +35,30 @@ public class ESClient implements Iterator<String> {
private final String esHost; private final String esHost;
public ESClient(final String esHost, final String esIndex) throws IOException { public ESClient(final String esHost, final String esIndex, final long timestamp) {
this.esHost = esHost; this.esHost = esHost;
final String body = getResponse(
String.format("http://%s:9200/%s/_search?scroll=1m", esHost, esIndex), JSON_NO_TS);
scrollId = getJPathString(scrollIdPath, body);
buffer = getBlobs(body);
}
public ESClient(final String esHost, final String esIndex, final long timestamp) final String body = timestamp > 0
throws IOException { ? getResponse(String.format(ES_SEARCH_URL, esHost, esIndex), String.format(JSON_WITH_TS, timestamp))
this.esHost = esHost; : getResponse(String.format(ES_SEARCH_URL, esHost, esIndex), JSON_NO_TS);
final String body = getResponse( scrollId = getJPathString(SCROLL_ID_PATH, body);
String.format("http://%s:9200/%s/_search?scroll=1m", esHost, esIndex),
String.format(JSON_WITH_TS, timestamp));
scrollId = getJPathString(scrollIdPath, body);
buffer = getBlobs(body); buffer = getBlobs(body);
} }
private String getResponse(final String url, final String json) { private String getResponse(final String url, final String json) {
CloseableHttpClient client = HttpClients.createDefault(); try (CloseableHttpClient client = HttpClients.createDefault()) {
try {
HttpPost httpPost = new HttpPost(url); HttpPost httpPost = new HttpPost(url);
if (json != null) { if (json != null) {
StringEntity entity = new StringEntity(json); StringEntity entity = new StringEntity(json);
httpPost.setEntity(entity); httpPost.setEntity(entity);
httpPost.setHeader("Accept", "application/json"); httpPost.setHeader(HttpHeaders.ACCEPT, APPLICATION_JSON);
httpPost.setHeader("Content-type", "application/json"); httpPost.setHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON);
}
try (CloseableHttpResponse response = client.execute(httpPost)) {
return IOUtils.toString(response.getEntity().getContent());
} }
CloseableHttpResponse response = client.execute(httpPost);
return IOUtils.toString(response.getEntity().getContent());
} catch (Throwable e) { } catch (Throwable e) {
throw new RuntimeException("Error on executing request ", e); throw new RuntimeException("Error on executing request ", e);
} finally {
try {
client.close();
} catch (IOException e) {
throw new RuntimeException("Unable to close client ", e);
}
} }
} }
@ -87,7 +74,7 @@ public class ESClient implements Iterator<String> {
} }
private List<String> getBlobs(final String body) { private List<String> getBlobs(final String body) {
final List<String> res = JsonPath.read(body, "$.hits.hits[*]._source.blob"); final List<String> res = JsonPath.read(body, BLOB_PATH);
return res; return res;
} }
@ -102,11 +89,11 @@ public class ESClient implements Iterator<String> {
if (buffer.isEmpty()) { if (buffer.isEmpty()) {
final String json_param = String.format(JSON_SCROLL, scrollId); final String json_param = String.format(JSON_SCROLL, scrollId);
final String body = getResponse(String.format("http://%s:9200/_search/scroll", esHost), json_param); final String body = getResponse(String.format(ES_SCROLL_URL, esHost), json_param);
try { try {
buffer = getBlobs(body); buffer = getBlobs(body);
} catch (Throwable e) { } catch (Throwable e) {
logger.error("Error on get next page: body:" + body); System.out.println("Error on get next page: body:" + body);
} }
} }
return nextItem; return nextItem;

View File

@ -1,5 +1,7 @@
[ [
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the sequencial file to write", "paramRequired": true}, {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the sequencial file to write", "paramRequired": true},
{"paramName":"n", "paramLongName":"namenode", "paramDescription": "the hive metastore uris", "paramRequired": true}, {"paramName":"n", "paramLongName":"namenode", "paramDescription": "the hive metastore uris", "paramRequired": true},
{"paramName":"ts", "paramLongName":"timestamp", "paramDescription": "timestamp", "paramRequired": false} {"paramName":"ts", "paramLongName":"timestamp", "paramDescription": "timestamp", "paramRequired": false},
{"paramName":"ess", "paramLongName":"esServer", "paramDescription": "elasticsearch server url", "paramRequired": true},
{"paramName":"esi", "paramLongName":"esIndex", "paramDescription": "elasticsearch index name", "paramRequired": true}
] ]

View File

@ -36,7 +36,6 @@
<!-- Crossref Parameters --> <!-- Crossref Parameters -->
<property> <property>
<name>inputPathCrossref</name> <name>inputPathCrossref</name>
<description>the Crossref input path</description> <description>the Crossref input path</description>
@ -45,6 +44,14 @@
<name>crossrefTimestamp</name> <name>crossrefTimestamp</name>
<description>Timestamp for the Crossref incremental Harvesting</description> <description>Timestamp for the Crossref incremental Harvesting</description>
</property> </property>
<property>
<name>esServer</name>
<description>elasticsearch server url for the Crossref Harvesting</description>
</property>
<property>
<name>esIndex</name>
<description>elasticsearch index name for the Crossref Harvesting</description>
</property>
<!-- MAG Parameters --> <!-- MAG Parameters -->
<property> <property>
@ -65,11 +72,19 @@
<description>the ORCID working path</description> <description>the ORCID working path</description>
</property> </property>
</parameters> </parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="resume_from"/> <start to="resume_from"/>
<decision name="resume_from"> <decision name="resume_from">
@ -85,20 +100,18 @@
</switch> </switch>
</decision> </decision>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<action name="ImportCrossRef"> <action name="ImportCrossRef">
<java> <java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.doiboost.crossref.CrossrefImporter</main-class> <main-class>eu.dnetlib.doiboost.crossref.CrossrefImporter</main-class>
<arg>-t</arg><arg>${inputPathCrossref}/index_update</arg> <arg>--targetPath</arg><arg>${inputPathCrossref}/index_update</arg>
<arg>-n</arg><arg>${nameNode}</arg> <arg>--namenode</arg><arg>${nameNode}</arg>
<arg>-ts</arg><arg>${timestamp}</arg> <arg>--esServer</arg><arg>${esServer}</arg>
<arg>--esIndex</arg><arg>${esIndex}</arg>
<arg>--timestamp</arg><arg>${crossrefTimestamp}</arg>
</java> </java>
<ok to="GenerateCrossrefDataset"/> <ok to="GenerateCrossrefDataset"/>
<error to="Kill"/> <error to="Kill"/>