fixed indexing

This commit is contained in:
Sandro La Bruzzo 2020-08-11 09:17:03 +02:00
parent cf6b68ce5a
commit eaf0dc68a2
9 changed files with 271 additions and 34 deletions

View File

@ -0,0 +1,97 @@
package eu.dnetlib.dhp.provision;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.codehaus.jackson.map.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
/**
 * Command-line entry point that drops and re-creates the two Elasticsearch indexes
 * <code>&lt;index&gt;_object</code> (summary) and <code>&lt;index&gt;_scholix</code>.
 *
 * <p>Arguments are described by the classpath resource
 * <code>/eu/dnetlib/dhp/provision/dropAndCreateIndex.json</code>: <code>index</code> is the index
 * name prefix and <code>cluster</code> is a key of the classpath resource
 * <code>/eu/dnetlib/dhp/provision/cluster.json</code>, whose value is a comma-separated list of
 * node addresses. Only the first address of that list is contacted, on port 9200.
 */
public class DropAndCreateESIndex {

	/** URL template: host, index name prefix, index suffix ("object" or "scholix"). */
	private static final String INDEX_URL_TEMPLATE = "http://%s:9200/%s_%s";

	/**
	 * Deletes both indexes (the outcome of the deletion is only reported) and then creates them
	 * again using the bundled <code>summary_index.json</code> and <code>scholix_index.json</code>
	 * definitions.
	 *
	 * @param args command line arguments; must provide the <code>index</code> and
	 *             <code>cluster</code> parameters
	 * @throws IllegalArgumentException if the requested cluster is not listed in cluster.json
	 * @throws IllegalStateException if a bundled resource is missing or an index cannot be created
	 * @throws Exception on argument parsing or I/O errors
	 */
	public static void main(String[] args) throws Exception {
		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
			readResource("/eu/dnetlib/dhp/provision/dropAndCreateIndex.json"));
		parser.parseArgument(args);

		final String index = parser.get("index");
		final String cluster = parser.get("cluster");

		@SuppressWarnings("unchecked") // cluster.json is a flat name -> "host1,host2,..." object
		final Map<String, String> clusterMap = new ObjectMapper()
			.readValue(readResource("/eu/dnetlib/dhp/provision/cluster.json"), Map.class);

		final String nodes = clusterMap.get(cluster);
		if (nodes == null) {
			throw new IllegalArgumentException(
				"Unknown cluster '" + cluster + "', known clusters: " + clusterMap.keySet());
		}
		final String ip = nodes.split(",")[0];
		System.out.println(ip);

		// Load the index definitions BEFORE deleting anything, so that a packaging problem
		// cannot leave the cluster without indexes.
		final String summaryConf = readResource("/eu/dnetlib/dhp/provision/summary_index.json");
		final String scholixConf = readResource("/eu/dnetlib/dhp/provision/scholix_index.json");

		final String summaryUrl = String.format(INDEX_URL_TEMPLATE, ip, index, "object");
		final String scholixUrl = String.format(INDEX_URL_TEMPLATE, ip, index, "scholix");

		// A single client serves all four requests; try-with-resources closes it on every path.
		try (CloseableHttpClient client = HttpClients.createDefault()) {
			System.out.println("deleting Index SUMMARY");
			deleteIndex(client, summaryUrl);

			System.out.println("deleting Index SCHOLIX");
			deleteIndex(client, scholixUrl);

			System.out.println("creating First Index SUMMARY");
			createIndex(client, summaryUrl, summaryConf);

			System.out.println("creating Index SCHOLIX");
			createIndex(client, scholixUrl, scholixConf);
		}
	}

	/** Reads a classpath resource into a String, closing the underlying stream. */
	private static String readResource(final String path) throws java.io.IOException {
		try (java.io.InputStream in = DropAndCreateESIndex.class.getResourceAsStream(path)) {
			if (in == null) {
				throw new IllegalStateException("Missing classpath resource: " + path);
			}
			return IOUtils.toString(in);
		}
	}

	/**
	 * Issues a DELETE on the index URL and prints the status line. The status is not enforced
	 * because the index may legitimately not exist yet.
	 */
	private static void deleteIndex(final CloseableHttpClient client, final String indexUrl)
		throws java.io.IOException {
		try (CloseableHttpResponse response = client.execute(new HttpDelete(indexUrl))) {
			System.out.println(response.getStatusLine());
		}
	}

	/**
	 * Issues a PUT with the given JSON index definition, prints the status line and fails when
	 * the server does not answer with a 2xx status.
	 */
	private static void createIndex(final CloseableHttpClient client, final String indexUrl,
		final String indexConf) throws java.io.IOException {
		final HttpPut put = new HttpPut(indexUrl);
		put.setEntity(new StringEntity(indexConf));
		put.setHeader("Accept", "application/json");
		put.setHeader("Content-type", "application/json");
		try (CloseableHttpResponse response = client.execute(put)) {
			System.out.println(response.getStatusLine());
			final int status = response.getStatusLine().getStatusCode();
			if (status < 200 || status >= 300) {
				// Continuing would let the following jobs index into a missing or auto-mapped index.
				throw new IllegalStateException(
					"Unable to create index " + indexUrl + ": " + response.getStatusLine());
			}
		}
	}
}

