diff --git a/dhp-workflows/dhp-graph-provision/pom.xml b/dhp-workflows/dhp-graph-provision/pom.xml
index e9bbf63c43..c279436d79 100644
--- a/dhp-workflows/dhp-graph-provision/pom.xml
+++ b/dhp-workflows/dhp-graph-provision/pom.xml
@@ -140,6 +140,12 @@
httpmime
+
+ org.elasticsearch
+ elasticsearch-hadoop
+
+
+
org.noggit
noggit
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/DropAndCreateESIndex.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/DropAndCreateESIndex.java
new file mode 100644
index 0000000000..8a36f260dd
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/DropAndCreateESIndex.java
@@ -0,0 +1,107 @@
+package eu.dnetlib.dhp.sx.provision;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+
+public class DropAndCreateESIndex {
+
+ private static final Logger log = LoggerFactory.getLogger(DropAndCreateESIndex.class);
+ public static final String STATUS_CODE_TEXT = "status code: {}";
+ public static final String APPLICATION_JSON = "application/json";
+
+ public static void main(String[] args) throws Exception {
+
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ Objects.requireNonNull(DropAndCreateESIndex.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/sx/provision/dropAndCreateIndex.json"))));
+ parser.parseArgument(args);
+
+ final String index = parser.get("index");
+
+ final String cluster = parser.get("cluster");
+ final String clusterJson = IOUtils
+ .toString(Objects.requireNonNull(DropAndCreateESIndex.class.getResourceAsStream("/eu/dnetlib/dhp/sx/provision/cluster.json")));
+
+
+ Map clusterMap = new ObjectMapper().readValue(clusterJson,Map.class );
+
+ final String ip = clusterMap.get(cluster).split(",")[0];
+
+
+ final String url = "http://%s:9200/%s_%s";
+
+ try(CloseableHttpClient client = HttpClients.createDefault()) {
+
+ HttpDelete delete = new HttpDelete(String.format(url, ip, index, "object"));
+
+ CloseableHttpResponse response = client.execute(delete);
+
+ log.info("deleting Index SUMMARY");
+ log.info(STATUS_CODE_TEXT,response.getStatusLine());
+ }
+
+
+ try(CloseableHttpClient client = HttpClients.createDefault()) {
+
+ HttpDelete delete = new HttpDelete(String.format(url, ip, index, "scholix"));
+
+ CloseableHttpResponse response = client.execute(delete);
+
+ log.info("deleting Index SCHOLIX");
+ log.info(STATUS_CODE_TEXT,response.getStatusLine());
+ }
+
+ try(CloseableHttpClient client = HttpClients.createDefault()) {
+
+ final String summaryConf = IOUtils
+ .toString(Objects.requireNonNull(DropAndCreateESIndex.class.getResourceAsStream("/eu/dnetlib/dhp/sx/provision/summary_index.json")));
+
+
+
+ HttpPut put = new HttpPut(String.format(url, ip, index, "object"));
+
+ StringEntity entity = new StringEntity(summaryConf);
+ put.setEntity(entity);
+ put.setHeader("Accept", APPLICATION_JSON);
+ put.setHeader("Content-type", APPLICATION_JSON);
+
+ log.info("creating First Index SUMMARY");
+ CloseableHttpResponse response = client.execute(put);
+ log.info(STATUS_CODE_TEXT,response.getStatusLine());
+
+ }
+ try(CloseableHttpClient client = HttpClients.createDefault()) {
+
+ final String scholixConf = IOUtils
+ .toString(Objects.requireNonNull(DropAndCreateESIndex.class.getResourceAsStream("/eu/dnetlib/dhp/sx/provision/scholix_index.json")));
+
+
+ log.info("creating Index SCHOLIX");
+ final HttpPut put = new HttpPut(String.format(url, ip, index, "scholix"));
+
+ final StringEntity entity = new StringEntity(scholixConf);
+ put.setEntity(entity);
+ put.setHeader("Accept", APPLICATION_JSON);
+ put.setHeader("Content-type", APPLICATION_JSON);
+
+ final CloseableHttpResponse response = client.execute(put);
+ log.info(STATUS_CODE_TEXT, response.getStatusLine());
+ }
+
+ }
+}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkIndexCollectionOnES.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkIndexCollectionOnES.java
new file mode 100644
index 0000000000..9845534e34
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkIndexCollectionOnES.java
@@ -0,0 +1,58 @@
+package eu.dnetlib.dhp.sx.provision;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+
+/**
+ * Spark job that loads JSON documents into an Elasticsearch index.
+ * <p>
+ * Each line of the text files under {@code sourcePath} is written as one JSON document to
+ * {@code index}, on the nodes that {@code cluster.json} lists for {@code cluster}. The
+ * document field named by {@code idPath} is configured as the Elasticsearch document id.
+ */
+public class SparkIndexCollectionOnES {
+
+    /** Classpath folder holding the argument and cluster definition files. */
+    private static final String RESOURCE_BASE = "/eu/dnetlib/dhp/sx/provision/";
+
+    /**
+     * Parses the arguments, resolves the target cluster and runs the indexing job.
+     *
+     * @param args command line providing the parameters declared in {@code index_on_es.json}
+     * @throws IllegalArgumentException if the cluster name is not a key of {@code cluster.json}
+     * @throws Exception if argument parsing, resource loading or the Spark job fails
+     */
+    public static void main(String[] args) throws Exception {
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+            readResource("index_on_es.json"));
+        parser.parseArgument(args);
+
+        final SparkConf conf = new SparkConf()
+            .setAppName(SparkIndexCollectionOnES.class.getSimpleName())
+            .setMaster(parser.get("master"));
+
+        final String sourcePath = parser.get("sourcePath");
+        final String index = parser.get("index");
+        final String idPath = parser.get("idPath");
+        final String cluster = parser.get("cluster");
+
+        @SuppressWarnings("unchecked") // cluster.json is a flat object of string values
+        final Map<String, String> clusterMap = new ObjectMapper()
+            .readValue(readResource("cluster.json"), Map.class);
+
+        // fail before starting Spark instead of handing a null node list to the connector
+        final String nodes = clusterMap.get(cluster);
+        if (nodes == null) {
+            throw new IllegalArgumentException("unknown cluster: " + cluster);
+        }
+
+        final Map<String, String> esCfg = new HashMap<>();
+        esCfg.put("es.nodes", nodes);
+        esCfg.put("es.mapping.id", idPath);
+        // retry a rejected bulk request up to 8 times, waiting 60s between attempts
+        esCfg.put("es.batch.write.retry.count", "8");
+        esCfg.put("es.batch.write.retry.wait", "60s");
+        esCfg.put("es.batch.size.entries", "200");
+        // connect only through the declared nodes, without node discovery
+        esCfg.put("es.nodes.wan.only", "true");
+
+        final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
+
+        try (final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) {
+            final JavaRDD<String> inputRdd = sc.textFile(sourcePath);
+            JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg);
+        }
+    }
+
+    /** Reads a classpath resource located under {@link #RESOURCE_BASE} as UTF-8 text and closes the stream. */
+    private static String readResource(final String name) throws java.io.IOException {
+        try (java.io.InputStream in = Objects.requireNonNull(
+            SparkIndexCollectionOnES.class.getResourceAsStream(RESOURCE_BASE + name),
+            "missing classpath resource: " + name)) {
+            return IOUtils.toString(in, java.nio.charset.StandardCharsets.UTF_8);
+        }
+    }
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/cluster.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/cluster.json
new file mode 100644
index 0000000000..1cea6a8b92
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/cluster.json
@@ -0,0 +1,4 @@
+{
+ "cluster1": "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54",
+ "cluster2": "10.19.65.55, 10.19.65.56, 10.19.65.57, 10.19.65.58"
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/dropAndCreateIndex.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/dropAndCreateIndex.json
new file mode 100644
index 0000000000..3810f889b5
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/dropAndCreateIndex.json
@@ -0,0 +1,14 @@
+[
+ {
+ "paramName": "c",
+ "paramLongName": "cluster",
+ "paramDescription": "should be cluster1 or cluster2",
+ "paramRequired": true
+ },
+ {
+ "paramName": "i",
+ "paramLongName": "index",
+ "paramDescription": "index name",
+ "paramRequired": true
+ }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/index_on_es.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/index_on_es.json
new file mode 100644
index 0000000000..a005bde369
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/index_on_es.json
@@ -0,0 +1,32 @@
+[
+ {
+ "paramName": "mt",
+ "paramLongName": "master",
+ "paramDescription": "should be local or yarn",
+ "paramRequired": true
+ },
+ {
+ "paramName": "s",
+ "paramLongName": "sourcePath",
+ "paramDescription": "the working path containing the generated JSON files to index",
+ "paramRequired": true
+ },
+ {
+ "paramName": "i",
+ "paramLongName": "index",
+ "paramDescription": "the index name",
+ "paramRequired": true
+ },
+ {
+ "paramName": "c",
+ "paramLongName": "cluster",
+ "paramDescription": "the index cluster",
+ "paramRequired": true
+ },
+ {
+ "paramName": "id",
+ "paramLongName": "idPath",
+ "paramDescription": "the identifier field name",
+ "paramRequired": true
+ }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/config-default.xml
new file mode 100644
index 0000000000..7c1a43e513
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/config-default.xml
@@ -0,0 +1,14 @@
+
+
+ oozie.use.system.libpath
+ true
+
+
+ oozie.action.sharelib.for.spark
+ spark2
+
+
+ oozie.launcher.mapreduce.user.classpath.first
+ true
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml
new file mode 100644
index 0000000000..15903c0657
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml
@@ -0,0 +1,89 @@
+
+
+
+ sourcePath
+ the sourcePath of the json RDDs
+
+
+ index
+ the index name
+
+
+ esCluster
+ the Index cluster
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+ eu.dnetlib.dhp.sx.provision.DropAndCreateESIndex
+ -i${index}
+ -c${esCluster}
+
+
+
+
+
+
+
+
+ yarn
+ cluster
+ Index summary
+ eu.dnetlib.dhp.sx.provision.SparkIndexCollectionOnES
+ dhp-graph-provision-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --conf spark.dynamicAllocation.maxExecutors="8"
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --masteryarn
+ --sourcePath${sourcePath}/summaries_json
+ --index${index}_object
+ --idPathid
+ --cluster${esCluster}
+
+
+
+
+
+
+
+ yarn
+ cluster
+ Index scholix
+ eu.dnetlib.dhp.sx.provision.SparkIndexCollectionOnES
+ dhp-graph-provision-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --conf spark.dynamicAllocation.maxExecutors="8"
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --masteryarn
+ --sourcePath${sourcePath}/scholix_json
+ --index${index}_scholix
+ --idPathidentifier
+ --cluster${esCluster}
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/scholix_index.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/scholix_index.json
new file mode 100644
index 0000000000..93032712a7
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/scholix_index.json
@@ -0,0 +1,331 @@
+{
+ "mappings": {
+ "properties": {
+ "identifier": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ },
+ "linkprovider": {
+ "type": "nested",
+ "properties": {
+ "identifiers": {
+ "properties": {
+ "identifier": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ },
+ "schema": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ }
+ }
+ },
+ "name": {
+ "type": "keyword"
+ }
+ }
+ },
+ "publicationDate": {
+ "type": "keyword"
+ },
+ "relationship": {
+ "properties": {
+ "name": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ },
+ "schema": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ }
+ }
+ },
+ "source": {
+ "type": "nested",
+ "properties": {
+ "collectedFrom": {
+ "properties": {
+ "completionStatus": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ },
+ "provider": {
+ "properties": {
+ "identifiers": {
+ "properties": {
+ "identifier": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ },
+ "schema": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ }
+ }
+ },
+ "name": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ }
+ }
+ },
+ "provisionMode": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ }
+ }
+ },
+ "creator": {
+ "properties": {
+ "name": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ }
+ }
+ },
+ "dnetIdentifier": {
+ "type": "keyword"
+ },
+ "identifier": {
+ "type": "nested",
+ "properties": {
+ "identifier": {
+ "type": "keyword"
+ },
+ "schema": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ },
+ "type": {
+ "type": "keyword"
+ }
+ }
+ },
+ "objectType": {
+ "type": "keyword"
+ },
+ "publicationDate": {
+ "type": "keyword"
+ },
+ "publisher": {
+ "type": "nested",
+ "properties": {
+ "name": {
+ "type": "keyword"
+ }
+ }
+ },
+ "title": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ }
+ }
+ },
+ "target": {
+ "type": "nested",
+ "properties": {
+ "collectedFrom": {
+ "properties": {
+ "completionStatus": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ },
+ "provider": {
+ "properties": {
+ "identifiers": {
+ "properties": {
+ "identifier": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ },
+ "schema": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ }
+ }
+ },
+ "name": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ }
+ }
+ },
+ "provisionMode": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ }
+ }
+ },
+ "creator": {
+ "properties": {
+ "name": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ }
+ }
+ },
+ "dnetIdentifier": {
+ "type": "keyword"
+ },
+ "identifier": {
+ "type": "nested",
+ "properties": {
+ "identifier": {
+ "type": "keyword"
+ },
+ "schema": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ },
+ "type": {
+ "type": "keyword"
+ }
+ }
+ },
+ "objectType": {
+ "type": "keyword"
+ },
+ "publicationDate": {
+ "type": "keyword"
+ },
+ "publisher": {
+ "type": "nested",
+ "properties": {
+ "name": {
+ "type": "keyword"
+ }
+ }
+ },
+ "title": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "settings": {
+ "index": {
+ "refresh_interval": "600s",
+ "number_of_shards": "48",
+ "translog": {
+ "sync_interval": "15s",
+ "durability": "ASYNC"
+ },
+ "analysis": {
+ "analyzer": {
+ "analyzer_keyword": {
+ "filter": "lowercase",
+ "tokenizer": "keyword"
+ }
+ }
+ },
+ "number_of_replicas": "0"
+ }
+ }
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/summary_index.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/summary_index.json
new file mode 100644
index 0000000000..72839714c2
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/provision/summary_index.json
@@ -0,0 +1,132 @@
+{
+ "mappings": {
+ "properties": {
+ "abstract": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ },
+ "author": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ },
+ "datasources": {
+ "type": "nested",
+ "properties": {
+ "completionStatus": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ },
+ "datasourceId": {
+ "type": "keyword"
+ },
+ "datasourceName": {
+ "type": "keyword"
+ }
+ }
+ },
+ "date": {
+ "type": "keyword"
+ },
+ "id": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ },
+ "localIdentifier": {
+ "type": "nested",
+ "properties": {
+ "id": {
+ "type": "keyword"
+ },
+ "type": {
+ "type": "keyword"
+ }
+ }
+ },
+ "publisher": {
+ "type": "keyword"
+ },
+ "relatedDatasets": {
+ "type": "long"
+ },
+ "relatedPublications": {
+ "type": "long"
+ },
+ "relatedUnknown": {
+ "type": "long"
+ },
+ "subject": {
+ "properties": {
+ "scheme": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ },
+ "value": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ }
+ }
+ },
+ "title": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ },
+ "typology": {
+ "type": "keyword"
+ }
+ }
+ },
+ "settings": {
+ "index": {
+ "refresh_interval": "600s",
+ "number_of_shards": "48",
+ "translog": {
+ "sync_interval": "15s",
+ "durability": "ASYNC"
+ },
+ "analysis": {
+ "analyzer": {
+ "analyzer_keyword": {
+ "filter": "lowercase",
+ "tokenizer": "keyword"
+ }
+ }
+ },
+ "number_of_replicas": "0"
+ }
+ }
+}
\ No newline at end of file