diff --git a/dhp-workflows/dhp-enrichment/null/publication/._SUCCESS.crc b/dhp-workflows/dhp-enrichment/null/publication/._SUCCESS.crc deleted file mode 100644 index 3b7b04493..000000000 Binary files a/dhp-workflows/dhp-enrichment/null/publication/._SUCCESS.crc and /dev/null differ diff --git a/dhp-workflows/dhp-enrichment/null/publication/.part-00000-d0707c22-40cb-470f-b7ad-68b08e2882ec-c000.json.gz.crc b/dhp-workflows/dhp-enrichment/null/publication/.part-00000-d0707c22-40cb-470f-b7ad-68b08e2882ec-c000.json.gz.crc deleted file mode 100644 index 24a358fe2..000000000 Binary files a/dhp-workflows/dhp-enrichment/null/publication/.part-00000-d0707c22-40cb-470f-b7ad-68b08e2882ec-c000.json.gz.crc and /dev/null differ diff --git a/dhp-workflows/dhp-enrichment/null/publication/_SUCCESS b/dhp-workflows/dhp-enrichment/null/publication/_SUCCESS deleted file mode 100644 index e69de29bb..000000000 diff --git a/dhp-workflows/dhp-enrichment/null/publication/part-00000-d0707c22-40cb-470f-b7ad-68b08e2882ec-c000.json.gz b/dhp-workflows/dhp-enrichment/null/publication/part-00000-d0707c22-40cb-470f-b7ad-68b08e2882ec-c000.json.gz deleted file mode 100644 index 768f68d92..000000000 Binary files a/dhp-workflows/dhp-enrichment/null/publication/part-00000-d0707c22-40cb-470f-b7ad-68b08e2882ec-c000.json.gz and /dev/null differ diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java index 316644479..c12fb54d9 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java @@ -12,14 +12,13 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.Country; -import eu.dnetlib.dhp.schema.oaf.DataInfo; -import eu.dnetlib.dhp.schema.oaf.Qualifier; -import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.*; public class PropagationConstant { @@ -237,4 +236,30 @@ public class PropagationConstant { .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz)); } + public static Dataset readOafKryoPath( + SparkSession spark, String inputPath, Class clazz) { + return spark + .read() + .textFile(inputPath) + .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.kryo(clazz)); + } + + public static Class[] getModelClasses() { + List> modelClasses = Lists.newArrayList(ModelSupport.getOafModelClasses()); + modelClasses + .addAll( + Lists + .newArrayList( + Result.class, + Qualifier.class, + DataInfo.class, + Publication.class, + eu.dnetlib.dhp.schema.oaf.Dataset.class, + Software.class, + OtherResearchProduct.class, + Subject.class, + AccessRight.class)); + return modelClasses.toArray(new Class[] {}); + } + } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/subjecttoresultfromsemrel/PrepareResultResultStep1.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/subjecttoresultfromsemrel/PrepareResultResultStep1.java index 0a82c3981..f35ad52e1 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/subjecttoresultfromsemrel/PrepareResultResultStep1.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/subjecttoresultfromsemrel/PrepareResultResultStep1.java @@ -70,7 +70,7 @@ public class PrepareResultResultStep1 implements Serializable { final List allowedSemRel = Arrays .asList( - parser.get("allowedSemRel").split(";")) + parser.get("allowedsemrels").split(";")) .stream() .map(s -> s.toLowerCase()) .collect(Collectors.toList()); @@ -98,7 +98,7 @@ public class PrepareResultResultStep1 implements Serializable { Dataset result = readPath(spark, inputPath + "/" + resultType, resultClazz) .filter( (FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() && - !r.getDataInfo().getInvisible() && + !r.getDataInfo().getInvisible() && Optional.ofNullable(r.getSubject()).isPresent() && r .getSubject() .stream() @@ -116,22 +116,28 @@ public class PrepareResultResultStep1 implements Serializable { (MapGroupsFunction, ResultSubjectList>) (k, it) -> getResultSubjectList(subjectClassList, k, it), Encoders.bean(ResultSubjectList.class)) + .filter(Objects::nonNull) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") .json(outputPath + "/" + resultType); } - @NotNull private static ResultSubjectList getResultSubjectList(List subjectClassList, String k, Iterator> it) { + Tuple2 first = it.next(); + if (!Optional.ofNullable(first._1()).isPresent()) { + return null; + } ResultSubjectList rsl = new ResultSubjectList(); rsl.setResId(k); - Tuple2 first = it.next(); List sbjInfo = new ArrayList<>(); Set subjectSet = new HashSet<>(); extracted(subjectClassList, first._1().getSubject(), sbjInfo, subjectSet); - it.forEachRemaining(t2 -> extracted(subjectClassList, t2._1().getSubject(), sbjInfo, subjectSet)); + it.forEachRemaining(t2 -> { + if (Optional.ofNullable(t2._1()).isPresent()) + extracted(subjectClassList, t2._1().getSubject(), sbjInfo, subjectSet); + }); rsl.setSubjectList(sbjInfo); return rsl; } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/subjecttoresultfromsemrel/SparkSubjectPropagationStep2.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/subjecttoresultfromsemrel/SparkSubjectPropagationStep2.java index d546a8d8f..2a3bcff51 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/subjecttoresultfromsemrel/SparkSubjectPropagationStep2.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/subjecttoresultfromsemrel/SparkSubjectPropagationStep2.java @@ -50,6 +50,7 @@ public class SparkSubjectPropagationStep2 implements Serializable { final String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); + final String resultClassName = parser.get("resultTableName"); log.info("resultTableName: {}", resultClassName); @@ -58,14 +59,15 @@ public class SparkSubjectPropagationStep2 implements Serializable { final String resultType = parser.get("resultType"); log.info("resultType: {}", resultType); - final String inputPath = parser.get("inputPath"); + final String inputPath = parser.get("sourcePath"); log.info("inputPath: {}", inputPath); final String workingPath = parser.get("workingPath"); log.info("workingPath: {}", workingPath); SparkConf conf = new SparkConf(); - + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(getModelClasses()); runWithSparkSession( conf, isSparkSessionManaged, @@ -83,7 +85,11 @@ public class SparkSubjectPropagationStep2 implements Serializable { Class resultClazz, String resultType) { - Dataset results = readPath(spark, inputPath + "/" + resultType, resultClazz); + Dataset> results = readOafKryoPath(spark, inputPath + "/" + resultType, resultClazz) + .map( + (MapFunction>) r -> new Tuple2(r.getId(), r), + Encoders.tuple(Encoders.STRING(), Encoders.kryo(resultClazz))); + Dataset preparedResult = readPath( spark, preparedPath + "/publication", ResultSubjectList.class) .union(readPath(spark, preparedPath + "/dataset", ResultSubjectList.class)) @@ -93,20 +99,26 @@ public class SparkSubjectPropagationStep2 implements Serializable { results .joinWith( preparedResult, - results.col("id").equalTo(preparedResult.col("resId")), + results.col("_1").equalTo(preparedResult.col("resId")), "left") - .map((MapFunction, R>) t2 -> { - R res = t2._1(); + .map((MapFunction, ResultSubjectList>, String>) t2 -> { + R res = t2._1()._2(); + // estraggo le tipologie di subject dal result + Map> resultMap = new HashMap<>(); if (Optional.ofNullable(t2._2()).isPresent()) { - // estraggo le tipologie di subject dal result - Map> resultMap = new HashMap<>(); - res.getSubject().stream().forEach(s -> { - String cid = s.getQualifier().getClassid(); - if (!resultMap.containsKey(cid)) { - resultMap.put(cid, new ArrayList<>()); - } - resultMap.get(cid).add(s.getValue()); - }); + if(Optional.ofNullable(res.getSubject()).isPresent()){ + res.getSubject().stream().forEach(s -> { + String cid = s.getQualifier().getClassid(); + if(!cid.equals(ModelConstants.DNET_SUBJECT_KEYWORD)){ + if (!resultMap.containsKey(cid)) { + resultMap.put(cid, new ArrayList<>()); + } + resultMap.get(cid).add(s.getValue()); + } + }); + }else{ + res.setSubject(new ArrayList<>()); + } // Remove from the list all the subjects with the same class already present in the result List distinctClassId = t2 @@ -142,12 +154,12 @@ public class SparkSubjectPropagationStep2 implements Serializable { } } - return res; - }, Encoders.bean(resultClazz)) + return OBJECT_MAPPER.writeValueAsString(res); + }, Encoders.STRING()) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .json(workingPath + "/" + resultType); + .text(workingPath + "/" + resultType); readPath(spark, workingPath + "/" + resultType, resultClazz) .write() diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/input_preparesubjecttoresult_parameters.json b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/input_preparesubjecttoresult_parameters.json index a8ec1d5b3..1e3ac1af4 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/input_preparesubjecttoresult_parameters.json +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/input_preparesubjecttoresult_parameters.json @@ -3,7 +3,7 @@ { "paramName":"asr", - "paramLongName":"allowedSemRel", + "paramLongName":"allowedsemrels", "paramDescription": "the set of semantic relations between the results to be exploited to perform the propagation", "paramRequired": true }, diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/input_propagatesubject_parameters.json b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/input_propagatesubject_parameters.json index 0cb51c598..76942cbe6 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/input_propagatesubject_parameters.json +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/input_propagatesubject_parameters.json @@ -13,8 +13,8 @@ "paramRequired": true }, { - "paramName":"ip", - "paramLongName":"inputPath", + "paramName":"sp", + "paramLongName":"sourcePath", "paramDescription": "the path of the input graph", "paramRequired": true }, diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/oozie_app/config-default.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/oozie_app/config-default.xml index caf3c6050..0ce8cef58 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/oozie_app/config-default.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/oozie_app/config-default.xml @@ -48,7 +48,7 @@ sparkExecutorMemory - 6G + 10G sparkExecutorCores diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/oozie_app/workflow.xml index b7f48a4e0..b16a1b00f 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/subjectpropagation/oozie_app/workflow.xml @@ -1,12 +1,4 @@ - - - - - - - - sourcePath @@ -16,14 +8,6 @@ subjectlist the list of subject classid to propagate (split by ;) - - resultType - the result tapy - - - resultTableName - the class of the result - allowedsemrels the allowed semantics @@ -64,14 +48,14 @@ - + yarn cluster - PrepareProjectResultsAssociation + PrepareSubjectResultsAssociation eu.dnetlib.dhp.subjecttoresultfromsemrel.PrepareResultResultStep1 dhp-enrichment-${projectVersion}.jar @@ -98,7 +82,7 @@ yarn cluster - PrepareProjectResultsAssociation + PrepareSubjectResultsAssociation eu.dnetlib.dhp.subjecttoresultfromsemrel.PrepareResultResultStep1 dhp-enrichment-${projectVersion}.jar @@ -125,7 +109,7 @@ yarn cluster - PrepareProjectResultsAssociation + PrepareSubjectResultsAssociation eu.dnetlib.dhp.subjecttoresultfromsemrel.PrepareResultResultStep1 dhp-enrichment-${projectVersion}.jar @@ -152,7 +136,7 @@ yarn cluster - PrepareProjectResultsAssociation + PrepareSubjectResultsAssociation eu.dnetlib.dhp.subjecttoresultfromsemrel.PrepareResultResultStep1 dhp-enrichment-${projectVersion}.jar @@ -188,12 +172,12 @@ yarn cluster - ProjectToResultPropagation - eu.dnetlib.dhp.projecttoresult.SparkResultToProjectThroughSemRelJob + SubjectToResultPropagation + eu.dnetlib.dhp.subjecttoresultfromsemrel.SparkSubjectPropagationStep2 dhp-enrichment-${projectVersion}.jar --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} + --executor-memory=8G --driver-memory=${sparkDriverMemory} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} @@ -201,8 +185,9 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} + --conf spark.sql.shuffle.partitions=3840 - --inputPath${sourcePath} + --sourcePath${sourcePath} --outputPath${outputPath} --workingPath${workingDir}/working --resultTableNameeu.dnetlib.dhp.schema.oaf.Publication @@ -217,8 +202,8 @@ yarn cluster - ProjectToResultPropagation - eu.dnetlib.dhp.projecttoresult.SparkResultToProjectThroughSemRelJob + SubjectToResultPropagation + eu.dnetlib.dhp.subjecttoresultfromsemrel.SparkSubjectPropagationStep2 dhp-enrichment-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -230,8 +215,9 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} + --conf spark.sql.shuffle.partitions=3840 - --inputPath${sourcePath} + --sourcePath${sourcePath} --outputPath${outputPath} --workingPath${workingDir}/working --resultTableNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct @@ -246,8 +232,8 @@ yarn cluster - ProjectToResultPropagation - eu.dnetlib.dhp.projecttoresult.SparkResultToProjectThroughSemRelJob + SubjectToResultPropagation + eu.dnetlib.dhp.subjecttoresultfromsemrel.SparkSubjectPropagationStep2 dhp-enrichment-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -259,8 +245,9 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} + --conf spark.sql.shuffle.partitions=3840 - --inputPath${sourcePath} + --sourcePath${sourcePath} --outputPath${outputPath} --workingPath${workingDir}/working --resultTableNameeu.dnetlib.dhp.schema.oaf.Dataset @@ -275,8 +262,8 @@ yarn cluster - ProjectToResultPropagation - eu.dnetlib.dhp.projecttoresult.SparkResultToProjectThroughSemRelJob + SubjectToResultPropagation + eu.dnetlib.dhp.subjecttoresultfromsemrel.SparkSubjectPropagationStep2 dhp-enrichment-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -288,8 +275,9 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} + --conf spark.sql.shuffle.partitions=3840 - --inputPath${sourcePath} + --sourcePath${sourcePath} --outputPath${outputPath} --workingPath${workingDir}/working --resultTableNameeu.dnetlib.dhp.schema.oaf.Software @@ -300,7 +288,7 @@ - + diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/subjectpropagation/SubjectPreparationJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/subjectpropagation/SubjectPreparationJobTest.java index f782feab9..0b3b45d7e 100644 --- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/subjectpropagation/SubjectPreparationJobTest.java +++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/subjectpropagation/SubjectPreparationJobTest.java @@ -81,7 +81,7 @@ public class SubjectPreparationJobTest { PrepareResultResultStep1 .main( new String[] { - "-allowedSemRel", + "-allowedsemrels", "IsSupplementedBy;IsSupplementTo;IsPreviousVersionOf;IsNewVersionOf;IsIdenticalTo;Obsoletes;IsObsoletedBy;IsVersionOf", "-subjectlist", "fos;sdg", "-resultType", "publication", diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/subjectpropagation/SubjectPropagationJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/subjectpropagation/SubjectPropagationJobTest.java index b324b49d8..48c425bbc 100644 --- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/subjectpropagation/SubjectPropagationJobTest.java +++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/subjectpropagation/SubjectPropagationJobTest.java @@ -76,7 +76,7 @@ public class SubjectPropagationJobTest { .getResource("/eu/dnetlib/dhp/subjectpropagation/preparedInfo") .getPath(), "-resultType", "publication", - "-inputPath", getClass() + "-sourcePath", getClass() .getResource("/eu/dnetlib/dhp/subjectpropagation") .getPath(), "-isSparkSessionManaged", Boolean.FALSE.toString(),