diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java deleted file mode 100644 index 0507f90e5..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java +++ /dev/null @@ -1,178 +0,0 @@ - -package eu.dnetlib.dhp.actionmanager.createunresolvedentities; - -import static eu.dnetlib.dhp.actionmanager.Constants.*; -import static eu.dnetlib.dhp.actionmanager.Constants.UPDATE_CLASS_NAME; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.io.Serializable; -import java.util.Arrays; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; - -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore; -import eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers.BipResultModel; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.Instance; -import eu.dnetlib.dhp.schema.oaf.KeyValue; -import eu.dnetlib.dhp.schema.oaf.Measure; -import eu.dnetlib.dhp.schema.oaf.Result; -import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; -import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; -import eu.dnetlib.dhp.utils.DHPUtils; - -public class PrepareBipFinder implements Serializable { - - private static final Logger log = LoggerFactory.getLogger(PrepareBipFinder.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - public static void main(String[] args) throws Exception { - - String jsonConfiguration = IOUtils - .toString( - PrepareBipFinder.class - .getResourceAsStream( - "/eu/dnetlib/dhp/actionmanager/createunresolvedentities/prepare_parameters.json")); - - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - - parser.parseArgument(args); - - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - final String sourcePath = parser.get("sourcePath"); - log.info("sourcePath {}: ", sourcePath); - - final String outputPath = parser.get("outputPath"); - log.info("outputPath {}: ", outputPath); - - SparkConf conf = new SparkConf(); - - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); - prepareResults(spark, sourcePath, outputPath); - }); - } - - private static void prepareResults(SparkSession spark, String inputPath, String outputPath) { - - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - - JavaRDD bipDeserializeJavaRDD = sc - .textFile(inputPath) - .map(item -> OBJECT_MAPPER.readValue(item, BipResultModel.class)); - - spark - .createDataset(bipDeserializeJavaRDD.flatMap(entry -> entry.keySet().stream().map(key -> { - BipScore bs = new BipScore(); - bs.setId(key); - bs.setScoreList(entry.get(key)); - - return bs; - }).collect(Collectors.toList()).iterator()).rdd(), Encoders.bean(BipScore.class)) - .map((MapFunction) v -> { - Result r = new Result(); - final String cleanedPid = CleaningFunctions.normalizePidValue(DOI, v.getId()); - - r.setId(DHPUtils.generateUnresolvedIdentifier(v.getId(), DOI)); - Instance inst = new Instance(); - inst.setMeasures(getMeasure(v)); - - inst - .setPid( - Arrays - .asList( - OafMapperUtils - .structuredProperty( - cleanedPid, - OafMapperUtils - .qualifier( - DOI, DOI_CLASSNAME, - ModelConstants.DNET_PID_TYPES, - ModelConstants.DNET_PID_TYPES), - null))); - r.setInstance(Arrays.asList(inst)); - r - .setDataInfo( - OafMapperUtils - .dataInfo( - false, null, true, - false, - OafMapperUtils - .qualifier( - ModelConstants.PROVENANCE_ENRICH, - null, - ModelConstants.DNET_PROVENANCE_ACTIONS, - ModelConstants.DNET_PROVENANCE_ACTIONS), - null)); - return r; - }, Encoders.bean(Result.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath + "/bip"); - } - - private static List getMeasure(BipScore value) { - return value - .getScoreList() - .stream() - .map(score -> { - Measure m = new Measure(); - m.setId(score.getId()); - m - .setUnit( - score - .getUnit() - .stream() - .map(unit -> { - KeyValue kv = new KeyValue(); - kv.setValue(unit.getValue()); - kv.setKey(unit.getKey()); - kv - .setDataInfo( - OafMapperUtils - .dataInfo( - false, - UPDATE_DATA_INFO_TYPE, - true, - false, - OafMapperUtils - .qualifier( - UPDATE_MEASURE_BIP_CLASS_ID, - UPDATE_CLASS_NAME, - ModelConstants.DNET_PROVENANCE_ACTIONS, - ModelConstants.DNET_PROVENANCE_ACTIONS), - "")); - return kv; - }) - .collect(Collectors.toList())); - return m; - }) - .collect(Collectors.toList()); - } -} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/oozie_app/workflow.xml index a2935a71d..a5388f28b 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/oozie_app/workflow.xml @@ -5,11 +5,6 @@ fosPath the input path of the resources to be extended - - - - - outputPath the path where to store the actionset @@ -77,35 +72,10 @@ - - - - - - - - - - - - - - - - - - - - - - - - - yarn @@ -125,6 +95,7 @@ --sourcePath${fosPath} --outputPath${workingDir}/input/fos + --delimiter${delimiter} diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java index ccb0ebbff..da7bcd3de 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java @@ -67,92 +67,6 @@ public class PrepareTest { spark.stop(); } - @Test - void bipPrepareTest() throws Exception { - final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/bip/bip.json") - .getPath(); - - PrepareBipFinder - .main( - new String[] { - "--isSparkSessionManaged", Boolean.FALSE.toString(), - "--sourcePath", sourcePath, - "--outputPath", workingDir.toString() + "/work" - - }); - - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - - JavaRDD tmp = sc - .textFile(workingDir.toString() + "/work/bip") - .map(item -> OBJECT_MAPPER.readValue(item, Result.class)); - - Assertions.assertEquals(86, tmp.count()); - - String doi1 = "unresolved::10.0000/096020199389707::doi"; - - Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals(doi1)).count()); - Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals(doi1)).collect().get(0).getInstance().size()); - Assertions - .assertEquals( - 3, tmp.filter(r -> r.getId().equals(doi1)).collect().get(0).getInstance().get(0).getMeasures().size()); - Assertions - .assertEquals( - "6.34596412687e-09", tmp - .filter(r -> r.getId().equals(doi1)) - .collect() - .get(0) - .getInstance() - .get(0) - .getMeasures() - .stream() - .filter(sl -> sl.getId().equals("influence")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); - Assertions - .assertEquals( - "0.641151896994", tmp - .filter(r -> r.getId().equals(doi1)) - .collect() - .get(0) - .getInstance() - .get(0) - .getMeasures() - .stream() - .filter(sl -> sl.getId().equals("popularity_alt")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); - Assertions - .assertEquals( - "2.33375102921e-09", tmp - .filter(r -> r.getId().equals(doi1)) - .collect() - .get(0) - .getInstance() - .get(0) - .getMeasures() - .stream() - .filter(sl -> sl.getId().equals("popularity")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); - - final String doi2 = "unresolved::10.3390/s18072310::doi"; - - Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals(doi2)).count()); - Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals(doi2)).collect().get(0).getInstance().size()); - - } - @Test void fosPrepareTest() throws Exception { final String sourcePath = getClass() @@ -338,57 +252,4 @@ public class PrepareTest { } -// @Test -// void test3() throws Exception { -// final String sourcePath = "/Users/miriam.baglioni/Downloads/doi_fos_results_20_12_2021.csv.gz"; -// -// final String outputPath = workingDir.toString() + "/fos.json"; -// GetFOSSparkJob -// .main( -// new String[] { -// "--isSparkSessionManaged", Boolean.FALSE.toString(), -// "--sourcePath", sourcePath, -// -// "-outputPath", outputPath -// -// }); -// -// final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); -// -// JavaRDD tmp = sc -// .textFile(outputPath) -// .map(item -> OBJECT_MAPPER.readValue(item, FOSDataModel.class)); -// -// tmp.foreach(t -> Assertions.assertTrue(t.getDoi() != null)); -// tmp.foreach(t -> Assertions.assertTrue(t.getLevel1() != null)); -// tmp.foreach(t -> Assertions.assertTrue(t.getLevel2() != null)); -// tmp.foreach(t -> Assertions.assertTrue(t.getLevel3() != null)); -// -// } -// -// @Test -// void test4() throws Exception { -// final String sourcePath = "/Users/miriam.baglioni/Downloads/doi_sdg_results_20_12_21.csv.gz"; -// -// final String outputPath = workingDir.toString() + "/sdg.json"; -// GetSDGSparkJob -// .main( -// new String[] { -// "--isSparkSessionManaged", Boolean.FALSE.toString(), -// "--sourcePath", sourcePath, -// -// "-outputPath", outputPath -// -// }); -// -// final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); -// -// JavaRDD tmp = sc -// .textFile(outputPath) -// .map(item -> OBJECT_MAPPER.readValue(item, SDGDataModel.class)); -// -// tmp.foreach(t -> Assertions.assertTrue(t.getDoi() != null)); -// tmp.foreach(t -> Assertions.assertTrue(t.getSbj() != null)); -// -// } } diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java index fce6c1e97..ce116688a 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java @@ -340,18 +340,7 @@ public class ProduceTest { } private JavaRDD getResultJavaRDD() throws Exception { - final String bipPath = getClass() - .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/bip/bip.json") - .getPath(); - PrepareBipFinder - .main( - new String[] { - "--isSparkSessionManaged", Boolean.FALSE.toString(), - "--sourcePath", bipPath, - "--outputPath", workingDir.toString() + "/work" - - }); final String fosPath = getClass() .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/fos/fos.json") .getPath(); @@ -449,18 +438,7 @@ public class ProduceTest { } private JavaRDD getResultJavaRDDPlusSDG() throws Exception { - final String bipPath = getClass() - .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/bip/bip.json") - .getPath(); - PrepareBipFinder - .main( - new String[] { - "--isSparkSessionManaged", Boolean.FALSE.toString(), - "--sourcePath", bipPath, - "--outputPath", workingDir.toString() + "/work" - - }); final String fosPath = getClass() .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/fos/fos.json") .getPath(); @@ -517,14 +495,6 @@ public class ProduceTest { .filter(row -> row.getSubject() != null) .count()); - Assertions - .assertEquals( - 85, - tmp - .filter(row -> !row.getId().equals(doi)) - .filter(r -> r.getInstance() != null && r.getInstance().size() > 0) - .count()); - } @Test