From eaf0dc68a2373afee7967577f9966d19f9b61334 Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Tue, 11 Aug 2020 09:17:03 +0200
Subject: [PATCH] fixed indexing

---
 .../dhp/provision/DropAndCreateESIndex.java   | 97 +++++++++++++++++++
 .../provision/SparkConvertDatasetToJson.scala | 38 ++++++++
 .../provision/SparkIndexCollectionOnES.java   | 30 ++----
 .../dnetlib/dhp/provision/dataset2Json.json   | 14 +++
 .../dhp/provision/dropAndCreateIndex.json     | 14 +++
 .../eu/dnetlib/dhp/provision/index_on_es.json | 13 +--
 .../sx/provision/oozie_app/config-default.xml |  4 +
 .../dhp/sx/provision/oozie_app/workflow.xml   | 82 +++++++++++++++-
 .../provision/DropAndCreateESIndexTest.java   | 13 +++
 9 files changed, 271 insertions(+), 34 deletions(-)
 create mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/DropAndCreateESIndex.java
 create mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkConvertDatasetToJson.scala
 create mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/dataset2Json.json
 create mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/dropAndCreateIndex.json
 create mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/provision/DropAndCreateESIndexTest.java

diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/DropAndCreateESIndex.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/DropAndCreateESIndex.java
new file mode 100644
index 000000000..1b5849f35
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/DropAndCreateESIndex.java
@@ -0,0 +1,97 @@
+
+package eu.dnetlib.dhp.provision;
+
+import java.util.Map;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+
+public class DropAndCreateESIndex {
+
+    public static void main(String[] args) throws Exception {
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+            IOUtils
+                .toString(
+                    DropAndCreateESIndex.class
+                        .getResourceAsStream(
+                            "/eu/dnetlib/dhp/provision/dropAndCreateIndex.json")));
+        parser.parseArgument(args);
+
+        final String index = parser.get("index");
+
+        final String cluster = parser.get("cluster");
+        final String clusterJson = IOUtils
+            .toString(DropAndCreateESIndex.class.getResourceAsStream("/eu/dnetlib/dhp/provision/cluster.json"));
+
+        final Map<String, String> clusterMap = new ObjectMapper().readValue(clusterJson, Map.class);
+
+        final String ip = clusterMap.get(cluster).split(",")[0];
+
+        System.out.println(ip);
+
+        final String url = "http://%s:9200/%s_%s";
+
+        CloseableHttpClient client = HttpClients.createDefault();
+
+        HttpDelete delete = new HttpDelete(String.format(url, ip, index, "object"));
+
+        CloseableHttpResponse response = client.execute(delete);
+
+        System.out.println("deleting Index SUMMARY");
+        System.out.println(response.getStatusLine());
+        client.close();
+        client = HttpClients.createDefault();
+
+        delete = new HttpDelete(String.format(url, ip, index, "scholix"));
+
+        response = client.execute(delete);
+
+        System.out.println("deleting Index SCHOLIX");
+        System.out.println(response.getStatusLine());
+        client.close();
+        client = HttpClients.createDefault();
+
+        final String summaryConf = IOUtils
+            .toString(DropAndCreateESIndex.class.getResourceAsStream("/eu/dnetlib/dhp/provision/summary_index.json"));
+
+        final String scholixConf = IOUtils
+            .toString(DropAndCreateESIndex.class.getResourceAsStream("/eu/dnetlib/dhp/provision/scholix_index.json"));
+
+        HttpPut put = new HttpPut(String.format(url, ip, index, "object"));
+
+        StringEntity entity = new StringEntity(summaryConf);
+        put.setEntity(entity);
+        put.setHeader("Accept", "application/json");
+        put.setHeader("Content-type", "application/json");
+
+        System.out.println("creating First Index SUMMARY");
+        response = client.execute(put);
+
+        client.close();
+        client = HttpClients.createDefault();
+
+        System.out.println(response.getStatusLine());
+
+        System.out.println("creating Index SCHOLIX");
+        put = new HttpPut(String.format(url, ip, index, "scholix"));
+
+        entity = new StringEntity(scholixConf);
+        put.setEntity(entity);
+        put.setHeader("Accept", "application/json");
+        put.setHeader("Content-type", "application/json");
+
+        response = client.execute(put);
+        System.out.println(response.getStatusLine());
+        client.close();
+
+    }
+}
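Note: the cluster.json resource read above is referenced but not added by this patch. From clusterMap.get(cluster).split(",")[0] its shape can be inferred: a flat object mapping each cluster name to a comma-separated list of Elasticsearch node addresses, of which this class uses only the first. A hypothetical example of that shape (the keys cluster1/cluster2 come from dropAndCreateIndex.json below; the addresses are placeholders):

    {
      "cluster1": "10.0.0.1,10.0.0.2,10.0.0.3",
      "cluster2": "10.0.1.1,10.0.1.2,10.0.1.3"
    }
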
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkConvertDatasetToJson.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkConvertDatasetToJson.scala
new file mode 100644
index 000000000..81bdb2941
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkConvertDatasetToJson.scala
@@ -0,0 +1,38 @@
+package eu.dnetlib.dhp.provision
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.provision.scholix.Scholix
+import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
+import org.codehaus.jackson.map.ObjectMapper
+
+object SparkConvertDatasetToJson {
+
+  def main(args: Array[String]): Unit = {
+    val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertDatasetToJson.getClass.getResourceAsStream("/eu/dnetlib/dhp/provision/dataset2Json.json")))
+    parser.parseArgument(args)
+    val conf = new SparkConf
+    val spark = SparkSession.builder.config(conf).appName(SparkConvertDatasetToJson.getClass.getSimpleName).master(parser.get("master")).getOrCreate
+
+    implicit val summaryEncoder: Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary]
+    implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo[Scholix]
+
+
+    val workingPath = parser.get("workingPath")
+
+
+
+    spark.read.load(s"$workingPath/summary").as[ScholixSummary]
+      .map(s => new ObjectMapper().writeValueAsString(s))(Encoders.STRING)
+      .rdd.repartition(500).saveAsTextFile(s"$workingPath/summary_json", classOf[GzipCodec])
+
+    spark.read.load(s"$workingPath/scholix").as[Scholix]
+      .map(s => new ObjectMapper().writeValueAsString(s))(Encoders.STRING)
+      .rdd.repartition(2000).saveAsTextFile(s"$workingPath/scholix_json", classOf[GzipCodec])
+
+  }
+
+}
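A side note on the converter above (no change to the patch implied): the map functions construct a new ObjectMapper for every record. A common alternative is to serialize per partition so a single mapper is reused; a minimal hypothetical sketch in Java, where JsonPerPartition and its toJson helper are made-up names and the argument stands in for the summary RDD:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.spark.api.java.JavaRDD;

    import com.fasterxml.jackson.databind.ObjectMapper;

    import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary;

    public class JsonPerPartition {

        // Serialize each summary to JSON, allocating one ObjectMapper per
        // partition instead of one per record.
        static JavaRDD<String> toJson(final JavaRDD<ScholixSummary> summaries) {
            return summaries.mapPartitions((Iterator<ScholixSummary> it) -> {
                final ObjectMapper mapper = new ObjectMapper();
                final List<String> out = new ArrayList<>();
                while (it.hasNext())
                    out.add(mapper.writeValueAsString(it.next()));
                return out.iterator();
            });
        }
    }
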
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkIndexCollectionOnES.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkIndexCollectionOnES.java
index 78d873080..1b435bea3 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkIndexCollectionOnES.java
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkIndexCollectionOnES.java
@@ -8,15 +8,12 @@
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
 import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary;
 
 public class SparkIndexCollectionOnES {
 
@@ -39,33 +36,20 @@ public class SparkIndexCollectionOnES {
 		final String sourcePath = parser.get("sourcePath");
 		final String index = parser.get("index");
 		final String idPath = parser.get("idPath");
-		final String type = parser.get("type");
-		final String indexHost = parser.get("esHost");
+		final String cluster = parser.get("cluster");
+
+		final String clusterJson = IOUtils
+			.toString(DropAndCreateESIndex.class.getResourceAsStream("/eu/dnetlib/dhp/provision/cluster.json"));
+
+		final Map<String, String> clusterMap = new ObjectMapper().readValue(clusterJson, Map.class);
 
 		final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
 
 		final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
 
-		JavaRDD<String> inputRdd;
-
-		if ("summary".equalsIgnoreCase(type))
-			inputRdd = spark
-				.read()
-				.load(sourcePath)
-				.as(Encoders.bean(ScholixSummary.class))
-				.map(
-					(MapFunction<ScholixSummary, String>) f -> {
-						final ObjectMapper mapper = new ObjectMapper();
-						return mapper.writeValueAsString(f);
-					},
-					Encoders.STRING())
-				.javaRDD();
-		else
-			inputRdd = sc.textFile(sourcePath);
+		JavaRDD<String> inputRdd = sc.textFile(sourcePath);
 
 		Map<String, String> esCfg = new HashMap<>();
-		// esCfg.put("es.nodes", "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54");
-		esCfg.put("es.nodes", indexHost);
+		esCfg.put("es.nodes", clusterMap.get(cluster));
 		esCfg.put("es.mapping.id", idPath);
 		esCfg.put("es.batch.write.retry.count", "8");
 		esCfg.put("es.batch.write.retry.wait", "60s");
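The hunk above stops at the configuration map; the actual write lives in the unchanged tail of the class. For orientation, given the org.elasticsearch.spark.rdd.api.java.JavaEsSpark import this class already carries, the call that follows is presumably along these lines (a sketch, reusing the inputRdd, index and esCfg variables defined above):

    // Each JSON string in inputRdd becomes one document in the target index;
    // es.mapping.id tells elasticsearch-hadoop which JSON field supplies the _id.
    JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg);
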
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/dataset2Json.json b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/dataset2Json.json
new file mode 100644
index 000000000..41db00cbf
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/dataset2Json.json
@@ -0,0 +1,14 @@
+[
+  {
+    "paramName": "m",
+    "paramLongName": "master",
+    "paramDescription": "master should be local or yarn",
+    "paramRequired": true
+  },
+  {
+    "paramName": "w",
+    "paramLongName": "workingPath",
+    "paramDescription": "the working path",
+    "paramRequired": true
+  }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/dropAndCreateIndex.json b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/dropAndCreateIndex.json
new file mode 100644
index 000000000..242aca8c5
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/dropAndCreateIndex.json
@@ -0,0 +1,14 @@
+[
+  {
+    "paramName": "c",
+    "paramLongName": "cluster",
+    "paramDescription": "should be cluster1 or cluster2",
+    "paramRequired": true
+  },
+  {
+    "paramName": "i",
+    "paramLongName": "index",
+    "paramDescription": "index name",
+    "paramRequired": true
+  }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/index_on_es.json b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/index_on_es.json
index f70f7dd79..51b001a0d 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/index_on_es.json
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/index_on_es.json
@@ -18,19 +18,12 @@
     "paramRequired": true
   },
   {
-    "paramName": "h",
-    "paramLongName": "esHost",
-    "paramDescription": "the index host name",
+    "paramName": "c",
+    "paramLongName": "cluster",
+    "paramDescription": "the index cluster",
     "paramRequired": true
   },
-
-  {
-    "paramName": "t",
-    "paramLongName": "type",
-    "paramDescription": "should be scholix or summary",
-    "paramRequired": true
-  },
   {
     "paramName": "id",
     "paramLongName": "idPath",
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/config-default.xml
index 6fb2a1253..7c1a43e51 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/config-default.xml
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/config-default.xml
@@ -7,4 +7,8 @@
         <name>oozie.action.sharelib.for.spark</name>
         <value>spark2</value>
     </property>
+    <property>
+        <name>oozie.launcher.mapreduce.user.classpath.first</name>
+        <value>true</value>
+    </property>
 </configuration>
\ No newline at end of file
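The config-default.xml addition makes the Oozie launcher put the workflow's own jars ahead of the Hadoop-provided ones, which matters for the java action introduced below, since DropAndCreateESIndex ships its own httpclient and Jackson versions. The same bootstrap can also be exercised outside Oozie; a hypothetical local smoke run mirroring the action's arguments (both values are placeholders, and the cluster name must exist as a key in cluster.json):

    public class DropAndCreateESIndexExample {
        public static void main(String[] args) throws Exception {
            // "-c" selects the cluster entry, "-i" the index name prefix,
            // matching the parameter spec in dropAndCreateIndex.json.
            DropAndCreateESIndex.main(new String[] { "-c", "cluster1", "-i", "dli" });
        }
    }
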
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml
index 7ce35cee2..eb606fc6e 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml
@@ -8,6 +8,14 @@
             <name>graphPath</name>
             <description>the graph path</description>
         </property>
+        <property>
+            <name>index</name>
+            <description>the index name</description>
+        </property>
+        <property>
+            <name>esCluster</name>
+            <description>the Index cluster</description>
+        </property>
         <property>
             <name>sparkDriverMemory</name>
             <description>memory for driver process</description>
@@ -18,7 +26,7 @@
         </property>
     </parameters>
 
-    <start to="CalculateRelatedItem"/>
+    <start to="datasetToJson"/>
 
     <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@@ -82,6 +90,78 @@
             <arg>--workingDirPath</arg><arg>${workingDirPath}</arg>
             <arg>--graphPath</arg><arg>${graphPath}</arg>
         </spark>
         <ok to="End"/>
         <error to="Kill"/>
     </action>
+
+    <action name="datasetToJson">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>generate Scholix</name>
+            <class>eu.dnetlib.dhp.provision.SparkConvertDatasetToJson</class>
+            <jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
+            <spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.sql.shuffle.partitions=4000 ${sparkExtraOPT}</spark-opts>
+            <arg>-m</arg> <arg>yarn-cluster</arg>
+            <arg>--workingPath</arg><arg>${workingDirPath}</arg>
+        </spark>
+        <ok to="DropAndCreateIndex"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="DropAndCreateIndex">
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <main-class>eu.dnetlib.dhp.provision.DropAndCreateESIndex</main-class>
+            <arg>-i</arg><arg>${index}</arg>
+            <arg>-c</arg><arg>${esCluster}</arg>
+        </java>
+        <ok to="indexSummary"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="indexSummary">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>index summary</name>
+            <class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>
+            <jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
+            <spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="8"</spark-opts>
+            <arg>-mt</arg> <arg>yarn-cluster</arg>
+            <arg>--sourcePath</arg><arg>${workingDirPath}/summary_json</arg>
+            <arg>--index</arg><arg>${index}_object</arg>
+            <arg>--idPath</arg><arg>id</arg>
+            <arg>--cluster</arg><arg>${esCluster}</arg>
+        </spark>
+        <ok to="indexScholix"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="indexScholix">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>index scholix</name>
+            <class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>
+            <jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
+            <spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="8"</spark-opts>
+            <arg>-mt</arg> <arg>yarn-cluster</arg>
+            <arg>--sourcePath</arg><arg>${workingDirPath}/scholix_json</arg>
+            <arg>--index</arg><arg>${index}_scholix</arg>
+            <arg>--idPath</arg><arg>identifier</arg>
+            <arg>--cluster</arg><arg>${esCluster}</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
 
     <end name="End"/>
 </workflow-app>
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/provision/DropAndCreateESIndexTest.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/provision/DropAndCreateESIndexTest.java
new file mode 100644
index 000000000..19e8aa699
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/provision/DropAndCreateESIndexTest.java
@@ -0,0 +1,13 @@
+
+package eu.dnetlib.dhp.provision;
+
+import org.junit.jupiter.api.Test;
+
+public class DropAndCreateESIndexTest {
+
+    public void testDropAndCreate() throws Exception {
+        DropAndCreateESIndex.main("-c localhost -i dli_shadow".split(" "));
+
+    }
+
+}
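One caveat on the new test: org.junit.jupiter.api.Test is imported but testDropAndCreate carries no annotation, so JUnit 5 will not discover it — plausibly deliberate, since a run would drop and recreate live indexes and expects "localhost" to resolve as a key in cluster.json. A hedged sketch of keeping it discoverable while skipped by default:

    import org.junit.jupiter.api.Disabled;
    import org.junit.jupiter.api.Test;

    public class DropAndCreateESIndexTest {

        @Test
        @Disabled("requires a live Elasticsearch cluster configured as 'localhost' in cluster.json")
        public void testDropAndCreate() throws Exception {
            DropAndCreateESIndex.main("-c localhost -i dli_shadow".split(" "));
        }
    }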