diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareFOSSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareFOSSparkJob.java
index fef7965156..55e3919320 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareFOSSparkJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareFOSSparkJob.java
@@ -21,8 +21,10 @@ import org.slf4j.LoggerFactory;
 
 import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.Result;
 import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
 import eu.dnetlib.dhp.utils.DHPUtils;
 
 public class PrepareFOSSparkJob implements Serializable {
@@ -71,6 +73,7 @@ public class PrepareFOSSparkJob implements Serializable {
 				Result r = new Result();
 				FOSDataModel first = it.next();
 				r.setId(DHPUtils.generateUnresolvedIdentifier(k, DOI));
+
 				HashSet<String> level1 = new HashSet<>();
 				HashSet<String> level2 = new HashSet<>();
 				HashSet<String> level3 = new HashSet<>();
@@ -81,6 +84,19 @@ public class PrepareFOSSparkJob implements Serializable {
 				level2.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME, UPDATE_SUBJECT_FOS_CLASS_ID)));
 				level3.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME, UPDATE_SUBJECT_FOS_CLASS_ID)));
 				r.setSubject(sbjs);
+				r
+					.setDataInfo(
+						OafMapperUtils
+							.dataInfo(
+								false, null, true,
+								false,
+								OafMapperUtils
+									.qualifier(
+										ModelConstants.PROVENANCE_ENRICH,
+										null,
+										ModelConstants.DNET_PROVENANCE_ACTIONS,
+										ModelConstants.DNET_PROVENANCE_ACTIONS),
+								null));
 				return r;
 			}, Encoders.bean(Result.class))
 			.write()
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareSDGSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareSDGSparkJob.java
index 27da77c0c6..a31e380fe9 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareSDGSparkJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareSDGSparkJob.java
@@ -21,8 +21,10 @@ import org.slf4j.LoggerFactory;
 
 import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.Result;
 import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
 import eu.dnetlib.dhp.utils.DHPUtils;
 
 public class PrepareSDGSparkJob implements Serializable {
@@ -78,6 +80,19 @@ public class PrepareSDGSparkJob implements Serializable {
 						s -> sbjs
 							.add(getSubject(s.getSbj(), SDG_CLASS_ID, SDG_CLASS_NAME, UPDATE_SUBJECT_SDG_CLASS_ID)));
 				r.setSubject(sbjs);
+				r
+					.setDataInfo(
+						OafMapperUtils
+							.dataInfo(
+								false, null, true,
+								false,
+								OafMapperUtils
+									.qualifier(
+										ModelConstants.PROVENANCE_ENRICH,
+										null,
+										ModelConstants.DNET_PROVENANCE_ACTIONS,
+										ModelConstants.DNET_PROVENANCE_ACTIONS),
+								null));
 				return r;
 			}, Encoders.bean(Result.class))
 			.write()
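Note: both prepare jobs now stamp every unresolved Result with the same DataInfo, marking the record as the product of an enrichment step. For readers unfamiliar with the OafMapperUtils helpers, the sketch below unrolls that nested call into plain setters. It is illustrative only: the class and method names are invented, and it assumes the standard bean accessors of eu.dnetlib.dhp.schema.oaf.DataInfo and Qualifier, with the dataInfo argument order being (deletedbyinference, inferenceprovenance, inferred, invisible, provenanceaction, trust).

import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Result;

public class ProvenanceSketch {

	// Hypothetical helper, equivalent to the OafMapperUtils.dataInfo(...) call
	// added in both prepare jobs (assuming the argument order noted above).
	static void markAsEnrichment(Result r) {
		Qualifier provenance = new Qualifier();
		provenance.setClassid(ModelConstants.PROVENANCE_ENRICH);
		provenance.setClassname(null); // the patch leaves classname unset
		provenance.setSchemeid(ModelConstants.DNET_PROVENANCE_ACTIONS);
		provenance.setSchemename(ModelConstants.DNET_PROVENANCE_ACTIONS);

		DataInfo info = new DataInfo();
		info.setDeletedbyinference(false); // record is live
		info.setInferenceprovenance(null);
		info.setInferred(true); // subjects come from an inference/enrichment step
		info.setInvisible(false);
		info.setProvenanceaction(provenance);
		info.setTrust(null); // the patch sets no trust value

		r.setDataInfo(info);
	}
}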
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/SparkSaveUnresolved.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/SparkSaveUnresolved.java
index ab83568365..3b97750943 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/SparkSaveUnresolved.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/SparkSaveUnresolved.java
@@ -5,6 +5,7 @@ import static eu.dnetlib.dhp.actionmanager.Constants.*;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 import java.io.Serializable;
+import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
@@ -67,7 +68,19 @@ public class SparkSaveUnresolved implements Serializable {
 			.groupByKey((MapFunction<Result, String>) Result::getId, Encoders.STRING())
 			.mapGroups((MapGroupsFunction<String, Result, Result>) (k, it) -> {
 				Result ret = it.next();
-				it.forEachRemaining(r -> ret.mergeFrom(r));
+				it.forEachRemaining(r -> {
+					if (r.getInstance() != null) {
+						ret.setInstance(r.getInstance());
+					}
+					if (r.getSubject() != null) {
+						if (ret.getSubject() != null)
+							ret.getSubject().addAll(r.getSubject());
+						else
+							ret.setSubject(r.getSubject());
+					}
+
+					// ret.mergeFrom(r)
+				});
 				return ret;
 			}, Encoders.bean(Result.class))
 			.write()
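Note the change in merge semantics here: Result.mergeFrom is replaced by a hand-rolled merge in which the last non-null instance list seen simply overwrites the accumulator, while subject lists are accumulated across all records sharing an id. A minimal standalone rendering of that policy, mirroring the lambda body above (the wrapper class and helper name are made up for illustration):

import eu.dnetlib.dhp.schema.oaf.Result;

public class UnresolvedMerge {

	// Sketch of the merge policy used in SparkSaveUnresolved above:
	// instance -> overwrite (last non-null wins), subject -> append.
	static Result merge(Result ret, Result r) {
		if (r.getInstance() != null) {
			ret.setInstance(r.getInstance()); // replaces, does not concatenate
		}
		if (r.getSubject() != null) {
			if (ret.getSubject() != null)
				ret.getSubject().addAll(r.getSubject());
			else
				ret.setSubject(r.getSubject());
		}
		return ret;
	}
}

Since the iterator order produced by groupByKey is not guaranteed, the overwrite branch is only deterministic when at most one record per id carries instances.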
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java
index 166430c2f6..cc8108bde7 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java
@@ -146,6 +146,11 @@ public class PrepareTest {
 				.get(0)
 				.getValue());
 
+		final String doi2 = "unresolved::10.3390/s18072310::doi";
+
+		Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals(doi2)).count());
+		Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals(doi2)).collect().get(0).getInstance().size());
+
 	}
 
 	@Test
@@ -259,59 +264,61 @@ public class PrepareTest {
 				.collect()
 				.contains("8. Economic growth"));
 
-	}
-
-	@Test
-	void test3() throws Exception {
-		final String sourcePath = "/Users/miriam.baglioni/Downloads/doi_fos_results_20_12_2021.csv.gz";
-
-		final String outputPath = workingDir.toString() + "/fos.json";
-		GetFOSSparkJob
-			.main(
-				new String[] {
-					"--isSparkSessionManaged", Boolean.FALSE.toString(),
-					"--sourcePath", sourcePath,
-
-					"-outputPath", outputPath
-
-				});
-
-		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
-
-		JavaRDD<FOSDataModel> tmp = sc
-			.textFile(outputPath)
-			.map(item -> OBJECT_MAPPER.readValue(item, FOSDataModel.class));
-
-		tmp.foreach(t -> Assertions.assertTrue(t.getDoi() != null));
-		tmp.foreach(t -> Assertions.assertTrue(t.getLevel1() != null));
-		tmp.foreach(t -> Assertions.assertTrue(t.getLevel2() != null));
-		tmp.foreach(t -> Assertions.assertTrue(t.getLevel3() != null));
+		Assertions.assertEquals(32, tmp.filter(row -> row.getDataInfo() != null).count());
 
 	}
 
-	@Test
-	void test4() throws Exception {
-		final String sourcePath = "/Users/miriam.baglioni/Downloads/doi_sdg_results_20_12_21.csv.gz";
-
-		final String outputPath = workingDir.toString() + "/sdg.json";
-		GetSDGSparkJob
-			.main(
-				new String[] {
-					"--isSparkSessionManaged", Boolean.FALSE.toString(),
-					"--sourcePath", sourcePath,
-
-					"-outputPath", outputPath
-
-				});
-
-		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
-
-		JavaRDD<SDGDataModel> tmp = sc
-			.textFile(outputPath)
-			.map(item -> OBJECT_MAPPER.readValue(item, SDGDataModel.class));
-
-		tmp.foreach(t -> Assertions.assertTrue(t.getDoi() != null));
-		tmp.foreach(t -> Assertions.assertTrue(t.getSbj() != null));
-
-	}
+//	@Test
+//	void test3() throws Exception {
+//		final String sourcePath = "/Users/miriam.baglioni/Downloads/doi_fos_results_20_12_2021.csv.gz";
+//
+//		final String outputPath = workingDir.toString() + "/fos.json";
+//		GetFOSSparkJob
+//			.main(
+//				new String[] {
+//					"--isSparkSessionManaged", Boolean.FALSE.toString(),
+//					"--sourcePath", sourcePath,
+//
+//					"-outputPath", outputPath
+//
+//				});
+//
+//		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+//
+//		JavaRDD<FOSDataModel> tmp = sc
+//			.textFile(outputPath)
+//			.map(item -> OBJECT_MAPPER.readValue(item, FOSDataModel.class));
+//
+//		tmp.foreach(t -> Assertions.assertTrue(t.getDoi() != null));
+//		tmp.foreach(t -> Assertions.assertTrue(t.getLevel1() != null));
+//		tmp.foreach(t -> Assertions.assertTrue(t.getLevel2() != null));
+//		tmp.foreach(t -> Assertions.assertTrue(t.getLevel3() != null));
+//
+//	}
+//
+//	@Test
+//	void test4() throws Exception {
+//		final String sourcePath = "/Users/miriam.baglioni/Downloads/doi_sdg_results_20_12_21.csv.gz";
+//
+//		final String outputPath = workingDir.toString() + "/sdg.json";
+//		GetSDGSparkJob
+//			.main(
+//				new String[] {
+//					"--isSparkSessionManaged", Boolean.FALSE.toString(),
+//					"--sourcePath", sourcePath,
+//
+//					"-outputPath", outputPath
+//
+//				});
+//
+//		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+//
+//		JavaRDD<SDGDataModel> tmp = sc
+//			.textFile(outputPath)
+//			.map(item -> OBJECT_MAPPER.readValue(item, SDGDataModel.class));
+//
+//		tmp.foreach(t -> Assertions.assertTrue(t.getDoi() != null));
+//		tmp.foreach(t -> Assertions.assertTrue(t.getSbj() != null));
+//
+//	}
 }
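The new assertion above only counts results that carry any dataInfo. A hypothetical stricter variant, sketched below, would also pin down the provenance classid that the prepare jobs now set. The wrapper class and method are invented for illustration, and the JavaRDD<Result> parameter stands in for the `tmp` already built in the test:

import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Assertions;

import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Result;

public class ProvenanceAssertionSketch {

	// Hypothetical tightening of the dataInfo assertion: besides being non-null,
	// the dataInfo should carry the PROVENANCE_ENRICH classid set upstream.
	static void assertEnrichProvenance(JavaRDD<Result> tmp, long expected) {
		Assertions
			.assertEquals(
				expected,
				tmp
					.filter(
						r -> r.getDataInfo() != null
							&& r.getDataInfo().getProvenanceaction() != null
							&& ModelConstants.PROVENANCE_ENRICH
								.equals(r.getDataInfo().getProvenanceaction().getClassid()))
					.count());
	}
}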
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java
index ce44f00361..a5ecaeabff 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java
@@ -196,6 +196,9 @@ public class ProduceTest {
 		final String doi = "unresolved::10.3390/s18072310::doi";
 
 		JavaRDD<Result> tmp = getResultJavaRDD();
+		tmp
+			.filter(row -> row.getId().equals(doi))
+			.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
 		Assertions
 			.assertEquals(
 				3, tmp