View File

@ -0,0 +1,38 @@
package eu.dnetlib.dhp.provision
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.provision.scholix.Scholix
import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary
import org.apache.commons.io.IOUtils
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.codehaus.jackson.map.ObjectMapper
/**
 * Spark job that reads the `summary` and `scholix` datasets found under the working path and
 * writes each of them as gzip-compressed text files containing one JSON document per line
 * (`summary_json` and `scholix_json` under the same working path).
 */
object SparkConvertDatasetToJson {

  /**
   * Serialises every record of `records` to JSON and saves the result as gzip-compressed text.
   *
   * One ObjectMapper is created per partition rather than per record: the mapper is costly to
   * build, and creating it inside the partition function avoids shipping it from the driver.
   *
   * @param records    the dataset to serialise
   * @param outputPath destination directory of the text files
   * @param partitions number of partitions (and therefore output files) to produce
   */
  private def saveAsGzippedJson[T <: AnyRef](records: org.apache.spark.sql.Dataset[T], outputPath: String, partitions: Int): Unit = {
    records
      .mapPartitions((it: Iterator[T]) => {
        val mapper = new ObjectMapper()
        it.map(record => mapper.writeValueAsString(record))
      })(Encoders.STRING)
      .rdd
      .repartition(partitions)
      .saveAsTextFile(outputPath, classOf[GzipCodec])
  }

  /**
   * @param args command line arguments as described by the classpath resource
   *             `/eu/dnetlib/dhp/provision/dataset2Json.json` (`master`, `workingPath`)
   */
  def main(args: Array[String]): Unit = {
    val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertDatasetToJson.getClass.getResourceAsStream("/eu/dnetlib/dhp/provision/dataset2Json.json")))
    parser.parseArgument(args)

    val conf = new SparkConf
    val spark = SparkSession.builder.config(conf).appName(SparkConvertDatasetToJson.getClass.getSimpleName).master(parser.get("master")).getOrCreate

    // Both record types are handled through Kryo encoders.
    implicit val summaryEncoder: Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary]
    implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo[Scholix]

    val workingPath = parser.get("workingPath")

    saveAsGzippedJson(spark.read.load(s"$workingPath/summary").as[ScholixSummary], s"$workingPath/summary_json", 500)
    saveAsGzippedJson(spark.read.load(s"$workingPath/scholix").as[Scholix], s"$workingPath/scholix_json", 2000)
  }
}

View File

