1
0
Fork 0

moved wf indexing of Scholexplorer in dhp-graph-provision

This commit is contained in:
Sandro La Bruzzo 2021-07-09 12:06:43 +02:00
parent 09fccf8000
commit 9f5a0f3ab6
10 changed files with 787 additions and 0 deletions

View File

@ -140,6 +140,12 @@
<artifactId>httpmime</artifactId> <artifactId>httpmime</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.noggit</groupId> <groupId>org.noggit</groupId>
<artifactId>noggit</artifactId> <artifactId>noggit</artifactId>

View File

@ -0,0 +1,107 @@
package eu.dnetlib.dhp.sx.provision;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Objects;
public class DropAndCreateESIndex {
private static final Logger log = LoggerFactory.getLogger(DropAndCreateESIndex.class);
public static final String STATUS_CODE_TEXT = "status code: {}";
public static final String APPLICATION_JSON = "application/json";
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
Objects.requireNonNull(DropAndCreateESIndex.class
.getResourceAsStream(
"/eu/dnetlib/dhp/sx/provision/dropAndCreateIndex.json"))));
parser.parseArgument(args);
final String index = parser.get("index");
final String cluster = parser.get("cluster");
final String clusterJson = IOUtils
.toString(Objects.requireNonNull(DropAndCreateESIndex.class.getResourceAsStream("/eu/dnetlib/dhp/sx/provision/cluster.json")));
Map<String,String> clusterMap = new ObjectMapper().readValue(clusterJson,Map.class );
final String ip = clusterMap.get(cluster).split(",")[0];
final String url = "http://%s:9200/%s_%s";
try(CloseableHttpClient client = HttpClients.createDefault()) {
HttpDelete delete = new HttpDelete(String.format(url, ip, index, "object"));
CloseableHttpResponse response = client.execute(delete);
log.info("deleting Index SUMMARY");
log.info(STATUS_CODE_TEXT,response.getStatusLine());
}
try(CloseableHttpClient client = HttpClients.createDefault()) {
HttpDelete delete = new HttpDelete(String.format(url, ip, index, "scholix"));
CloseableHttpResponse response = client.execute(delete);
log.info("deleting Index SCHOLIX");
log.info(STATUS_CODE_TEXT,response.getStatusLine());
}
try(CloseableHttpClient client = HttpClients.createDefault()) {
final String summaryConf = IOUtils
.toString(Objects.requireNonNull(DropAndCreateESIndex.class.getResourceAsStream("/eu/dnetlib/dhp/sx/provision/summary_index.json")));
HttpPut put = new HttpPut(String.format(url, ip, index, "object"));
StringEntity entity = new StringEntity(summaryConf);
put.setEntity(entity);
put.setHeader("Accept", APPLICATION_JSON);
put.setHeader("Content-type", APPLICATION_JSON);
log.info("creating First Index SUMMARY");
CloseableHttpResponse response = client.execute(put);
log.info(STATUS_CODE_TEXT,response.getStatusLine());
}
try(CloseableHttpClient client = HttpClients.createDefault()) {
final String scholixConf = IOUtils
.toString(Objects.requireNonNull(DropAndCreateESIndex.class.getResourceAsStream("/eu/dnetlib/dhp/sx/provision/scholix_index.json")));
log.info("creating Index SCHOLIX");
final HttpPut put = new HttpPut(String.format(url, ip, index, "scholix"));
final StringEntity entity = new StringEntity(scholixConf);
put.setEntity(entity);
put.setHeader("Accept", APPLICATION_JSON);
put.setHeader("Content-type", APPLICATION_JSON);
final CloseableHttpResponse response = client.execute(put);
log.info(STATUS_CODE_TEXT, response.getStatusLine());
}
}
}

View File

@ -0,0 +1,58 @@
package eu.dnetlib.dhp.sx.provision;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
public class SparkIndexCollectionOnES {
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
Objects.requireNonNull(SparkIndexCollectionOnES.class
.getResourceAsStream(
"/eu/dnetlib/dhp/sx/provision/index_on_es.json"))));
parser.parseArgument(args);
SparkConf conf = new SparkConf()
.setAppName(SparkIndexCollectionOnES.class.getSimpleName())
.setMaster(parser.get("master"));
final String sourcePath = parser.get("sourcePath");
final String index = parser.get("index");
final String idPath = parser.get("idPath");
final String cluster = parser.get("cluster");
final String clusterJson = IOUtils
.toString(Objects.requireNonNull(DropAndCreateESIndex.class.getResourceAsStream("/eu/dnetlib/dhp/sx/provision/cluster.json")));
final Map<String, String> clusterMap = new ObjectMapper().readValue(clusterJson, Map.class);
final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
try (final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) {
JavaRDD<String> inputRdd = sc.textFile(sourcePath);
Map<String, String> esCfg = new HashMap<>();
esCfg.put("es.nodes", clusterMap.get(cluster));
esCfg.put("es.mapping.id", idPath);
esCfg.put("es.batch.write.retry.count", "8");
esCfg.put("es.batch.write.retry.wait", "60s");
esCfg.put("es.batch.size.entries", "200");
esCfg.put("es.nodes.wan.only", "true");
JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg);
}
}
}

View File

@ -0,0 +1,4 @@
{
"cluster1": "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54",
"cluster2": "10.19.65.55, 10.19.65.56, 10.19.65.57, 10.19.65.58"
}

View File

@ -0,0 +1,14 @@
[
{
"paramName": "c",
"paramLongName": "cluster",
"paramDescription": "should be cluster1 or cluster2",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "index",
"paramDescription": "index name",
"paramRequired": true
}
]

