diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholix.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholix.java index b1bda0154..2e08849cd 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholix.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholix.java @@ -3,16 +3,20 @@ package eu.dnetlib.dhp.provision; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.provision.scholix.Scholix; -import eu.dnetlib.dhp.utils.DHPUtils; +import eu.dnetlib.dhp.provision.scholix.ScholixResource; +import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.sql.SparkSession; import scala.Tuple2; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + public class SparkGenerateScholix { private static final String jsonIDPath = "$.id"; @@ -21,6 +25,8 @@ public class SparkGenerateScholix { + + public static void main(String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkGenerateScholix.class.getResourceAsStream("/eu/dnetlib/dhp/provision/input_generate_summary_parameters.json"))); parser.parseArgument(args); @@ -37,29 +43,48 @@ public class SparkGenerateScholix { final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - final JavaRDD relationToExport = sc.textFile(graphPath + "/relation").filter(ProvisionUtil::isNotDeleted).repartition(4000); - final JavaPairRDD scholixSummary = sc.textFile(workingDirPath + "/summary").mapToPair((PairFunction) i -> new Tuple2<>(DHPUtils.getJPathString(jsonIDPath, i), i)); - scholixSummary.join( - relationToExport - .mapToPair((PairFunction) i -> new Tuple2<>(DHPUtils.getJPathString(sourceIDPath, i), i))) - .map(Tuple2::_2) - .mapToPair(summaryRelation -> - new Tuple2<>( - DHPUtils.getJPathString(targetIDPath, summaryRelation._2()), - Scholix.generateScholixWithSource(summaryRelation._1(), summaryRelation._2()))) -// .join(scholixSummary) +// final JavaRDD relationToExport = sc.textFile(graphPath + "/relation").filter(ProvisionUtil::isNotDeleted).repartition(4000); + final JavaPairRDD scholixSummary = + sc.textFile(workingDirPath + "/summary") + .flatMapToPair((PairFlatMapFunction) i -> { + final ObjectMapper mapper = new ObjectMapper(); + final ScholixSummary summary = mapper.readValue(i, ScholixSummary.class); + ScholixResource tmp = ScholixResource.fromSummary(summary); + final List> result = new ArrayList<>(); + for (int k = 0; k<10; k++) + result.add(new Tuple2<>(String.format("%s::%d", tmp.getDnetIdentifier(), k), tmp)); + return result.iterator(); + }); +// scholixSummary.join( +// relationToExport +// .mapToPair((PairFunction) i -> new Tuple2<>(DHPUtils.getJPathString(sourceIDPath, i), i))) // .map(Tuple2::_2) -// .map(i -> i._1().addTarget(i._2())) - .map(s-> { +// .mapToPair(summaryRelation -> +// new Tuple2<>( +// DHPUtils.getJPathString(targetIDPath, summaryRelation._2()), +// Scholix.generateScholixWithSource(summaryRelation._1(), summaryRelation._2()))) +// +// .map(t-> t._2().setTarget(new ScholixResource().setDnetIdentifier(t._1()))) +// .map(s-> { +// ObjectMapper mapper = new ObjectMapper(); +// return mapper.writeValueAsString(s); +// }) +// .saveAsTextFile(workingDirPath + "/scholix", GzipCodec.class); + + sc.textFile(workingDirPath + "/scholix") + .mapToPair(t -> { ObjectMapper mapper = new ObjectMapper(); - return mapper.writeValueAsString(s); + Scholix scholix = mapper.readValue(t, Scholix.class); + Random rand = new Random(); + return new Tuple2<>(String.format("%s::%d",scholix.getTarget().getDnetIdentifier(), rand.nextInt(10)), scholix); }) - .saveAsTextFile(workingDirPath + "/scholix", GzipCodec.class); - - - ; - - + .join(scholixSummary) + .map(t-> { + Scholix item = t._2()._1().setTarget(t._2()._2()); + item.generateIdentifier(); + return item; + }) + .map(s-> new ObjectMapper().writeValueAsString(s)).saveAsTextFile(workingDirPath + "/scholix_index", GzipCodec.class); } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkIndexCollectionOnES.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkIndexCollectionOnES.java index e7c97ee1c..7f240cbef 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkIndexCollectionOnES.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkIndexCollectionOnES.java @@ -24,6 +24,7 @@ public class SparkIndexCollectionOnES { final String sourcePath = parser.get("sourcePath"); final String index = parser.get("index"); + final String idPath = parser.get("idPath"); final SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); @@ -34,7 +35,7 @@ public class SparkIndexCollectionOnES { Map esCfg = new HashMap<>(); esCfg.put("es.nodes", "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54"); - esCfg.put("es.mapping.id", "id"); + esCfg.put("es.mapping.id", idPath); esCfg.put("es.batch.write.retry.count", "8"); esCfg.put("es.batch.write.retry.wait", "60s"); esCfg.put("es.batch.size.entries", "200"); diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/Scholix.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/Scholix.java index c6fc792aa..3ebccfea0 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/Scholix.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/Scholix.java @@ -47,7 +47,7 @@ public class Scholix implements Serializable { } - private void generateIdentifier( ) { + public void generateIdentifier( ) { setIdentifier(DHPUtils.md5(String.format("%s::%s::%s",source.getDnetIdentifier(),relationship.getName(), target.getDnetIdentifier()))); } diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/Application/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/Application/provision/oozie_app/workflow.xml index 8ce51069f..83f386f5c 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/Application/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/Application/provision/oozie_app/workflow.xml @@ -25,9 +25,19 @@ number of cores used by single executor + + idScholix + the + + + idSummary + number of cores used by single executor + + + - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] @@ -103,7 +113,7 @@ ${nameNode} yarn-cluster cluster - generate Summary + generate Scholix eu.dnetlib.dhp.provision.SparkGenerateScholix dhp-graph-provision-${projectVersion}.jar --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} @@ -111,9 +121,29 @@ --workingDirPath${workingDirPath} --graphPath${graphPath} + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + index scholix + eu.dnetlib.dhp.provision.SparkIndexCollectionOnES + dhp-graph-provision-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --num-executors 20 --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="32" + -mt yarn-cluster + --sourcePath${workingDirPath}/scholix_index + --index${index}_scholix + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/provision/index_on_es.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/provision/index_on_es.json index e1c30ba39..d4904d8d3 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/provision/index_on_es.json +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/provision/index_on_es.json @@ -16,5 +16,11 @@ "paramLongName": "index", "paramDescription": "the index name", "paramRequired": true + }, + { + "paramName": "id", + "paramLongName": "idPath", + "paramDescription": "the identifier field name", + "paramRequired": true } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/provision/ExtractInfoTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/provision/ExtractInfoTest.java index a41413d00..12e91a72c 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/provision/ExtractInfoTest.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/provision/ExtractInfoTest.java @@ -1,5 +1,6 @@ package eu.dnetlib.dhp.provision; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.provision.scholix.Scholix; import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary; @@ -7,14 +8,13 @@ import org.apache.commons.io.IOUtils; import org.junit.Ignore; import org.junit.Test; +import scala.Tuple2; + public class ExtractInfoTest { @Test public void test() throws Exception { - final String json = IOUtils.toString(getClass().getResourceAsStream("record.json")); - - ProvisionUtil.getItemType(json,ProvisionUtil.TARGETJSONPATH); } @@ -43,6 +43,7 @@ public class ExtractInfoTest { } + @Test @Ignore public void testIndex() throws Exception {