@ -8,15 +8,12 @@ import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary;
public class SparkIndexCollectionOnES {
@ -39,33 +36,20 @@ public class SparkIndexCollectionOnES {
final String sourcePath = parser.get("sourcePath");
final String index = parser.get("index");
final String idPath = parser.get("idPath");
final String type = parser.get("type");
final String indexHost = parser.get("esHost");
final String cluster = parser.get("cluster");
final String clusterJson = IOUtils
.toString(DropAndCreateESIndex.class.getResourceAsStream("/eu/dnetlib/dhp/provision/cluster.json"));
final Map<String, String> clusterMap = new ObjectMapper().readValue(clusterJson, Map.class);
final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<String> inputRdd;
if ("summary".equalsIgnoreCase(type))
inputRdd = spark
.read()
.load(sourcePath)
.as(Encoders.bean(ScholixSummary.class))
.map(
(MapFunction<ScholixSummary, String>) f -> {
final ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(f);
},
Encoders.STRING())
.javaRDD();
else
inputRdd = sc.textFile(sourcePath);
JavaRDD<String> inputRdd = sc.textFile(sourcePath);
Map<String, String> esCfg = new HashMap<>();
// esCfg.put("es.nodes", "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54");
esCfg.put("es.nodes", indexHost);
esCfg.put("es.nodes", clusterMap.get(cluster));
esCfg.put("es.mapping.id", idPath);
esCfg.put("es.batch.write.retry.count", "8");
esCfg.put("es.batch.write.retry.wait", "60s");

View File

@ -0,0 +1,14 @@
[
{
"paramName": "m",
"paramLongName": "master",
"paramDescription": "master should be local or yarn",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workingPath",
"paramDescription": "the working path",
"paramRequired": true
}
]

View File

@ -0,0 +1,14 @@
[
{
"paramName": "c",
"paramLongName": "cluster",
"paramDescription": "should be cluster1 or cluster2",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "index",
"paramDescription": "index name",
"paramRequired": true
}
]

View File

@ -18,19 +18,12 @@
"paramRequired": true
},
{
"paramName": "h",
"paramLongName": "esHost",
"paramDescription": "the index host name",
"paramName": "c",
"paramLongName": "cluster",
"paramDescription": "the index cluster",
"paramRequired": true
},
{
"paramName": "t",
"paramLongName": "type",
"paramDescription": "should be scholix or summary",
"paramRequired": true
},
{
"paramName": "id",
"paramLongName": "idPath",

View File

@ -7,4 +7,8 @@
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -8,6 +8,14 @@
<name>graphPath</name>
<description>the graph path</description>
</property>
<property>
<name>index</name>
<description>the index name</description>
</property>
<property>
<name>esCluster</name>
<description>the Index cluster</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
@ -18,7 +26,7 @@
</property>
</parameters>
<start to="DeleteTargetPath"/>
<start to="DropAndCreateIndex"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@ -82,6 +90,78 @@
<arg>--workingDirPath</arg><arg>${workingDirPath}</arg>
<arg>--graphPath</arg><arg>${graphPath}</arg>
</spark>
<ok to="datasetToJson"/>
<error to="Kill"/>
</action>
<!--
Runs eu.dnetlib.dhp.provision.SparkConvertDatasetToJson as a Spark job on YARN (cluster mode),
passing ${workingDirPath} as its working path.
Transitions to DropAndCreateIndex on success and to Kill on error.
NOTE(review): the Spark job name below is "generate Scholix", which does not describe a JSON
conversion step; it looks copied from another action. Confirm before renaming.
-->
<action name="datasetToJson">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>generate Scholix</name>
<class>eu.dnetlib.dhp.provision.SparkConvertDatasetToJson</class>
<jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.sql.shuffle.partitions=4000 ${sparkExtraOPT}</spark-opts>
<arg>-m</arg> <arg>yarn-cluster</arg>
<arg>--workingPath</arg><arg>${workingDirPath}</arg>
</spark>
<ok to="DropAndCreateIndex"/>
<error to="Kill"/>
</action>
<!--
Runs the Java main class eu.dnetlib.dhp.provision.DropAndCreateESIndex with the index name
(-i ${index}) and the cluster key (-c ${esCluster}).
Transitions to indexSummary on success and to Kill on error.
-->
<action name="DropAndCreateIndex">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.provision.DropAndCreateESIndex</main-class>
<arg>-i</arg><arg>${index}</arg>
<arg>-c</arg><arg>${esCluster}</arg>
</java>
<ok to="indexSummary"/>
<error to="Kill"/>
</action>
<!--
Runs eu.dnetlib.dhp.provision.SparkIndexCollectionOnES on YARN (cluster mode) to index the
content of ${workingDirPath}/summary_json into ${index}_object on the ${esCluster} cluster,
using the "id" field as the id path. Dynamic allocation is capped at 8 executors.
Transitions to indexScholix on success and to Kill on error.
-->
<action name="indexSummary">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>index summary</name>
<class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>
<jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="8" </spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingDirPath}/summary_json</arg>
<arg>--index</arg><arg>${index}_object</arg>
<arg>--idPath</arg><arg>id</arg>
<arg>--cluster</arg><arg>${esCluster}</arg>
</spark>
<ok to="indexScholix"/>
<error to="Kill"/>
</action>
<!--
Runs eu.dnetlib.dhp.provision.SparkIndexCollectionOnES on YARN (cluster mode) to index the
Scholix JSON records into ${index}_scholix on the ${esCluster} cluster, using the "identifier"
field as the id path. Dynamic allocation is capped at 8 executors.
The source path is ${workingDirPath}/scholix_json, the directory written by
SparkConvertDatasetToJson for Scholix records; summary_json belongs to the indexSummary action.
Transitions to End on success and to Kill on error.
-->
<action name="indexScholix">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>index scholix</name>
<class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>
<jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="8" </spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingDirPath}/scholix_json</arg>
<arg>--index</arg><arg>${index}_scholix</arg>
<arg>--idPath</arg><arg>identifier</arg>
<arg>--cluster</arg><arg>${esCluster}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>

View File

@ -0,0 +1,13 @@
package eu.dnetlib.dhp.provision;
import org.junit.jupiter.api.Test;
/**
 * Manual driver for {@link DropAndCreateESIndex}.
 *
 * <p>The method below carries no {@code @Test} annotation in this file; it invokes the real
 * entry point, which issues HTTP requests against the configured cluster.
 */
public class DropAndCreateESIndexTest {

	/**
	 * Invokes {@link DropAndCreateESIndex#main(String[])} for cluster "localhost" and index
	 * "dli_shadow".
	 *
	 * @throws Exception whatever the entry point throws
	 */
	public void testDropAndCreate() throws Exception {
		final String[] cliArgs = {"-c", "localhost", "-i", "dli_shadow"};
		DropAndCreateESIndex.main(cliArgs);
	}
}