View File

@ -0,0 +1,32 @@
[
{
"paramName": "mt",
"paramLongName": "master",
"paramDescription": "should be local or yarn",
"paramRequired": true
},
{
"paramName": "s",
"paramLongName": "sourcePath",
"paramDescription": "the working path where generated files",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "index",
"paramDescription": "the index name",
"paramRequired": true
},
{
"paramName": "c",
"paramLongName": "cluster",
"paramDescription": "the index cluster",
"paramRequired": true
},
{
"paramName": "id",
"paramLongName": "idPath",
"paramDescription": "the identifier field name",
"paramRequired": true
}
]

View File

@ -0,0 +1,14 @@
<configuration>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,89 @@
<workflow-app name="Index Scholexplorer Infospace" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the sourcePath of the json RDDs</description>
</property>
<property>
<name>index</name>
<description>the index name</description>
</property>
<property>
<name>esCluster</name>
<description>the Index cluster</description>
</property>
</parameters>
<start to="DropAndCreateIndex"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="DropAndCreateIndex">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.sx.provision.DropAndCreateESIndex</main-class>
<arg>-i</arg><arg>${index}</arg>
<arg>-c</arg><arg>${esCluster}</arg>
</java>
<ok to="indexSummary"/>
<error to="Kill"/>
</action>
<action name="indexSummary">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Index summary</name>
<class>eu.dnetlib.dhp.sx.graph.SparkCreateInputGraph</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--conf spark.dynamicAllocation.maxExecutors="8"
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--master</arg><arg>yarn</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/summaries_json</arg>
<arg>--index</arg><arg>${index}_object</arg>
<arg>--idPath</arg><arg>id</arg>
<arg>--cluster</arg><arg>${esCluster}</arg>
</spark>
<ok to="indexScholix"/>
<error to="Kill"/>
</action>
<action name="indexScholix">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Index summary</name>
<class>eu.dnetlib.dhp.sx.graph.SparkCreateInputGraph</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--conf spark.dynamicAllocation.maxExecutors="8"
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--master</arg><arg>yarn</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/scholix_json</arg>
<arg>--index</arg><arg>${index}_scholix</arg>
<arg>--idPath</arg><arg>identifier</arg>
<arg>--cluster</arg><arg>${esCluster}</arg>
</spark>
<ok to="indexScholix"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,331 @@
{
"mappings": {
"properties": {
"identifier": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"linkprovider": {
"type": "nested",
"properties": {
"identifiers": {
"properties": {
"identifier": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"schema": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
},
"name": {
"type": "keyword"
}
}
},
"publicationDate": {
"type": "keyword"
},
"relationship": {
"properties": {
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"schema": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
},
"source": {
"type": "nested",
"properties": {
"collectedFrom": {
"properties": {
"completionStatus": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"provider": {
"properties": {
"identifiers": {
"properties": {
"identifier": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"schema": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
},
"provisionMode": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
},
"creator": {
"properties": {
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
},
"dnetIdentifier": {
"type": "keyword"
},
"identifier": {
"type": "nested",
"properties": {
"identifier": {
"type": "keyword"
},
"schema": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"type": {
"type": "keyword"
}
}
},
"objectType": {
"type": "keyword"
},
"publicationDate": {
"type": "keyword"
},
"publisher": {
"type": "nested",
"properties": {
"name": {
"type": "keyword"
}
}
},
"title": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
},
"target": {
"type": "nested",
"properties": {
"collectedFrom": {
"properties": {
"completionStatus": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"provider": {
"properties": {
"identifiers": {
"properties": {
"identifier": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"schema": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
},
"provisionMode": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
},
"creator": {
"properties": {
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
},
"dnetIdentifier": {
"type": "keyword"
},
"identifier": {
"type": "nested",
"properties": {
"identifier": {
"type": "keyword"
},
"schema": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"type": {
"type": "keyword"
}
}
},
"objectType": {
"type": "keyword"
},
"publicationDate": {
"type": "keyword"
},
"publisher": {
"type": "nested",
"properties": {
"name": {
"type": "keyword"
}
}
},
"title": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}
}
},
"settings": {
"index": {
"refresh_interval": "600s",
"number_of_shards": "48",
"translog": {
"sync_interval": "15s",
"durability": "ASYNC"
},
"analysis": {
"analyzer": {
"analyzer_keyword": {
"filter": "lowercase",
"tokenizer": "keyword"
}
}
},
"number_of_replicas": "0"
}
}
}

View File

@ -0,0 +1,132 @@
{
"mappings": {
"properties": {
"abstract": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"author": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"datasources": {
"type": "nested",
"properties": {
"completionStatus": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"datasourceId": {
"type": "keyword"
},
"datasourceName": {
"type": "keyword"
}
}
},
"date": {
"type": "keyword"
},
"id": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"localIdentifier": {
"type": "nested",
"properties": {
"id": {
"type": "keyword"
},
"type": {
"type": "keyword"
}
}
},
"publisher": {
"type": "keyword"
},
"relatedDatasets": {
"type": "long"
},
"relatedPublications": {
"type": "long"
},
"relatedUnknown": {
"type": "long"
},
"subject": {
"properties": {
"scheme": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"value": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
},
"title": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"typology": {
"type": "keyword"
}
}
},
"settings": {
"index": {
"refresh_interval": "600s",
"number_of_shards": "48",
"translog": {
"sync_interval": "15s",
"durability": "ASYNC"
},
"analysis": {
"analyzer": {
"analyzer_keyword": {
"filter": "lowercase",
"tokenizer": "keyword"
}
}
},
"number_of_replicas": "0"
}
}
}