From 9f5a0f3ab67564d6da0460ae9e4f9898177b1fe6 Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Fri, 9 Jul 2021 12:06:43 +0200
Subject: [PATCH] moved the Scholexplorer indexing workflow into dhp-graph-provision

---
 dhp-workflows/dhp-graph-provision/pom.xml       |   6 +
 .../sx/provision/DropAndCreateESIndex.java      | 107 ++++++
 .../provision/SparkIndexCollectionOnES.java     |  58 +++
 .../eu/dnetlib/dhp/sx/provision/cluster.json    |   4 +
 .../dhp/sx/provision/dropAndCreateIndex.json    |  14 +
 .../dnetlib/dhp/sx/provision/index_on_es.json   |  32 ++
 .../sx/provision/oozie_app/config-default.xml   |  14 +
 .../dhp/sx/provision/oozie_app/workflow.xml     |  89 +++++
 .../dhp/sx/provision/scholix_index.json         | 331 ++++++++++++++++++
 .../dhp/sx/provision/summary_index.json         | 132 +++++++
 10 files changed, 787 insertions(+)
 create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/DropAndCreateESIndex.java
 create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkIndexCollectionOnES.java
 create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/cluster.json
 create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/dropAndCreateIndex.json
 create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/index_on_es.json
 create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/config-default.xml
 create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml
 create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/scholix_index.json
 create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/summary_index.json

diff --git a/dhp-workflows/dhp-graph-provision/pom.xml b/dhp-workflows/dhp-graph-provision/pom.xml
index e9bbf63c4..c279436d7 100644
--- a/dhp-workflows/dhp-graph-provision/pom.xml
+++ b/dhp-workflows/dhp-graph-provision/pom.xml
@@ -140,6 +140,12 @@
 			<artifactId>httpmime</artifactId>
 		</dependency>
+
+		<dependency>
+			<groupId>org.elasticsearch</groupId>
+			<artifactId>elasticsearch-hadoop</artifactId>
+		</dependency>
+
 		<dependency>
 			<groupId>org.noggit</groupId>
 			<artifactId>noggit</artifactId>
 		</dependency>
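Note: the `elasticsearch-hadoop` dependency above is declared without a version, so it is presumably resolved through the parent project's `dependencyManagement`. Its core write path, which the new classes below build on, is `JavaEsSpark.saveJsonToEs`. A minimal, self-contained sketch of that call, assuming a local Spark master and an Elasticsearch node reachable at `localhost:9200` (both illustrative, not part of this patch):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

public class EsWriteSmokeTest {

	public static void main(String[] args) {
		final SparkConf conf = new SparkConf()
			.setAppName("es-write-smoke-test")
			.setMaster("local[*]"); // illustrative: run everything in-process
		try (JavaSparkContext sc = new JavaSparkContext(conf)) {
			// One JSON document; the connector indexes each RDD element as-is.
			final JavaRDD<String> docs = sc
				.parallelize(Collections.singletonList("{\"id\":\"doc-1\",\"title\":\"test\"}"));
			final Map<String, String> esCfg = new HashMap<>();
			esCfg.put("es.nodes", "localhost"); // illustrative node address
			esCfg.put("es.mapping.id", "id"); // document id taken from the "id" field
			esCfg.put("es.nodes.wan.only", "true"); // talk only to the listed nodes
			JavaEsSpark.saveJsonToEs(docs, "smoke_test", esCfg);
		}
	}
}
```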
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/DropAndCreateESIndex.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/DropAndCreateESIndex.java
new file mode 100644
index 000000000..8a36f260d
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/DropAndCreateESIndex.java
@@ -0,0 +1,107 @@
+package eu.dnetlib.dhp.sx.provision;
+
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+
+public class DropAndCreateESIndex {
+
+	private static final Logger log = LoggerFactory.getLogger(DropAndCreateESIndex.class);
+	public static final String STATUS_CODE_TEXT = "status code: {}";
+	public static final String APPLICATION_JSON = "application/json";
+
+	public static void main(String[] args) throws Exception {
+
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			IOUtils
+				.toString(
+					Objects
+						.requireNonNull(
+							DropAndCreateESIndex.class
+								.getResourceAsStream("/eu/dnetlib/dhp/sx/provision/dropAndCreateIndex.json"))));
+		parser.parseArgument(args);
+
+		final String index = parser.get("index");
+		final String cluster = parser.get("cluster");
+		final String clusterJson = IOUtils
+			.toString(
+				Objects
+					.requireNonNull(
+						DropAndCreateESIndex.class
+							.getResourceAsStream("/eu/dnetlib/dhp/sx/provision/cluster.json")));
+
+		final Map<String, String> clusterMap = new ObjectMapper().readValue(clusterJson, Map.class);
+
+		final String ip = clusterMap.get(cluster).split(",")[0];
+
+		final String url = "http://%s:9200/%s_%s";
+
+		try (CloseableHttpClient client = HttpClients.createDefault()) {
+			final HttpDelete delete = new HttpDelete(String.format(url, ip, index, "object"));
+			final CloseableHttpResponse response = client.execute(delete);
+			log.info("deleting Index SUMMARY");
+			log.info(STATUS_CODE_TEXT, response.getStatusLine());
+		}
+
+		try (CloseableHttpClient client = HttpClients.createDefault()) {
+			final HttpDelete delete = new HttpDelete(String.format(url, ip, index, "scholix"));
+			final CloseableHttpResponse response = client.execute(delete);
+			log.info("deleting Index SCHOLIX");
+			log.info(STATUS_CODE_TEXT, response.getStatusLine());
+		}
+
+		try (CloseableHttpClient client = HttpClients.createDefault()) {
+			final String summaryConf = IOUtils
+				.toString(
+					Objects
+						.requireNonNull(
+							DropAndCreateESIndex.class
+								.getResourceAsStream("/eu/dnetlib/dhp/sx/provision/summary_index.json")));
+
+			final HttpPut put = new HttpPut(String.format(url, ip, index, "object"));
+			final StringEntity entity = new StringEntity(summaryConf);
+			put.setEntity(entity);
+			put.setHeader("Accept", APPLICATION_JSON);
+			put.setHeader("Content-type", APPLICATION_JSON);
+
+			log.info("creating Index SUMMARY");
+			final CloseableHttpResponse response = client.execute(put);
+			log.info(STATUS_CODE_TEXT, response.getStatusLine());
+		}
+
+		try (CloseableHttpClient client = HttpClients.createDefault()) {
+			final String scholixConf = IOUtils
+				.toString(
+					Objects
+						.requireNonNull(
+							DropAndCreateESIndex.class
+								.getResourceAsStream("/eu/dnetlib/dhp/sx/provision/scholix_index.json")));
+
+			log.info("creating Index SCHOLIX");
+			final HttpPut put = new HttpPut(String.format(url, ip, index, "scholix"));
+			final StringEntity entity = new StringEntity(scholixConf);
+			put.setEntity(entity);
+			put.setHeader("Accept", APPLICATION_JSON);
+			put.setHeader("Content-type", APPLICATION_JSON);
+
+			final CloseableHttpResponse response = client.execute(put);
+			log.info(STATUS_CODE_TEXT, response.getStatusLine());
+		}
+	}
+}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkIndexCollectionOnES.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkIndexCollectionOnES.java
new file mode 100644
index 000000000..9845534e3
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkIndexCollectionOnES.java
@@ -0,0 +1,58 @@
+package eu.dnetlib.dhp.sx.provision;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+
+public class SparkIndexCollectionOnES {
+
+	public static void main(String[] args) throws Exception {
+
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			IOUtils
+				.toString(
+					Objects
+						.requireNonNull(
+							SparkIndexCollectionOnES.class
+								.getResourceAsStream("/eu/dnetlib/dhp/sx/provision/index_on_es.json"))));
+		parser.parseArgument(args);
+
+		final SparkConf conf = new SparkConf()
+			.setAppName(SparkIndexCollectionOnES.class.getSimpleName())
+			.setMaster(parser.get("master"));
+
+		final String sourcePath = parser.get("sourcePath");
+		final String index = parser.get("index");
+		final String idPath = parser.get("idPath");
+		final String cluster = parser.get("cluster");
+		final String clusterJson = IOUtils
+			.toString(
+				Objects
+					.requireNonNull(
+						SparkIndexCollectionOnES.class
+							.getResourceAsStream("/eu/dnetlib/dhp/sx/provision/cluster.json")));
+
+		final Map<String, String> clusterMap = new ObjectMapper().readValue(clusterJson, Map.class);
+
+		final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
+
+		try (JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) {
+
+			final JavaRDD<String> inputRdd = sc.textFile(sourcePath);
+
+			final Map<String, String> esCfg = new HashMap<>();
+			esCfg.put("es.nodes", clusterMap.get(cluster));
+			esCfg.put("es.mapping.id", idPath);
+			esCfg.put("es.batch.write.retry.count", "8");
+			esCfg.put("es.batch.write.retry.wait", "60s");
+			esCfg.put("es.batch.size.entries", "200");
+			esCfg.put("es.nodes.wan.only", "true");
+			JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg);
+		}
+	}
+}
\ No newline at end of file
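For a quick end-to-end check of the two classes, a hypothetical local driver could chain them as below. It assumes both classes are on the classpath in the same package, that the nodes listed under `cluster1` in `cluster.json` are reachable, and that `/tmp/scholix_json` holds newline-delimited scholix records; all names and paths are illustrative, not part of this patch:

```java
package eu.dnetlib.dhp.sx.provision;

public class LocalIndexRun {

	public static void main(String[] args) throws Exception {
		// Recreate both indexes on cluster1 under the "dev" prefix
		// (DropAndCreateESIndex derives dev_object and dev_scholix from it).
		DropAndCreateESIndex.main(new String[] {
			"--cluster", "cluster1",
			"--index", "dev"
		});
		// Index a local JSON dump of scholix records into dev_scholix.
		SparkIndexCollectionOnES.main(new String[] {
			"--master", "local[*]",
			"--sourcePath", "/tmp/scholix_json", // illustrative path
			"--index", "dev_scholix",
			"--idPath", "identifier",
			"--cluster", "cluster1"
		});
	}
}
```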
"paramName": "i", + "paramLongName": "index", + "paramDescription": "the index name", + "paramRequired": true + }, + { + "paramName": "c", + "paramLongName": "cluster", + "paramDescription": "the index cluster", + "paramRequired": true + }, + { + "paramName": "id", + "paramLongName": "idPath", + "paramDescription": "the identifier field name", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/config-default.xml new file mode 100644 index 000000000..7c1a43e51 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/config-default.xml @@ -0,0 +1,14 @@ + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + oozie.launcher.mapreduce.user.classpath.first + true + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml new file mode 100644 index 000000000..15903c065 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml @@ -0,0 +1,89 @@ + + + + sourcePath + the sourcePath of the json RDDs + + + index + the index name + + + esCluster + the Index cluster + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + ${jobTracker} + ${nameNode} + eu.dnetlib.dhp.sx.provision.DropAndCreateESIndex + -i${index} + -c${esCluster} + + + + + + + + + yarn + cluster + Index summary + eu.dnetlib.dhp.sx.graph.SparkCreateInputGraph + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --conf spark.dynamicAllocation.maxExecutors="8" + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --masteryarn + --sourcePath${sourcePath}/summaries_json + --index${index}_object + --idPathid + --cluster${esCluster} + + + + + + + + yarn + cluster + Index summary + eu.dnetlib.dhp.sx.graph.SparkCreateInputGraph + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --conf spark.dynamicAllocation.maxExecutors="8" + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --masteryarn + --sourcePath${sourcePath}/scholix_json + --index${index}_scholix + --idPathidentifier + --cluster${esCluster} + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/scholix_index.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/scholix_index.json new file mode 100644 index 000000000..93032712a --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/scholix_index.json @@ -0,0 +1,331 @@ +{ + "mappings": { + "properties": { 
+ "identifier": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "linkprovider": { + "type": "nested", + "properties": { + "identifiers": { + "properties": { + "identifier": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "schema": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "name": { + "type": "keyword" + } + } + }, + "publicationDate": { + "type": "keyword" + }, + "relationship": { + "properties": { + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "schema": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "source": { + "type": "nested", + "properties": { + "collectedFrom": { + "properties": { + "completionStatus": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "provider": { + "properties": { + "identifiers": { + "properties": { + "identifier": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "schema": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "provisionMode": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "creator": { + "properties": { + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "dnetIdentifier": { + "type": "keyword" + }, + "identifier": { + "type": "nested", + "properties": { + "identifier": { + "type": "keyword" + }, + "schema": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "type": { + "type": "keyword" + } + } + }, + "objectType": { + "type": "keyword" + }, + "publicationDate": { + "type": "keyword" + }, + "publisher": { + "type": "nested", + "properties": { + "name": { + "type": "keyword" + } + } + }, + "title": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "target": { + "type": "nested", + "properties": { + "collectedFrom": { + "properties": { + "completionStatus": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "provider": { + "properties": { + "identifiers": { + "properties": { + "identifier": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "schema": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "provisionMode": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "creator": { + "properties": { + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "dnetIdentifier": { + "type": "keyword" + }, + "identifier": { + "type": "nested", + "properties": { + "identifier": { + "type": "keyword" + }, + "schema": { + "type": "text", + "fields": { + "keyword": { 
+ "type": "keyword", + "ignore_above": 256 + } + } + }, + "type": { + "type": "keyword" + } + } + }, + "objectType": { + "type": "keyword" + }, + "publicationDate": { + "type": "keyword" + }, + "publisher": { + "type": "nested", + "properties": { + "name": { + "type": "keyword" + } + } + }, + "title": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + } + } + }, + "settings": { + "index": { + "refresh_interval": "600s", + "number_of_shards": "48", + "translog": { + "sync_interval": "15s", + "durability": "ASYNC" + }, + "analysis": { + "analyzer": { + "analyzer_keyword": { + "filter": "lowercase", + "tokenizer": "keyword" + } + } + }, + "number_of_replicas": "0" + } + } +} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/summary_index.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/summary_index.json new file mode 100644 index 000000000..72839714c --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/summary_index.json @@ -0,0 +1,132 @@ +{ + "mappings": { + "properties": { + "abstract": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "author": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "datasources": { + "type": "nested", + "properties": { + "completionStatus": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "datasourceId": { + "type": "keyword" + }, + "datasourceName": { + "type": "keyword" + } + } + }, + "date": { + "type": "keyword" + }, + "id": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "localIdentifier": { + "type": "nested", + "properties": { + "id": { + "type": "keyword" + }, + "type": { + "type": "keyword" + } + } + }, + "publisher": { + "type": "keyword" + }, + "relatedDatasets": { + "type": "long" + }, + "relatedPublications": { + "type": "long" + }, + "relatedUnknown": { + "type": "long" + }, + "subject": { + "properties": { + "scheme": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "value": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "title": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "typology": { + "type": "keyword" + } + } + }, + "settings": { + "index": { + "refresh_interval": "600s", + "number_of_shards": "48", + "translog": { + "sync_interval": "15s", + "durability": "ASYNC" + }, + "analysis": { + "analyzer": { + "analyzer_keyword": { + "filter": "lowercase", + "tokenizer": "keyword" + } + } + }, + "number_of_replicas": "0" + } + } +} \ No newline at end of file