From c4ccd7b32c3ee272188bad034b6f0264c074bd88 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Fri, 1 Oct 2021 12:59:47 +0200
Subject: [PATCH] -

---
 .../dhp/oa/dedup/SparkWhitelistSimRels.java   |  196 +--
 .../dnetlib/dhp/oa/dedup/SparkDedupTest.java  | 1266 +++++++++--
 .../eu/dnetlib/dhp/PropagationConstant.java   |   11 +-
 .../SparkOrcidToResultFromSemRelJob.java      |    7 +-
 ...kResultToCommunityFromOrganizationJob.java |    4 +-
 ...parkResultToCommunityThroughSemRelJob.java |    4 +-
 .../dhp/oa/graph/dump/ResultMapper.java       |   84 +-
 .../dhp/oa/graph/dump/SaveCommunityMap.java   |   10 +-
 .../community/SparkPrepareResultProject.java  |   10 +-
 .../dump/complete/DumpGraphEntities.java      |   25 +-
 .../dump/complete/QueryInformationSystem.java |    6 +-
 .../dump/complete/SparkDumpRelationJob.java   |    2 +-
 .../funderresults/SparkDumpFunderResults.java |   53 +-
 .../SparkResultLinkedToProject.java           |    1 -
 .../dhp/oa/graph/dump/DumpJobTest.java        |  380 +++--
 .../dump/PrepareResultProjectJobTest.java     |  152 +-
 .../oa/graph/dump/UpdateProjectInfoTest.java  |   83 +-
 .../graph/dump/complete/DumpRelationTest.java |   80 +-
 .../complete/QueryInformationSystemTest.java  |    8 +-
 .../dump/funderresult/SplitPerFunderTest.java |    4 +-
 .../provision/IndexRecordTransformerTest.java |    3 +-
 pom.xml                                       |    2 +-
 22 files changed, 1273 insertions(+), 1118 deletions(-)

diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkWhitelistSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkWhitelistSimRels.java
index fa7d33570..48ba8a6f6 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkWhitelistSimRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkWhitelistSimRels.java
@@ -1,3 +1,4 @@
+
 package eu.dnetlib.dhp.oa.dedup;
 
 import java.io.IOException;
@@ -35,117 +36,124 @@ import scala.Tuple3;
 
 public class SparkWhitelistSimRels extends AbstractSparkAction {
 
-    private static final Logger log = LoggerFactory.getLogger(SparkCreateSimRels.class);
+    private static final Logger log = LoggerFactory.getLogger(SparkCreateSimRels.class);
 
-    private static final String WHITELIST_SEPARATOR = "####";
+    private static final String WHITELIST_SEPARATOR = "####";
 
-    public SparkWhitelistSimRels(ArgumentApplicationParser parser, SparkSession spark) {
-        super(parser, spark);
-    }
+    public SparkWhitelistSimRels(ArgumentApplicationParser parser, SparkSession spark) {
+        super(parser, spark);
+    }
 
-    public static void main(String[] args) throws Exception {
-        ArgumentApplicationParser parser = new ArgumentApplicationParser(
-            IOUtils
-                .toString(
-                    SparkCreateSimRels.class
-                        .getResourceAsStream(
-                            "/eu/dnetlib/dhp/oa/dedup/whitelistSimRels_parameters.json")));
-        parser.parseArgument(args);
+    public static void main(String[] args) throws Exception {
+        ArgumentApplicationParser parser = new ArgumentApplicationParser(
+            IOUtils
+                .toString(
+                    SparkCreateSimRels.class
+                        .getResourceAsStream(
+                            "/eu/dnetlib/dhp/oa/dedup/whitelistSimRels_parameters.json")));
+        parser.parseArgument(args);
 
-        SparkConf conf = new SparkConf();
-        new SparkWhitelistSimRels(parser, getSparkSession(conf))
-            .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
-    }
+        SparkConf conf = new SparkConf();
+        new SparkWhitelistSimRels(parser, getSparkSession(conf))
+            .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
+    }
 
-    @Override
-    public void run(ISLookUpService isLookUpService)
-        throws DocumentException, IOException, ISLookUpException, SAXException {
+    @Override
+    public void run(ISLookUpService isLookUpService)
+        throws DocumentException, IOException, ISLookUpException, SAXException {
 
-        // read oozie parameters
-        final String graphBasePath = parser.get("graphBasePath");
-        final String isLookUpUrl = parser.get("isLookUpUrl");
-        final String actionSetId = parser.get("actionSetId");
-        final String workingPath = parser.get("workingPath");
-        final int numPartitions = Optional
-            .ofNullable(parser.get("numPartitions"))
-            .map(Integer::valueOf)
-            .orElse(NUM_PARTITIONS);
-        final String whiteListPath = parser.get("whiteListPath");
+        // read oozie parameters
+        final String graphBasePath = parser.get("graphBasePath");
+        final String isLookUpUrl = parser.get("isLookUpUrl");
+        final String actionSetId = parser.get("actionSetId");
+        final String workingPath = parser.get("workingPath");
+        final int numPartitions = Optional
+            .ofNullable(parser.get("numPartitions"))
+            .map(Integer::valueOf)
+            .orElse(NUM_PARTITIONS);
+        final String whiteListPath = parser.get("whiteListPath");
 
-        log.info("numPartitions: '{}'", numPartitions);
-        log.info("graphBasePath: '{}'", graphBasePath);
-        log.info("isLookUpUrl: '{}'", isLookUpUrl);
-        log.info("actionSetId: '{}'", actionSetId);
-        log.info("workingPath: '{}'", workingPath);
-        log.info("whiteListPath: '{}'", whiteListPath);
+        log.info("numPartitions: '{}'", numPartitions);
+        log.info("graphBasePath: '{}'", graphBasePath);
+        log.info("isLookUpUrl: '{}'", isLookUpUrl);
+        log.info("actionSetId: '{}'", actionSetId);
+        log.info("workingPath: '{}'", workingPath);
+        log.info("whiteListPath: '{}'", whiteListPath);
 
-        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
-        //file format: source####target
-        Dataset<Tuple2<String, String>> whiteListRels = spark.createDataset(sc
-            .textFile(whiteListPath)
-            //check if the line is in the correct format: id1####id2
-            .filter(s -> s.contains(WHITELIST_SEPARATOR) && s.split(WHITELIST_SEPARATOR).length == 2)
-            .map(s -> new Tuple2<>(s.split(WHITELIST_SEPARATOR)[0], s.split(WHITELIST_SEPARATOR)[1]))
-            .rdd(),
-            Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
+        // file format: source####target
+        Dataset<Tuple2<String, String>> whiteListRels = spark
+            .createDataset(
+                sc
+                    .textFile(whiteListPath)
+                    // check if the line is in the correct format: id1####id2
+                    .filter(s -> s.contains(WHITELIST_SEPARATOR) && s.split(WHITELIST_SEPARATOR).length == 2)
+                    .map(s -> new Tuple2<>(s.split(WHITELIST_SEPARATOR)[0], s.split(WHITELIST_SEPARATOR)[1]))
+                    .rdd(),
+                Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
 
-        // for each dedup configuration
-        for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
+        // for each dedup configuration
+        for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
 
-            final String entity = dedupConf.getWf().getEntityType();
-            final String subEntity = dedupConf.getWf().getSubEntityValue();
-            log.info("Adding whitelist simrels for: '{}'", subEntity);
+            final String entity = dedupConf.getWf().getEntityType();
+            final String subEntity = dedupConf.getWf().getSubEntityValue();
+            log.info("Adding whitelist simrels for: '{}'", subEntity);
 
-            final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity);
+            final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity);
 
-            Dataset<Tuple2<String, String>> entities = spark.createDataset(sc
-                .textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
-                .repartition(numPartitions)
-                .mapToPair(
-                    (PairFunction<String, String, String>) s -> {
-                        MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
-                        return new Tuple2<>(d.getIdentifier(), "present");
-                    })
-                .rdd(),
-                Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
+            Dataset<Tuple2<String, String>> entities = spark
+                .createDataset(
+                    sc
+                        .textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
+                        .repartition(numPartitions)
+                        .mapToPair(
+                            (PairFunction<String, String, String>) s -> {
+                                MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
+                                return new Tuple2<>(d.getIdentifier(), "present");
+                            })
+                        .rdd(),
+                    Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
 
-            Dataset<Tuple2<String, String>> whiteListRels1 = whiteListRels
-                .joinWith(entities, whiteListRels.col("_1").equalTo(entities.col("_1")), "inner")
-                .map((MapFunction<Tuple2<Tuple2<String, String>, Tuple2<String, String>>, Tuple2<String, String>>) Tuple2::_1, Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
+            Dataset<Tuple2<String, String>> whiteListRels1 = whiteListRels
+                .joinWith(entities, whiteListRels.col("_1").equalTo(entities.col("_1")), "inner")
+                .map(
+                    (MapFunction<Tuple2<Tuple2<String, String>, Tuple2<String, String>>, Tuple2<String, String>>) Tuple2::_1,
+                    Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
 
-            Dataset<Tuple2<String, String>> whiteListRels2 = whiteListRels1
-                .joinWith(entities, whiteListRels1.col("_2").equalTo(entities.col("_1")), "inner")
-                .map((MapFunction<Tuple2<Tuple2<String, String>, Tuple2<String, String>>, Tuple2<String, String>>) Tuple2::_1, Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
+            Dataset<Tuple2<String, String>> whiteListRels2 = whiteListRels1
+                .joinWith(entities, whiteListRels1.col("_2").equalTo(entities.col("_1")), "inner")
+                .map(
+                    (MapFunction<Tuple2<Tuple2<String, String>, Tuple2<String, String>>, Tuple2<String, String>>) Tuple2::_1,
+                    Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
 
-            Dataset<Relation> whiteListSimRels = whiteListRels2
-                .map((MapFunction<Tuple2<String, String>, Relation>)
-                    r -> createSimRel(r._1(), r._2(), entity),
-                    Encoders.bean(Relation.class)
-                );
+            Dataset<Relation> whiteListSimRels = whiteListRels2
+                .map(
+                    (MapFunction<Tuple2<String, String>, Relation>) r -> createSimRel(r._1(), r._2(), entity),
+                    Encoders.bean(Relation.class));
 
-            saveParquet(whiteListSimRels, outputPath, SaveMode.Append);
-        }
-    }
+            saveParquet(whiteListSimRels, outputPath, SaveMode.Append);
+        }
+    }
 
-    private Relation createSimRel(String source, String target, String entity) {
-        final Relation r = new Relation();
-        r.setSource(source);
-        r.setTarget(target);
-        r.setSubRelType("dedupSimilarity");
-        r.setRelClass("isSimilarTo");
-        r.setDataInfo(new DataInfo());
+    private Relation createSimRel(String source, String target, String entity) {
+        final Relation r = new Relation();
+        r.setSource(source);
+        r.setTarget(target);
+        r.setSubRelType("dedupSimilarity");
+        r.setRelClass("isSimilarTo");
+        r.setDataInfo(new DataInfo());
 
-        switch (entity) {
-            case "result":
-                r.setRelType("resultResult");
-                break;
-            case "organization":
-                r.setRelType("organizationOrganization");
-                break;
-            default:
-                throw new IllegalArgumentException("unmanaged entity type: " + entity);
-        }
-        return r;
-    }
+        switch (entity) {
+            case "result":
+                r.setRelType("resultResult");
+                break;
+            case "organization":
+                r.setRelType("organizationOrganization");
+                break;
+            default:
+                throw new IllegalArgumentException("unmanaged entity type: " + entity);
+        }
+        return r;
+    }
 }
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
index fa03f93a6..549988767 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
@@ -48,634 +48,640 @@
import scala.Tuple2; @TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class SparkDedupTest implements Serializable { - @Mock(serializable = true) - ISLookUpService isLookUpService; - - private static SparkSession spark; - private static JavaSparkContext jsc; - - private static String testGraphBasePath; - private static String testOutputBasePath; - private static String testDedupGraphBasePath; - private static final String testActionSetId = "test-orchestrator"; - private static String whitelistPath; - private static List whiteList; - - private static String WHITELIST_SEPARATOR = "####"; - - @BeforeAll - public static void cleanUp() throws IOException, URISyntaxException { - - testGraphBasePath = Paths - .get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/entities").toURI()) - .toFile() - .getAbsolutePath(); - testOutputBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-") - .toAbsolutePath() - .toString(); - - testDedupGraphBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-") - .toAbsolutePath() - .toString(); - - whitelistPath = Paths - .get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/whitelist.simrels.txt").toURI()) - .toFile() - .getAbsolutePath(); - whiteList = IOUtils.readLines(new FileReader(whitelistPath)); - - FileUtils.deleteDirectory(new File(testOutputBasePath)); - FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); - - final SparkConf conf = new SparkConf(); - conf.set("spark.sql.shuffle.partitions", "200"); - spark = SparkSession - .builder() - .appName(SparkDedupTest.class.getSimpleName()) - .master("local[*]") - .config(conf) - .getOrCreate(); - - jsc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - - } - - @BeforeEach - public void setUp() throws IOException, ISLookUpException { - - lenient() - .when(isLookUpService.getResourceProfileByQuery(Mockito.contains(testActionSetId))) - .thenReturn( - IOUtils - .toString( - SparkDedupTest.class - .getResourceAsStream( - "/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml"))); - - lenient() - .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("organization"))) - .thenReturn( - IOUtils - .toString( - SparkDedupTest.class - .getResourceAsStream( - "/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json"))); - - lenient() - .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("publication"))) - .thenReturn( - IOUtils - .toString( - SparkDedupTest.class - .getResourceAsStream( - "/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json"))); - - lenient() - .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("software"))) - .thenReturn( - IOUtils - .toString( - SparkDedupTest.class - .getResourceAsStream( - "/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json"))); - - lenient() - .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("dataset"))) - .thenReturn( - IOUtils - .toString( - SparkDedupTest.class - .getResourceAsStream( - "/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json"))); - - lenient() - .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("otherresearchproduct"))) - .thenReturn( - IOUtils - .toString( - SparkDedupTest.class - .getResourceAsStream( - "/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json"))); - } - - @Test - @Order(1) - void createSimRelsTest() throws Exception { - - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkCreateSimRels.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json"))); - - parser - 
.parseArgument( - new String[]{ - "-i", testGraphBasePath, - "-asi", testActionSetId, - "-la", "lookupurl", - "-w", testOutputBasePath, - "-np", "50" - }); - - new SparkCreateSimRels(parser, spark).run(isLookUpService); - - long orgs_simrel = spark - .read() - .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization")) - .count(); - - long pubs_simrel = spark - .read() - .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "publication")) - .count(); - - long sw_simrel = spark - .read() - .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "software")) - .count(); - - long ds_simrel = spark - .read() - .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "dataset")) - .count(); - - long orp_simrel = spark - .read() - .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "otherresearchproduct")) - .count(); - - assertEquals(3082, orgs_simrel); - assertEquals(7036, pubs_simrel); - assertEquals(336, sw_simrel); - assertEquals(442, ds_simrel); - assertEquals(6750, orp_simrel); - } - - @Test - @Order(2) - void whitelistSimRelsTest() throws Exception { - - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkWhitelistSimRels.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/whitelistSimRels_parameters.json"))); - - parser - .parseArgument( - new String[]{ - "-i", testGraphBasePath, - "-asi", testActionSetId, - "-la", "lookupurl", - "-w", testOutputBasePath, - "-np", "50", - "-wl", whitelistPath - }); - - new SparkWhitelistSimRels(parser, spark).run(isLookUpService); - - long orgs_simrel = spark - .read() - .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization")) - .count(); - - long pubs_simrel = spark - .read() - .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "publication")) - .count(); - - long ds_simrel = spark - .read() - .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "dataset")) - .count(); - - long orp_simrel = spark - .read() - .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "otherresearchproduct")) - .count(); - - //entities simrels supposed to be equal to the number of previous step (no rels in whitelist) - assertEquals(3082, orgs_simrel); - assertEquals(7036, pubs_simrel); - assertEquals(442, ds_simrel); - assertEquals(6750, orp_simrel); - - //entities simrels to be different from the number of previous step (new simrels in the whitelist) - Dataset sw_simrel = spark - .read() - .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "software")); - - //check if the first relation in the whitelist exists - assertTrue(sw_simrel - .as(Encoders.bean(Relation.class)) - .toJavaRDD() - .filter(rel -> - rel.getSource().equalsIgnoreCase(whiteList.get(0).split(WHITELIST_SEPARATOR)[0]) && rel.getTarget().equalsIgnoreCase(whiteList.get(0).split(WHITELIST_SEPARATOR)[1])).count() > 0); - //check if the second relation in the whitelist exists - assertTrue(sw_simrel - .as(Encoders.bean(Relation.class)) - .toJavaRDD() - .filter(rel -> - rel.getSource().equalsIgnoreCase(whiteList.get(1).split(WHITELIST_SEPARATOR)[0]) && rel.getTarget().equalsIgnoreCase(whiteList.get(1).split(WHITELIST_SEPARATOR)[1])).count() > 0); - - assertEquals(338, sw_simrel.count()); - - } - - @Test - @Order(3) - void cutMergeRelsTest() throws Exception { - - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - 
SparkCreateMergeRels.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"))); - - parser - .parseArgument( - new String[]{ - "-i", - testGraphBasePath, - "-asi", - testActionSetId, - "-la", - "lookupurl", - "-w", - testOutputBasePath, - "-cc", - "3" - }); - - new SparkCreateMergeRels(parser, spark).run(isLookUpService); - - long orgs_mergerel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") - .as(Encoders.bean(Relation.class)) - .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges")) - .groupBy("source") - .agg(count("target").alias("cnt")) - .select("source", "cnt") - .where("cnt > 3") - .count(); - - long pubs_mergerel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel") - .as(Encoders.bean(Relation.class)) - .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges")) - .groupBy("source") - .agg(count("target").alias("cnt")) - .select("source", "cnt") - .where("cnt > 3") - .count(); - long sw_mergerel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel") - .as(Encoders.bean(Relation.class)) - .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges")) - .groupBy("source") - .agg(count("target").alias("cnt")) - .select("source", "cnt") - .where("cnt > 3") - .count(); - - long ds_mergerel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel") - .as(Encoders.bean(Relation.class)) - .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges")) - .groupBy("source") - .agg(count("target").alias("cnt")) - .select("source", "cnt") - .where("cnt > 3") - .count(); - - long orp_mergerel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel") - .as(Encoders.bean(Relation.class)) - .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges")) - .groupBy("source") - .agg(count("target").alias("cnt")) - .select("source", "cnt") - .where("cnt > 3") - .count(); - - assertEquals(0, orgs_mergerel); - assertEquals(0, pubs_mergerel); - assertEquals(0, sw_mergerel); - assertEquals(0, ds_mergerel); - assertEquals(0, orp_mergerel); - - FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")); - FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel")); - FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")); - FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")); - FileUtils - .deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel")); - } - - @Test - @Order(4) - void createMergeRelsTest() throws Exception { - - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkCreateMergeRels.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"))); - - parser - .parseArgument( - new String[]{ - "-i", - testGraphBasePath, - "-asi", - testActionSetId, - "-la", - "lookupurl", - "-w", - testOutputBasePath - }); - - new SparkCreateMergeRels(parser, spark).run(isLookUpService); - - long orgs_mergerel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") - .count(); - long pubs_mergerel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + 
"/publication_mergerel") - .count(); - long sw_mergerel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel") - .count(); - long ds_mergerel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel") - .count(); - - long orp_mergerel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel") - .count(); - - assertEquals(1272, orgs_mergerel); - assertEquals(1438, pubs_mergerel); - assertEquals(286, sw_mergerel); - assertEquals(472, ds_mergerel); - assertEquals(718, orp_mergerel); - - } - - @Test - @Order(5) - void createDedupRecordTest() throws Exception { - - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkCreateDedupRecord.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json"))); - parser - .parseArgument( - new String[]{ - "-i", - testGraphBasePath, - "-asi", - testActionSetId, - "-la", - "lookupurl", - "-w", - testOutputBasePath - }); - - new SparkCreateDedupRecord(parser, spark).run(isLookUpService); - - long orgs_deduprecord = jsc - .textFile(testOutputBasePath + "/" + testActionSetId + "/organization_deduprecord") - .count(); - long pubs_deduprecord = jsc - .textFile(testOutputBasePath + "/" + testActionSetId + "/publication_deduprecord") - .count(); - long sw_deduprecord = jsc - .textFile(testOutputBasePath + "/" + testActionSetId + "/software_deduprecord") - .count(); - long ds_deduprecord = jsc.textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_deduprecord").count(); - long orp_deduprecord = jsc - .textFile( - testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_deduprecord") - .count(); - - assertEquals(85, orgs_deduprecord); - assertEquals(65, pubs_deduprecord); - assertEquals(49, sw_deduprecord); - assertEquals(97, ds_deduprecord); - assertEquals(89, orp_deduprecord); - } - - @Test - @Order(6) - void updateEntityTest() throws Exception { - - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkUpdateEntity.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json"))); - parser - .parseArgument( - new String[]{ - "-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath - }); - - new SparkUpdateEntity(parser, spark).run(isLookUpService); - - long organizations = jsc.textFile(testDedupGraphBasePath + "/organization").count(); - long publications = jsc.textFile(testDedupGraphBasePath + "/publication").count(); - long projects = jsc.textFile(testDedupGraphBasePath + "/project").count(); - long datasource = jsc.textFile(testDedupGraphBasePath + "/datasource").count(); - long softwares = jsc.textFile(testDedupGraphBasePath + "/software").count(); - long dataset = jsc.textFile(testDedupGraphBasePath + "/dataset").count(); - long otherresearchproduct = jsc.textFile(testDedupGraphBasePath + "/otherresearchproduct").count(); - - long mergedOrgs = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") - .as(Encoders.bean(Relation.class)) - .where("relClass=='merges'") - .javaRDD() - .map(Relation::getTarget) - .distinct() - .count(); - - long mergedPubs = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel") - .as(Encoders.bean(Relation.class)) - .where("relClass=='merges'") - .javaRDD() - .map(Relation::getTarget) - .distinct() - .count(); - - long mergedSw = spark - .read() - 
.load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel") - .as(Encoders.bean(Relation.class)) - .where("relClass=='merges'") - .javaRDD() - .map(Relation::getTarget) - .distinct() - .count(); - - long mergedDs = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel") - .as(Encoders.bean(Relation.class)) - .where("relClass=='merges'") - .javaRDD() - .map(Relation::getTarget) - .distinct() - .count(); - - long mergedOrp = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel") - .as(Encoders.bean(Relation.class)) - .where("relClass=='merges'") - .javaRDD() - .map(Relation::getTarget) - .distinct() - .count(); - - assertEquals(896, publications); - assertEquals(838, organizations); - assertEquals(100, projects); - assertEquals(100, datasource); - assertEquals(198, softwares); - assertEquals(389, dataset); - assertEquals(517, otherresearchproduct); - - long deletedOrgs = jsc - .textFile(testDedupGraphBasePath + "/organization") - .filter(this::isDeletedByInference) - .count(); - - long deletedPubs = jsc - .textFile(testDedupGraphBasePath + "/publication") - .filter(this::isDeletedByInference) - .count(); - - long deletedSw = jsc - .textFile(testDedupGraphBasePath + "/software") - .filter(this::isDeletedByInference) - .count(); - - long deletedDs = jsc - .textFile(testDedupGraphBasePath + "/dataset") - .filter(this::isDeletedByInference) - .count(); - - long deletedOrp = jsc - .textFile(testDedupGraphBasePath + "/otherresearchproduct") - .filter(this::isDeletedByInference) - .count(); - - assertEquals(mergedOrgs, deletedOrgs); - assertEquals(mergedPubs, deletedPubs); - assertEquals(mergedSw, deletedSw); - assertEquals(mergedDs, deletedDs); - assertEquals(mergedOrp, deletedOrp); - } - - @Test - @Order(7) - void propagateRelationTest() throws Exception { - - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkPropagateRelation.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json"))); - parser - .parseArgument( - new String[]{ - "-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath - }); - - new SparkPropagateRelation(parser, spark).run(isLookUpService); - - long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count(); - - assertEquals(4860, relations); - - // check deletedbyinference - final Dataset mergeRels = spark - .read() - .load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*")) - .as(Encoders.bean(Relation.class)); - final JavaPairRDD mergedIds = mergeRels - .where("relClass == 'merges'") - .select(mergeRels.col("target")) - .distinct() - .toJavaRDD() - .mapToPair( - (PairFunction) r -> new Tuple2(r.getString(0), "d")); - - JavaRDD toCheck = jsc - .textFile(testDedupGraphBasePath + "/relation") - .mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json)) - .join(mergedIds) - .map(t -> t._2()._1()) - .mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.target", json), json)) - .join(mergedIds) - .map(t -> t._2()._1()); - - long deletedbyinference = toCheck.filter(this::isDeletedByInference).count(); - long updated = toCheck.count(); - - assertEquals(updated, deletedbyinference); - } - - @Test - @Order(8) - void testRelations() throws Exception { - testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10); - testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2); - } - - private void 
testUniqueness(String path, int expected_total, int expected_unique) { - Dataset rel = spark - .read() - .textFile(getClass().getResource(path).getPath()) - .map( - (MapFunction) s -> new ObjectMapper().readValue(s, Relation.class), - Encoders.bean(Relation.class)); - - assertEquals(expected_total, rel.count()); - assertEquals(expected_unique, rel.distinct().count()); - } - - @AfterAll - public static void finalCleanUp() throws IOException { - FileUtils.deleteDirectory(new File(testOutputBasePath)); - FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); - } - - public boolean isDeletedByInference(String s) { - return s.contains("\"deletedbyinference\":true"); - } + @Mock(serializable = true) + ISLookUpService isLookUpService; + + private static SparkSession spark; + private static JavaSparkContext jsc; + + private static String testGraphBasePath; + private static String testOutputBasePath; + private static String testDedupGraphBasePath; + private static final String testActionSetId = "test-orchestrator"; + private static String whitelistPath; + private static List whiteList; + + private static String WHITELIST_SEPARATOR = "####"; + + @BeforeAll + public static void cleanUp() throws IOException, URISyntaxException { + + testGraphBasePath = Paths + .get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/entities").toURI()) + .toFile() + .getAbsolutePath(); + testOutputBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-") + .toAbsolutePath() + .toString(); + + testDedupGraphBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-") + .toAbsolutePath() + .toString(); + + whitelistPath = Paths + .get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/whitelist.simrels.txt").toURI()) + .toFile() + .getAbsolutePath(); + whiteList = IOUtils.readLines(new FileReader(whitelistPath)); + + FileUtils.deleteDirectory(new File(testOutputBasePath)); + FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); + + final SparkConf conf = new SparkConf(); + conf.set("spark.sql.shuffle.partitions", "200"); + spark = SparkSession + .builder() + .appName(SparkDedupTest.class.getSimpleName()) + .master("local[*]") + .config(conf) + .getOrCreate(); + + jsc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + } + + @BeforeEach + public void setUp() throws IOException, ISLookUpException { + + lenient() + .when(isLookUpService.getResourceProfileByQuery(Mockito.contains(testActionSetId))) + .thenReturn( + IOUtils + .toString( + SparkDedupTest.class + .getResourceAsStream( + "/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml"))); + + lenient() + .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("organization"))) + .thenReturn( + IOUtils + .toString( + SparkDedupTest.class + .getResourceAsStream( + "/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json"))); + + lenient() + .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("publication"))) + .thenReturn( + IOUtils + .toString( + SparkDedupTest.class + .getResourceAsStream( + "/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json"))); + + lenient() + .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("software"))) + .thenReturn( + IOUtils + .toString( + SparkDedupTest.class + .getResourceAsStream( + "/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json"))); + + lenient() + .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("dataset"))) + .thenReturn( + IOUtils + .toString( + SparkDedupTest.class + .getResourceAsStream( + 
"/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json"))); + + lenient() + .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("otherresearchproduct"))) + .thenReturn( + IOUtils + .toString( + SparkDedupTest.class + .getResourceAsStream( + "/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json"))); + } + + @Test + @Order(1) + void createSimRelsTest() throws Exception { + + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCreateSimRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json"))); + + parser + .parseArgument( + new String[] { + "-i", testGraphBasePath, + "-asi", testActionSetId, + "-la", "lookupurl", + "-w", testOutputBasePath, + "-np", "50" + }); + + new SparkCreateSimRels(parser, spark).run(isLookUpService); + + long orgs_simrel = spark + .read() + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization")) + .count(); + + long pubs_simrel = spark + .read() + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "publication")) + .count(); + + long sw_simrel = spark + .read() + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "software")) + .count(); + + long ds_simrel = spark + .read() + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "dataset")) + .count(); + + long orp_simrel = spark + .read() + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "otherresearchproduct")) + .count(); + + assertEquals(3082, orgs_simrel); + assertEquals(7036, pubs_simrel); + assertEquals(336, sw_simrel); + assertEquals(442, ds_simrel); + assertEquals(6750, orp_simrel); + } + + @Test + @Order(2) + void whitelistSimRelsTest() throws Exception { + + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkWhitelistSimRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/whitelistSimRels_parameters.json"))); + + parser + .parseArgument( + new String[] { + "-i", testGraphBasePath, + "-asi", testActionSetId, + "-la", "lookupurl", + "-w", testOutputBasePath, + "-np", "50", + "-wl", whitelistPath + }); + + new SparkWhitelistSimRels(parser, spark).run(isLookUpService); + + long orgs_simrel = spark + .read() + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization")) + .count(); + + long pubs_simrel = spark + .read() + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "publication")) + .count(); + + long ds_simrel = spark + .read() + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "dataset")) + .count(); + + long orp_simrel = spark + .read() + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "otherresearchproduct")) + .count(); + + // entities simrels supposed to be equal to the number of previous step (no rels in whitelist) + assertEquals(3082, orgs_simrel); + assertEquals(7036, pubs_simrel); + assertEquals(442, ds_simrel); + assertEquals(6750, orp_simrel); + + // entities simrels to be different from the number of previous step (new simrels in the whitelist) + Dataset sw_simrel = spark + .read() + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "software")); + + // check if the first relation in the whitelist exists + assertTrue( + sw_simrel + .as(Encoders.bean(Relation.class)) + .toJavaRDD() + .filter( + rel -> rel.getSource().equalsIgnoreCase(whiteList.get(0).split(WHITELIST_SEPARATOR)[0]) + && 
rel.getTarget().equalsIgnoreCase(whiteList.get(0).split(WHITELIST_SEPARATOR)[1])) + .count() > 0); + // check if the second relation in the whitelist exists + assertTrue( + sw_simrel + .as(Encoders.bean(Relation.class)) + .toJavaRDD() + .filter( + rel -> rel.getSource().equalsIgnoreCase(whiteList.get(1).split(WHITELIST_SEPARATOR)[0]) + && rel.getTarget().equalsIgnoreCase(whiteList.get(1).split(WHITELIST_SEPARATOR)[1])) + .count() > 0); + + assertEquals(338, sw_simrel.count()); + + } + + @Test + @Order(3) + void cutMergeRelsTest() throws Exception { + + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCreateMergeRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"))); + + parser + .parseArgument( + new String[] { + "-i", + testGraphBasePath, + "-asi", + testActionSetId, + "-la", + "lookupurl", + "-w", + testOutputBasePath, + "-cc", + "3" + }); + + new SparkCreateMergeRels(parser, spark).run(isLookUpService); + + long orgs_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") + .as(Encoders.bean(Relation.class)) + .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges")) + .groupBy("source") + .agg(count("target").alias("cnt")) + .select("source", "cnt") + .where("cnt > 3") + .count(); + + long pubs_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel") + .as(Encoders.bean(Relation.class)) + .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges")) + .groupBy("source") + .agg(count("target").alias("cnt")) + .select("source", "cnt") + .where("cnt > 3") + .count(); + long sw_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel") + .as(Encoders.bean(Relation.class)) + .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges")) + .groupBy("source") + .agg(count("target").alias("cnt")) + .select("source", "cnt") + .where("cnt > 3") + .count(); + + long ds_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel") + .as(Encoders.bean(Relation.class)) + .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges")) + .groupBy("source") + .agg(count("target").alias("cnt")) + .select("source", "cnt") + .where("cnt > 3") + .count(); + + long orp_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel") + .as(Encoders.bean(Relation.class)) + .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges")) + .groupBy("source") + .agg(count("target").alias("cnt")) + .select("source", "cnt") + .where("cnt > 3") + .count(); + + assertEquals(0, orgs_mergerel); + assertEquals(0, pubs_mergerel); + assertEquals(0, sw_mergerel); + assertEquals(0, ds_mergerel); + assertEquals(0, orp_mergerel); + + FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")); + FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel")); + FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")); + FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")); + FileUtils + .deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel")); + } + + @Test + @Order(4) + void createMergeRelsTest() throws Exception { + + 
ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCreateMergeRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"))); + + parser + .parseArgument( + new String[] { + "-i", + testGraphBasePath, + "-asi", + testActionSetId, + "-la", + "lookupurl", + "-w", + testOutputBasePath + }); + + new SparkCreateMergeRels(parser, spark).run(isLookUpService); + + long orgs_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") + .count(); + long pubs_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel") + .count(); + long sw_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel") + .count(); + long ds_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel") + .count(); + + long orp_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel") + .count(); + + assertEquals(1272, orgs_mergerel); + assertEquals(1438, pubs_mergerel); + assertEquals(286, sw_mergerel); + assertEquals(472, ds_mergerel); + assertEquals(718, orp_mergerel); + + } + + @Test + @Order(5) + void createDedupRecordTest() throws Exception { + + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCreateDedupRecord.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json"))); + parser + .parseArgument( + new String[] { + "-i", + testGraphBasePath, + "-asi", + testActionSetId, + "-la", + "lookupurl", + "-w", + testOutputBasePath + }); + + new SparkCreateDedupRecord(parser, spark).run(isLookUpService); + + long orgs_deduprecord = jsc + .textFile(testOutputBasePath + "/" + testActionSetId + "/organization_deduprecord") + .count(); + long pubs_deduprecord = jsc + .textFile(testOutputBasePath + "/" + testActionSetId + "/publication_deduprecord") + .count(); + long sw_deduprecord = jsc + .textFile(testOutputBasePath + "/" + testActionSetId + "/software_deduprecord") + .count(); + long ds_deduprecord = jsc.textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_deduprecord").count(); + long orp_deduprecord = jsc + .textFile( + testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_deduprecord") + .count(); + + assertEquals(85, orgs_deduprecord); + assertEquals(65, pubs_deduprecord); + assertEquals(49, sw_deduprecord); + assertEquals(97, ds_deduprecord); + assertEquals(89, orp_deduprecord); + } + + @Test + @Order(6) + void updateEntityTest() throws Exception { + + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkUpdateEntity.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json"))); + parser + .parseArgument( + new String[] { + "-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath + }); + + new SparkUpdateEntity(parser, spark).run(isLookUpService); + + long organizations = jsc.textFile(testDedupGraphBasePath + "/organization").count(); + long publications = jsc.textFile(testDedupGraphBasePath + "/publication").count(); + long projects = jsc.textFile(testDedupGraphBasePath + "/project").count(); + long datasource = jsc.textFile(testDedupGraphBasePath + "/datasource").count(); + long softwares = jsc.textFile(testDedupGraphBasePath + "/software").count(); + long dataset = jsc.textFile(testDedupGraphBasePath + "/dataset").count(); 
+ long otherresearchproduct = jsc.textFile(testDedupGraphBasePath + "/otherresearchproduct").count(); + + long mergedOrgs = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") + .as(Encoders.bean(Relation.class)) + .where("relClass=='merges'") + .javaRDD() + .map(Relation::getTarget) + .distinct() + .count(); + + long mergedPubs = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel") + .as(Encoders.bean(Relation.class)) + .where("relClass=='merges'") + .javaRDD() + .map(Relation::getTarget) + .distinct() + .count(); + + long mergedSw = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel") + .as(Encoders.bean(Relation.class)) + .where("relClass=='merges'") + .javaRDD() + .map(Relation::getTarget) + .distinct() + .count(); + + long mergedDs = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel") + .as(Encoders.bean(Relation.class)) + .where("relClass=='merges'") + .javaRDD() + .map(Relation::getTarget) + .distinct() + .count(); + + long mergedOrp = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel") + .as(Encoders.bean(Relation.class)) + .where("relClass=='merges'") + .javaRDD() + .map(Relation::getTarget) + .distinct() + .count(); + + assertEquals(896, publications); + assertEquals(838, organizations); + assertEquals(100, projects); + assertEquals(100, datasource); + assertEquals(198, softwares); + assertEquals(389, dataset); + assertEquals(517, otherresearchproduct); + + long deletedOrgs = jsc + .textFile(testDedupGraphBasePath + "/organization") + .filter(this::isDeletedByInference) + .count(); + + long deletedPubs = jsc + .textFile(testDedupGraphBasePath + "/publication") + .filter(this::isDeletedByInference) + .count(); + + long deletedSw = jsc + .textFile(testDedupGraphBasePath + "/software") + .filter(this::isDeletedByInference) + .count(); + + long deletedDs = jsc + .textFile(testDedupGraphBasePath + "/dataset") + .filter(this::isDeletedByInference) + .count(); + + long deletedOrp = jsc + .textFile(testDedupGraphBasePath + "/otherresearchproduct") + .filter(this::isDeletedByInference) + .count(); + + assertEquals(mergedOrgs, deletedOrgs); + assertEquals(mergedPubs, deletedPubs); + assertEquals(mergedSw, deletedSw); + assertEquals(mergedDs, deletedDs); + assertEquals(mergedOrp, deletedOrp); + } + + @Test + @Order(7) + void propagateRelationTest() throws Exception { + + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkPropagateRelation.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json"))); + parser + .parseArgument( + new String[] { + "-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath + }); + + new SparkPropagateRelation(parser, spark).run(isLookUpService); + + long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count(); + + assertEquals(4860, relations); + + // check deletedbyinference + final Dataset mergeRels = spark + .read() + .load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*")) + .as(Encoders.bean(Relation.class)); + final JavaPairRDD mergedIds = mergeRels + .where("relClass == 'merges'") + .select(mergeRels.col("target")) + .distinct() + .toJavaRDD() + .mapToPair( + (PairFunction) r -> new Tuple2(r.getString(0), "d")); + + JavaRDD toCheck = jsc + .textFile(testDedupGraphBasePath + "/relation") + .mapToPair(json -> new 
Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json)) + .join(mergedIds) + .map(t -> t._2()._1()) + .mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.target", json), json)) + .join(mergedIds) + .map(t -> t._2()._1()); + + long deletedbyinference = toCheck.filter(this::isDeletedByInference).count(); + long updated = toCheck.count(); + + assertEquals(updated, deletedbyinference); + } + + @Test + @Order(8) + void testRelations() throws Exception { + testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10); + testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2); + } + + private void testUniqueness(String path, int expected_total, int expected_unique) { + Dataset rel = spark + .read() + .textFile(getClass().getResource(path).getPath()) + .map( + (MapFunction) s -> new ObjectMapper().readValue(s, Relation.class), + Encoders.bean(Relation.class)); + + assertEquals(expected_total, rel.count()); + assertEquals(expected_unique, rel.distinct().count()); + } + + @AfterAll + public static void finalCleanUp() throws IOException { + FileUtils.deleteDirectory(new File(testOutputBasePath)); + FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); + } + + public boolean isDeletedByInference(String s) { + return s.contains("\"deletedbyinference\":true"); + } } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java index 0d7c74475..23e97a97a 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java @@ -69,7 +69,7 @@ public class PropagationConstant { PROPAGATION_DATA_INFO_TYPE, PROPAGATION_COUNTRY_INSTREPO_CLASS_ID, PROPAGATION_COUNTRY_INSTREPO_CLASS_NAME, - ModelConstants.DNET_PROVENANCE_ACTIONS)); + ModelConstants.DNET_PROVENANCE_ACTIONS)); return nc; } @@ -84,7 +84,8 @@ public class PropagationConstant { return di; } - public static Qualifier getQualifier(String inference_class_id, String inference_class_name, String qualifierSchema) { + public static Qualifier getQualifier(String inference_class_id, String inference_class_name, + String qualifierSchema) { Qualifier pa = new Qualifier(); pa.setClassid(inference_class_id); pa.setClassname(inference_class_name); @@ -108,7 +109,11 @@ public class PropagationConstant { r.setRelClass(rel_class); r.setRelType(rel_type); r.setSubRelType(subrel_type); - r.setDataInfo(getDataInfo(inference_provenance, inference_class_id, inference_class_name, ModelConstants.DNET_PROVENANCE_ACTIONS)); + r + .setDataInfo( + getDataInfo( + inference_provenance, inference_class_id, inference_class_name, + ModelConstants.DNET_PROVENANCE_ACTIONS)); return r; } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java index 68949b900..a38b4da2e 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java @@ -173,14 +173,17 @@ public class SparkOrcidToResultFromSemRelJob { if (toaddpid) { StructuredProperty p = new StructuredProperty(); p.setValue(autoritative_author.getOrcid()); - 
p.setQualifier(getQualifier(ModelConstants.ORCID_PENDING, ModelConstants.ORCID_CLASSNAME, ModelConstants.DNET_PID_TYPES));
+                    p
+                        .setQualifier(
+                            getQualifier(
+                                ModelConstants.ORCID_PENDING, ModelConstants.ORCID_CLASSNAME, ModelConstants.DNET_PID_TYPES));
                     p
                         .setDataInfo(
                             getDataInfo(
                                 PROPAGATION_DATA_INFO_TYPE,
                                 PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID,
                                 PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME,
-                                ModelConstants.DNET_PROVENANCE_ACTIONS));
+                                ModelConstants.DNET_PROVENANCE_ACTIONS));
 
                     Optional<List<StructuredProperty>> authorPid = Optional.ofNullable(author.getPid());
                     if (authorPid.isPresent()) {
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java
index 1289ff644..50df08f8c 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java
@@ -10,7 +10,6 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
-import eu.dnetlib.dhp.schema.common.ModelConstants;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
@@ -22,6 +21,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.Context;
 import eu.dnetlib.dhp.schema.oaf.Result;
 import scala.Tuple2;
@@ -130,7 +130,7 @@ public class SparkResultToCommunityFromOrganizationJob {
                                 PROPAGATION_DATA_INFO_TYPE,
                                 PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID,
                                 PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME,
-                                ModelConstants.DNET_PROVENANCE_ACTIONS)));
+                                ModelConstants.DNET_PROVENANCE_ACTIONS)));
                     propagatedContexts.add(newContext);
                 }
             }
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java
index 7f76ead94..f31a26230 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java
@@ -7,7 +7,6 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
 import java.util.*;
 import java.util.stream.Collectors;
 
-import eu.dnetlib.dhp.schema.common.ModelConstants;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
@@ -20,6 +19,7 @@ import org.slf4j.LoggerFactory;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.*;
 import scala.Tuple2;
 
@@ -126,7 +126,7 @@ public class SparkResultToCommunityThroughSemRelJob {
                         PROPAGATION_DATA_INFO_TYPE,
                         PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID,
                         PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME,
-                        ModelConstants.DNET_PROVENANCE_ACTIONS)));
+                        ModelConstants.DNET_PROVENANCE_ACTIONS)));
                 return newContext;
             }
             return null;
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java
index f4bf8d52a..988a8c7a4 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java
@@ -6,6 +6,8 @@ import java.util.*;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Encoders;
 
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.dump.oaf.*;
@@ -23,8 +25,6 @@ import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;
 import eu.dnetlib.dhp.schema.dump.oaf.community.Context;
 import eu.dnetlib.dhp.schema.dump.oaf.graph.GraphResult;
 import eu.dnetlib.dhp.schema.oaf.*;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.sql.Encoders;
 
 public class ResultMapper implements Serializable {
@@ -278,16 +278,17 @@ public class ResultMapper implements Serializable {
 
         }
 
-
         Optional
             .ofNullable(input.getPid())
             .ifPresent(
-                value -> out.setPid(value
-                    .stream()
-                    .map(
-                        p ->
-                            ControlledField
-                                .newInstance(p.getQualifier().getClassid(), p.getValue())).collect(Collectors.toList())));
+                value -> out
+                    .setPid(
+                        value
+                            .stream()
+                            .map(
+                                p -> ControlledField
+                                    .newInstance(p.getQualifier().getClassid(), p.getValue()))
+                            .collect(Collectors.toList())));
 
         oStr = Optional.ofNullable(input.getDateofacceptance());
         if (oStr.isPresent()) {
@@ -298,11 +299,10 @@
             out.setPublisher(oStr.get().getValue());
         }
 
-
         Optional
             .ofNullable(input.getSource())
-            .ifPresent(value -> out.setSource(value.stream().map(s -> s.getValue()).collect(Collectors.toList()) ));
-            // value.stream().forEach(s -> sourceList.add(s.getValue())));
+            .ifPresent(value -> out.setSource(value.stream().map(s -> s.getValue()).collect(Collectors.toList())));
+        // value.stream().forEach(s -> sourceList.add(s.getValue())));
         // out.setSource(input.getSource().stream().map(s -> s.getValue()).collect(Collectors.toList()));
         List<Subject> subjectList = new ArrayList<>();
         Optional
@@ -577,48 +577,60 @@
         Optional<DataInfo> di = Optional.ofNullable(pid.getDataInfo());
         if (di.isPresent()) {
             return Pid
-                .newInstance(
-                    ControlledField
-                        .newInstance(
-                            pid.getQualifier().getClassid(),
-                            pid.getValue()),
-                    Provenance
-                        .newInstance(
-                            di.get().getProvenanceaction().getClassname(),
-                            di.get().getTrust()));
+                .newInstance(
+                    ControlledField
+                        .newInstance(
+                            pid.getQualifier().getClassid(),
+                            pid.getValue()),
+                    Provenance
+                        .newInstance(
+                            di.get().getProvenanceaction().getClassname(),
+                            di.get().getTrust()));
         } else {
             return Pid
-                .newInstance(
-                    ControlledField
-                        .newInstance(
-                            pid.getQualifier().getClassid(),
-                            pid.getValue())
-
-                );
+                .newInstance(
+                    ControlledField
+                        .newInstance(
+                            pid.getQualifier().getClassid(),
+                            pid.getValue())
+
+                );
         }
     }
 
     private static Pid getOrcid(List<StructuredProperty> p) {
         List<StructuredProperty> pid_list = p.stream().map(pid -> {
             if (pid.getQualifier().getClassid().equals(ModelConstants.ORCID) ||
-                (pid.getQualifier().getClassid().equals(ModelConstants.ORCID_PENDING))){
+                (pid.getQualifier().getClassid().equals(ModelConstants.ORCID_PENDING))) {
                 return pid;
             }
-            return null;
+            return null;
         }).filter(pid -> pid != null).collect(Collectors.toList());
 
-        if(pid_list.size() == 1){
+        if (pid_list.size() == 1) {
             return getAuthorPid(pid_list.get(0));
         }
 
-        List<StructuredProperty> orcid = pid_list.stream().filter(ap -> ap.getQualifier().getClassid()
-            .equals(ModelConstants.ORCID)).collect(Collectors.toList());
-        if(orcid.size() == 1){
+        List<StructuredProperty> orcid = pid_list
+            .stream()
+            .filter(
+                ap -> ap
+                    .getQualifier()
+                    .getClassid()
+                    .equals(ModelConstants.ORCID))
+            .collect(Collectors.toList());
+        if (orcid.size() == 1) {
             return getAuthorPid(orcid.get(0));
         }
 
-        orcid = pid_list.stream().filter(ap -> ap.getQualifier().getClassid()
-            .equals(ModelConstants.ORCID_PENDING)).collect(Collectors.toList());
-        if(orcid.size() == 1){
+        orcid = pid_list
+            .stream()
+            .filter(
+                ap -> ap
+                    .getQualifier()
+                    .getClassid()
+                    .equals(ModelConstants.ORCID_PENDING))
+            .collect(Collectors.toList());
+        if (orcid.size() == 1) {
             return getAuthorPid(orcid.get(0));
         }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SaveCommunityMap.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SaveCommunityMap.java
index 6cdd741ea..971324744 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SaveCommunityMap.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SaveCommunityMap.java
@@ -86,10 +86,10 @@ public class SaveCommunityMap implements Serializable {
     private void saveCommunityMap(boolean singleCommunity, String community_id)
         throws ISLookUpException, IOException, DocumentException, SAXException {
-        writer
-            .write(
-                Utils.OBJECT_MAPPER
-                    .writeValueAsString(queryInformationSystem.getCommunityMap(singleCommunity, community_id)));
-    }
+        writer
+            .write(
+                Utils.OBJECT_MAPPER
+                    .writeValueAsString(queryInformationSystem.getCommunityMap(singleCommunity, community_id)));
+    }
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkPrepareResultProject.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkPrepareResultProject.java
index edd6d9b23..3508e4126 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkPrepareResultProject.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkPrepareResultProject.java
@@ -8,8 +8,6 @@ import java.io.StringReader;
 import java.util.*;
 import java.util.stream.Collectors;
 
-import eu.dnetlib.dhp.schema.common.ModelConstants;
-import eu.dnetlib.dhp.schema.dump.oaf.community.Validated;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
@@ -28,9 +26,11 @@ import org.xml.sax.SAXException;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.oa.graph.dump.Utils;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.dump.oaf.Provenance;
 import eu.dnetlib.dhp.schema.dump.oaf.community.Funder;
 import eu.dnetlib.dhp.schema.dump.oaf.community.Project;
+import eu.dnetlib.dhp.schema.dump.oaf.community.Validated;
 import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Field;
 import eu.dnetlib.dhp.schema.oaf.Relation;
@@ -80,7 +80,9 @@ public class SparkPrepareResultProject implements Serializable {
     private static void prepareResultProjectList(SparkSession spark, String inputPath, String outputPath) {
         Dataset<Relation> relation = Utils
             .readPath(spark, inputPath + "/relation", Relation.class)
-            .filter("dataInfo.deletedbyinference = false and lower(relClass) = '" + ModelConstants.IS_PRODUCED_BY.toLowerCase() + "'");
+            .filter(
+                "dataInfo.deletedbyinference = false and lower(relClass) = '"
+                    + ModelConstants.IS_PRODUCED_BY.toLowerCase() + "'");
 
         Dataset<eu.dnetlib.dhp.schema.oaf.Project> projects = Utils
             .readPath(spark, inputPath + "/project", eu.dnetlib.dhp.schema.oaf.Project.class);
@@ -159,7 +161,7 @@
                         provenance.setTrust(di.get().getTrust());
                         p.setProvenance(provenance);
                     }
-                    if (relation.getValidated()){
+                    if (relation.getValidated()) {
                         p.setValidated(Validated.newInstance(relation.getValidated(), relation.getValidationDate()));
                     }
                     return p;
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/DumpGraphEntities.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/DumpGraphEntities.java
index 7441baaef..7f64db41c 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/DumpGraphEntities.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/DumpGraphEntities.java
@@ -9,7 +9,6 @@ import java.io.StringReader;
 import java.util.*;
 import java.util.stream.Collectors;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.ForeachFunction;
@@ -23,6 +22,8 @@ import org.dom4j.DocumentException;
 import org.dom4j.Node;
 import org.dom4j.io.SAXReader;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 import eu.dnetlib.dhp.oa.graph.dump.DumpProducts;
 import eu.dnetlib.dhp.oa.graph.dump.Utils;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
@@ -453,18 +454,20 @@ public class DumpGraphEntities implements Serializable {
 
     private static <E> void organizationMap(SparkSession spark, String inputPath, String outputPath,
         Class<E> inputClazz) {
-        Utils.readPath(spark, inputPath, inputClazz)
-            .map(
-                (MapFunction<E, Organization>) o -> mapOrganization((eu.dnetlib.dhp.schema.oaf.Organization) o),
-                Encoders.bean(Organization.class))
-            .filter((FilterFunction<Organization>) o -> o!= null)
-            .write()
-            .mode(SaveMode.Overwrite)
-            .option("compression", "gzip")
-            .json(outputPath);
+        Utils
+            .readPath(spark, inputPath, inputClazz)
+            .map(
+                (MapFunction<E, Organization>) o -> mapOrganization((eu.dnetlib.dhp.schema.oaf.Organization) o),
+                Encoders.bean(Organization.class))
+            .filter((FilterFunction<Organization>) o -> o != null)
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(outputPath);
     }
 
-    private static eu.dnetlib.dhp.schema.dump.oaf.graph.Organization mapOrganization(eu.dnetlib.dhp.schema.oaf.Organization org) {
+    private static eu.dnetlib.dhp.schema.dump.oaf.graph.Organization mapOrganization(
+        eu.dnetlib.dhp.schema.oaf.Organization org) {
         if (org.getDataInfo().getDeletedbyinference())
             return null;
         Organization organization = new Organization();
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/QueryInformationSystem.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/QueryInformationSystem.java
index be1f21171..88c92da53 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/QueryInformationSystem.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/QueryInformationSystem.java
@@ -5,8 +5,6 @@
import java.io.StringReader; import java.util.*; import java.util.function.Consumer; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.common.ModelSupport; import org.dom4j.Document; import org.dom4j.DocumentException; import org.dom4j.Element; @@ -15,6 +13,8 @@ import org.dom4j.io.SAXReader; import org.jetbrains.annotations.NotNull; import org.xml.sax.SAXException; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.utils.DHPUtils; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; @@ -140,7 +140,7 @@ public class QueryInformationSystem { } private String makeOpenaireId(Node el, String prefix) { - if (!prefix.equals(ModelSupport.entityIdPrefix.get("project"))){ + if (!prefix.equals(ModelSupport.entityIdPrefix.get("project"))) { return null; } String funder = null; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkDumpRelationJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkDumpRelationJob.java index 7370b43f8..ab7e7e595 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkDumpRelationJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkDumpRelationJob.java @@ -107,7 +107,7 @@ public class SparkDumpRelationJob implements Serializable { } } } - if(relation.getValidated()){ + if (relation.getValidated()) { rel_new.setValidated(relation.getValidated()); rel_new.setValidationDate(relation.getValidationDate()); } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java index d520a9c6a..9cddba3c5 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java @@ -41,9 +41,9 @@ public class SparkDumpFunderResults implements Serializable { parser.parseArgument(args); Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); final String inputPath = parser.get("sourcePath"); @@ -58,32 +58,31 @@ public class SparkDumpFunderResults implements Serializable { SparkConf conf = new SparkConf(); runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Utils.removeOutputDir(spark, outputPath); - writeResultProjectList(spark, inputPath, outputPath, graphPath); - }); + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + writeResultProjectList(spark, inputPath, outputPath, graphPath); + }); } private static void writeResultProjectList(SparkSession spark, String inputPath, String outputPath, - String graphPath) { + String graphPath) { Dataset project = Utils - .readPath(spark, graphPath + "/project", eu.dnetlib.dhp.schema.oaf.Project.class); + .readPath(spark, graphPath + "/project", eu.dnetlib.dhp.schema.oaf.Project.class); Dataset result = Utils - .readPath(spark, inputPath + 
"/publication", CommunityResult.class) - .union(Utils.readPath(spark, inputPath + "/dataset", CommunityResult.class)) - .union(Utils.readPath(spark, inputPath + "/orp", CommunityResult.class)) - .union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class)); - + .readPath(spark, inputPath + "/publication", CommunityResult.class) + .union(Utils.readPath(spark, inputPath + "/dataset", CommunityResult.class)) + .union(Utils.readPath(spark, inputPath + "/orp", CommunityResult.class)) + .union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class)); List funderList = project - .select("id") - .map((MapFunction) value -> value.getString(0).substring(0, 15), Encoders.STRING()) - .distinct() - .collectAsList(); + .select("id") + .map((MapFunction) value -> value.getString(0).substring(0, 15), Encoders.STRING()) + .distinct() + .collectAsList(); funderList.forEach(funder -> { String fundernsp = funder.substring(3); @@ -104,7 +103,7 @@ public class SparkDumpFunderResults implements Serializable { } private static void dumpResults(String nsp, Dataset results, String outputPath, - String funderName) { + String funderName) { results.map((MapFunction) r -> { if (!Optional.ofNullable(r.getProjects()).isPresent()) { @@ -123,15 +122,15 @@ public class SparkDumpFunderResults implements Serializable { } return null; }, Encoders.bean(CommunityResult.class)) - .filter(Objects::nonNull) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath + "/" + funderName); + .filter(Objects::nonNull) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "/" + funderName); } private static void writeFunderResult(String funder, Dataset results, String outputPath, - String funderDump) { + String funderDump) { if (funder.startsWith("40|irb")) { dumpResults(funder, results, outputPath, "HRZZ"); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java index e17aa285c..156ae6d63 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java @@ -80,7 +80,6 @@ public class SparkResultLinkedToProject implements Serializable { private static void writeResultsLinkedToProjects(SparkSession spark, Class inputClazz, String inputPath, String outputPath, String graphPath) { - Dataset results = Utils .readPath(spark, inputPath, inputClazz) .filter("dataInfo.deletedbyinference = false and datainfo.invisible = false"); diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/DumpJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/DumpJobTest.java index 3a4f05fb5..bf6301ec4 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/DumpJobTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/DumpJobTest.java @@ -7,17 +7,10 @@ import java.nio.file.Path; import java.util.Arrays; import java.util.List; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.sun.xml.internal.ws.policy.AssertionSet; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.dump.oaf.Instance; -import 
eu.dnetlib.dhp.schema.dump.oaf.OpenAccessRoute; import org.apache.commons.io.FileUtils; -import org.apache.neethi.Assertion; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.ForeachFunction; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; @@ -25,10 +18,14 @@ import org.junit.jupiter.api.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.Gson; import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.dump.oaf.Instance; +import eu.dnetlib.dhp.schema.dump.oaf.OpenAccessRoute; import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult; import eu.dnetlib.dhp.schema.dump.oaf.graph.GraphResult; import eu.dnetlib.dhp.schema.oaf.Dataset; @@ -145,70 +142,121 @@ public class DumpJobTest { } @Test - public void testPublicationDump(){ + public void testPublicationDump() { final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/publication_extendedinstance") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/publication_extendedinstance") + .getPath(); final String communityMapPath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymap.json") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymap.json") + .getPath(); DumpProducts dump = new DumpProducts(); dump - .run( - // false, sourcePath, workingDir.toString() + "/result", communityMapPath, Publication.class, - false, sourcePath, workingDir.toString() + "/result", communityMapPath, Publication.class, - GraphResult.class, Constants.DUMPTYPE.COMPLETE.getType()); + .run( + // false, sourcePath, workingDir.toString() + "/result", communityMapPath, Publication.class, + false, sourcePath, workingDir.toString() + "/result", communityMapPath, Publication.class, + GraphResult.class, Constants.DUMPTYPE.COMPLETE.getType()); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/result") - .map(item -> OBJECT_MAPPER.readValue(item, GraphResult.class)); + .textFile(workingDir.toString() + "/result") + .map(item -> OBJECT_MAPPER.readValue(item, GraphResult.class)); org.apache.spark.sql.Dataset verificationDataset = spark - .createDataset(tmp.rdd(), Encoders.bean(GraphResult.class)); + .createDataset(tmp.rdd(), Encoders.bean(GraphResult.class)); Assertions.assertEquals(1, verificationDataset.count()); GraphResult gr = verificationDataset.first(); - Assertions.assertEquals(2, gr.getMeasures().size()); - Assertions.assertTrue(gr.getMeasures().stream().anyMatch(m -> m.getKey().equals("influence") - && m.getValue().equals("1.62759106106e-08"))); - Assertions.assertTrue(gr.getMeasures().stream().anyMatch(m -> m.getKey().equals("popularity") - && m.getValue().equals("0.22519296"))); + Assertions + .assertTrue( + gr + .getMeasures() + .stream() + .anyMatch( + m -> m.getKey().equals("influence") + && m.getValue().equals("1.62759106106e-08"))); + Assertions + .assertTrue( + gr + .getMeasures() + .stream() + .anyMatch( + m -> m.getKey().equals("popularity") + && m.getValue().equals("0.22519296"))); Assertions.assertEquals(6, gr.getAuthor().size()); - 
Assertions.assertTrue(gr.getAuthor().stream().anyMatch(a -> a.getFullname().equals("Nikolaidou,Charitini") && - a.getName().equals("Charitini") && a.getSurname().equals("Nikolaidou") - && a.getRank() == 1 && a.getPid() == null)); + Assertions + .assertTrue( + gr + .getAuthor() + .stream() + .anyMatch( + a -> a.getFullname().equals("Nikolaidou,Charitini") && + a.getName().equals("Charitini") && a.getSurname().equals("Nikolaidou") + && a.getRank() == 1 && a.getPid() == null)); - Assertions.assertTrue(gr.getAuthor().stream().anyMatch(a -> a.getFullname().equals("Votsi,Nefta") && - a.getName().equals("Nefta") && a.getSurname().equals("Votsi") - && a.getRank() == 2 && a.getPid().getId().getScheme().equals(ModelConstants.ORCID) - && a.getPid().getId().getValue().equals("0000-0001-6651-1178") && a.getPid().getProvenance() != null)); + Assertions + .assertTrue( + gr + .getAuthor() + .stream() + .anyMatch( + a -> a.getFullname().equals("Votsi,Nefta") && + a.getName().equals("Nefta") && a.getSurname().equals("Votsi") + && a.getRank() == 2 && a.getPid().getId().getScheme().equals(ModelConstants.ORCID) + && a.getPid().getId().getValue().equals("0000-0001-6651-1178") + && a.getPid().getProvenance() != null)); - Assertions.assertTrue(gr.getAuthor().stream().anyMatch(a -> a.getFullname().equals("Sgardelis,Steanos") && - a.getName().equals("Steanos") && a.getSurname().equals("Sgardelis") - && a.getRank() == 3 && a.getPid().getId().getScheme().equals(ModelConstants.ORCID_PENDING) - && a.getPid().getId().getValue().equals("0000-0001-6651-1178") && a.getPid().getProvenance() != null)); + Assertions + .assertTrue( + gr + .getAuthor() + .stream() + .anyMatch( + a -> a.getFullname().equals("Sgardelis,Steanos") && + a.getName().equals("Steanos") && a.getSurname().equals("Sgardelis") + && a.getRank() == 3 && a.getPid().getId().getScheme().equals(ModelConstants.ORCID_PENDING) + && a.getPid().getId().getValue().equals("0000-0001-6651-1178") + && a.getPid().getProvenance() != null)); - Assertions.assertTrue(gr.getAuthor().stream().anyMatch(a -> a.getFullname().equals("Halley,John") && - a.getName().equals("John") && a.getSurname().equals("Halley") - && a.getRank() == 4 && a.getPid() == null)); + Assertions + .assertTrue( + gr + .getAuthor() + .stream() + .anyMatch( + a -> a.getFullname().equals("Halley,John") && + a.getName().equals("John") && a.getSurname().equals("Halley") + && a.getRank() == 4 && a.getPid() == null)); - Assertions.assertTrue(gr.getAuthor().stream().anyMatch(a -> a.getFullname().equals("Pantis,John") && - a.getName().equals("John") && a.getSurname().equals("Pantis") - && a.getRank() == 5 && a.getPid().getId().getScheme().equals(ModelConstants.ORCID) - && a.getPid().getId().getValue().equals("0000-0001-6651-1178") && a.getPid().getProvenance() != null)); + Assertions + .assertTrue( + gr + .getAuthor() + .stream() + .anyMatch( + a -> a.getFullname().equals("Pantis,John") && + a.getName().equals("John") && a.getSurname().equals("Pantis") + && a.getRank() == 5 && a.getPid().getId().getScheme().equals(ModelConstants.ORCID) + && a.getPid().getId().getValue().equals("0000-0001-6651-1178") + && a.getPid().getProvenance() != null)); - Assertions.assertTrue(gr.getAuthor().stream().anyMatch(a -> a.getFullname().equals("Tsiafouli,Maria") && - a.getName().equals("Maria") && a.getSurname().equals("Tsiafouli") - && a.getRank() == 6 && a.getPid().getId().getScheme().equals(ModelConstants.ORCID_PENDING) - && a.getPid().getId().getValue().equals("0000-0001-6651-1178") && a.getPid().getProvenance() != null)); + 
Assertions + .assertTrue( + gr + .getAuthor() + .stream() + .anyMatch( + a -> a.getFullname().equals("Tsiafouli,Maria") && + a.getName().equals("Maria") && a.getSurname().equals("Tsiafouli") + && a.getRank() == 6 && a.getPid().getId().getScheme().equals(ModelConstants.ORCID_PENDING) + && a.getPid().getId().getValue().equals("0000-0001-6651-1178") + && a.getPid().getProvenance() != null)); Assertions.assertEquals("publication", gr.getType()); @@ -216,27 +264,52 @@ public class DumpJobTest { Assertions.assertEquals("English", gr.getLanguage().getLabel()); Assertions.assertEquals(1, gr.getCountry().size()); - Assertions.assertEquals("IT" , gr.getCountry().get(0).getCode()); - Assertions.assertEquals("Italy" , gr.getCountry().get(0).getLabel()); - Assertions.assertTrue( gr.getCountry().get(0).getProvenance() == null); + Assertions.assertEquals("IT", gr.getCountry().get(0).getCode()); + Assertions.assertEquals("Italy", gr.getCountry().get(0).getLabel()); + Assertions.assertTrue(gr.getCountry().get(0).getProvenance() == null); Assertions.assertEquals(12, gr.getSubjects().size()); - Assertions.assertTrue(gr.getSubjects().stream().anyMatch(s -> s.getSubject().getValue().equals("Ecosystem Services hotspots") - && s.getSubject().getScheme().equals("ACM") && s.getProvenance() != null && - s.getProvenance().getProvenance().equals("sysimport:crosswalk:repository"))); - Assertions.assertTrue(gr.getSubjects().stream().anyMatch(s -> s.getSubject().getValue().equals("Natura 2000") - && s.getSubject().getScheme().equals("") && s.getProvenance() != null && - s.getProvenance().getProvenance().equals("sysimport:crosswalk:repository"))); + Assertions + .assertTrue( + gr + .getSubjects() + .stream() + .anyMatch( + s -> s.getSubject().getValue().equals("Ecosystem Services hotspots") + && s.getSubject().getScheme().equals("ACM") && s.getProvenance() != null && + s.getProvenance().getProvenance().equals("sysimport:crosswalk:repository"))); + Assertions + .assertTrue( + gr + .getSubjects() + .stream() + .anyMatch( + s -> s.getSubject().getValue().equals("Natura 2000") + && s.getSubject().getScheme().equals("") && s.getProvenance() != null && + s.getProvenance().getProvenance().equals("sysimport:crosswalk:repository"))); - Assertions.assertEquals("Ecosystem Service capacity is higher in areas of multiple designation types", + Assertions + .assertEquals( + "Ecosystem Service capacity is higher in areas of multiple designation types", gr.getMaintitle()); Assertions.assertEquals(null, gr.getSubtitle()); Assertions.assertEquals(1, gr.getDescription().size()); - Assertions.assertTrue(gr.getDescription().get(0).startsWith("The implementation of the Ecosystem Service (ES) concept into practice")); - Assertions.assertTrue(gr.getDescription().get(0).endsWith("start complying with new standards and demands for nature conservation and environmental management.")); + Assertions + .assertTrue( + gr + .getDescription() + .get(0) + .startsWith("The implementation of the Ecosystem Service (ES) concept into practice")); + Assertions + .assertTrue( + gr + .getDescription() + .get(0) + .endsWith( + "start complying with new standards and demands for nature conservation and environmental management.")); Assertions.assertEquals("2017-01-01", gr.getPublicationdate()); @@ -255,7 +328,9 @@ public class DumpJobTest { Assertions.assertEquals(0, gr.getCoverage().size()); Assertions.assertEquals(ModelConstants.ACCESS_RIGHT_OPEN, gr.getBestaccessright().getLabel()); - 
Assertions.assertEquals(Constants.accessRightsCoarMap.get(ModelConstants.ACCESS_RIGHT_OPEN), gr.getBestaccessright().getCode()); + Assertions + .assertEquals( + Constants.accessRightsCoarMap.get(ModelConstants.ACCESS_RIGHT_OPEN), gr.getBestaccessright().getCode()); Assertions.assertEquals(null, gr.getBestaccessright().getOpenAccessRoute()); Assertions.assertEquals("One Ecosystem", gr.getContainer().getName()); @@ -284,12 +359,16 @@ public class DumpJobTest { Assertions.assertEquals("50|pensoft_____::00ea4a1cd53806a97d62ea6bf268f2a2", gr.getId()); Assertions.assertEquals(2, gr.getOriginalId().size()); - Assertions.assertTrue(gr.getOriginalId().contains("50|pensoft_____::00ea4a1cd53806a97d62ea6bf268f2a2") - && gr.getOriginalId().contains("10.3897/oneeco.2.e13718")); + Assertions + .assertTrue( + gr.getOriginalId().contains("50|pensoft_____::00ea4a1cd53806a97d62ea6bf268f2a2") + && gr.getOriginalId().contains("10.3897/oneeco.2.e13718")); Assertions.assertEquals(1, gr.getPid().size()); - Assertions.assertTrue(gr.getPid().get(0).getScheme().equals("doi") - && gr.getPid().get(0).getValue().equals("10.1016/j.triboint.2014.05.004")); + Assertions + .assertTrue( + gr.getPid().get(0).getScheme().equals("doi") + && gr.getPid().get(0).getValue().equals("10.1016/j.triboint.2014.05.004")); Assertions.assertEquals("2020-03-23T00:20:51.392Z", gr.getDateofcollection()); @@ -298,53 +377,63 @@ public class DumpJobTest { Instance instance = gr.getInstance().get(0); Assertions.assertEquals(0, instance.getPid().size()); Assertions.assertEquals(1, instance.getAlternateIdentifier().size()); - Assertions.assertTrue(instance.getAlternateIdentifier().get(0).getScheme().equals("doi") - && instance.getAlternateIdentifier().get(0).getValue().equals("10.3897/oneeco.2.e13718")); + Assertions + .assertTrue( + instance.getAlternateIdentifier().get(0).getScheme().equals("doi") + && instance.getAlternateIdentifier().get(0).getValue().equals("10.3897/oneeco.2.e13718")); Assertions.assertEquals(null, instance.getLicense()); - Assertions.assertTrue(instance.getAccessright().getCode().equals(Constants.accessRightsCoarMap - .get(ModelConstants.ACCESS_RIGHT_OPEN))); + Assertions + .assertTrue( + instance + .getAccessright() + .getCode() + .equals( + Constants.accessRightsCoarMap + .get(ModelConstants.ACCESS_RIGHT_OPEN))); Assertions.assertTrue(instance.getAccessright().getLabel().equals(ModelConstants.ACCESS_RIGHT_OPEN)); Assertions.assertTrue(instance.getAccessright().getOpenAccessRoute().equals(OpenAccessRoute.green)); Assertions.assertTrue(instance.getType().equals("Article")); Assertions.assertEquals(2, instance.getUrl().size()); - Assertions.assertTrue(instance.getUrl().contains("https://doi.org/10.3897/oneeco.2.e13718") - && instance.getUrl().contains("https://oneecosystem.pensoft.net/article/13718/")); - Assertions.assertEquals("2017-01-01",instance.getPublicationdate()); - Assertions.assertEquals(null,instance.getArticleprocessingcharge()); + Assertions + .assertTrue( + instance.getUrl().contains("https://doi.org/10.3897/oneeco.2.e13718") + && instance.getUrl().contains("https://oneecosystem.pensoft.net/article/13718/")); + Assertions.assertEquals("2017-01-01", instance.getPublicationdate()); + Assertions.assertEquals(null, instance.getArticleprocessingcharge()); Assertions.assertEquals("peerReviewed", instance.getRefereed()); } - @Test - public void testDatasetDump(){ + public void testDatasetDump() { final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/dataset_extendedinstance") - 
.getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/dataset_extendedinstance") + .getPath(); final String communityMapPath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymap.json") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymap.json") + .getPath(); DumpProducts dump = new DumpProducts(); dump - .run(false, sourcePath, workingDir.toString() + "/result", - communityMapPath, Dataset.class, - GraphResult.class, Constants.DUMPTYPE.COMPLETE.getType()); + .run( + false, sourcePath, workingDir.toString() + "/result", + communityMapPath, Dataset.class, + GraphResult.class, Constants.DUMPTYPE.COMPLETE.getType()); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/result") - .map(item -> OBJECT_MAPPER.readValue(item, GraphResult.class)); + .textFile(workingDir.toString() + "/result") + .map(item -> OBJECT_MAPPER.readValue(item, GraphResult.class)); org.apache.spark.sql.Dataset verificationDataset = spark - .createDataset(tmp.rdd(), Encoders.bean(GraphResult.class)); + .createDataset(tmp.rdd(), Encoders.bean(GraphResult.class)); Assertions.assertEquals(1, verificationDataset.count()); Assertions.assertEquals(1, verificationDataset.filter("type = 'dataset'").count()); - //the common fields in the result have been already checked. Now checking only + // the common fields in the result have been already checked. Now checking only // community specific fields GraphResult gr = verificationDataset.first(); @@ -353,10 +442,33 @@ public class DumpJobTest { Assertions.assertEquals(2, gr.getGeolocation().stream().filter(gl -> gl.getBox().equals("")).count()); Assertions.assertEquals(1, gr.getGeolocation().stream().filter(gl -> gl.getPlace().equals("")).count()); Assertions.assertEquals(1, gr.getGeolocation().stream().filter(gl -> gl.getPoint().equals("")).count()); - Assertions.assertEquals(1, gr.getGeolocation().stream().filter(gl -> gl.getPlace().equals("18 York St, Ottawa, ON K1N 5S6; Ottawa; Ontario; Canada")).count()); - Assertions.assertEquals(1, gr.getGeolocation().stream().filter(gl -> gl.getPoint().equals("45.427242 -75.693904")).count()); - Assertions.assertEquals(1, gr.getGeolocation().stream().filter(gl -> gl.getPoint().equals("") && !gl.getPlace().equals("")).count()); - Assertions.assertEquals(1, gr.getGeolocation().stream().filter(gl -> !gl.getPoint().equals("") && gl.getPlace().equals("")).count()); + Assertions + .assertEquals( + 1, + gr + .getGeolocation() + .stream() + .filter(gl -> gl.getPlace().equals("18 York St, Ottawa, ON K1N 5S6; Ottawa; Ontario; Canada")) + .count()); + Assertions + .assertEquals( + 1, gr.getGeolocation().stream().filter(gl -> gl.getPoint().equals("45.427242 -75.693904")).count()); + Assertions + .assertEquals( + 1, + gr + .getGeolocation() + .stream() + .filter(gl -> gl.getPoint().equals("") && !gl.getPlace().equals("")) + .count()); + Assertions + .assertEquals( + 1, + gr + .getGeolocation() + .stream() + .filter(gl -> !gl.getPoint().equals("") && gl.getPlace().equals("")) + .count()); Assertions.assertEquals("1024Gb", gr.getSize()); @@ -373,30 +485,30 @@ public class DumpJobTest { } @Test - public void testSoftwareDump(){ + public void testSoftwareDump() { final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/software_extendedinstance") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/software_extendedinstance") + 
.getPath(); final String communityMapPath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymap.json") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymap.json") + .getPath(); DumpProducts dump = new DumpProducts(); dump - .run(false, sourcePath, workingDir.toString() + "/result", - communityMapPath, Software.class, - GraphResult.class, Constants.DUMPTYPE.COMPLETE.getType()); + .run( + false, sourcePath, workingDir.toString() + "/result", + communityMapPath, Software.class, + GraphResult.class, Constants.DUMPTYPE.COMPLETE.getType()); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - JavaRDD tmp = sc - .textFile(workingDir.toString() + "/result") - .map(item -> OBJECT_MAPPER.readValue(item, GraphResult.class)); + .textFile(workingDir.toString() + "/result") + .map(item -> OBJECT_MAPPER.readValue(item, GraphResult.class)); org.apache.spark.sql.Dataset verificationDataset = spark - .createDataset(tmp.rdd(), Encoders.bean(GraphResult.class)); + .createDataset(tmp.rdd(), Encoders.bean(GraphResult.class)); Assertions.assertEquals(1, verificationDataset.count()); @@ -412,7 +524,6 @@ public class DumpJobTest { Assertions.assertEquals("perl", gr.getProgrammingLanguage()); - Assertions.assertEquals(null, gr.getContainer()); Assertions.assertEquals(null, gr.getContactperson()); Assertions.assertEquals(null, gr.getContactgroup()); @@ -424,30 +535,30 @@ public class DumpJobTest { } @Test - public void testOrpDump(){ + public void testOrpDump() { final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/orp_extendedinstance") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/orp_extendedinstance") + .getPath(); final String communityMapPath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymap.json") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymap.json") + .getPath(); DumpProducts dump = new DumpProducts(); dump - .run(false, sourcePath, workingDir.toString() + "/result", - communityMapPath, OtherResearchProduct.class, - GraphResult.class, Constants.DUMPTYPE.COMPLETE.getType()); + .run( + false, sourcePath, workingDir.toString() + "/result", + communityMapPath, OtherResearchProduct.class, + GraphResult.class, Constants.DUMPTYPE.COMPLETE.getType()); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - JavaRDD tmp = sc - .textFile(workingDir.toString() + "/result") - .map(item -> OBJECT_MAPPER.readValue(item, GraphResult.class)); + .textFile(workingDir.toString() + "/result") + .map(item -> OBJECT_MAPPER.readValue(item, GraphResult.class)); org.apache.spark.sql.Dataset verificationDataset = spark - .createDataset(tmp.rdd(), Encoders.bean(GraphResult.class)); + .createDataset(tmp.rdd(), Encoders.bean(GraphResult.class)); Assertions.assertEquals(1, verificationDataset.count()); @@ -466,7 +577,6 @@ public class DumpJobTest { Assertions.assertTrue(gr.getTool().contains("tool1")); Assertions.assertTrue(gr.getTool().contains("tool2")); - Assertions.assertEquals(null, gr.getContainer()); Assertions.assertEquals(null, gr.getDocumentationUrl()); Assertions.assertEquals(null, gr.getCodeRepositoryUrl()); @@ -481,32 +591,33 @@ public class DumpJobTest { public void testPublicationDumpCommunity() throws JsonProcessingException { final String sourcePath = getClass() - 
.getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/publication_extendedinstance") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/publication_extendedinstance") + .getPath(); final String communityMapPath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymap.json") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymap.json") + .getPath(); DumpProducts dump = new DumpProducts(); dump - .run(false, sourcePath, workingDir.toString() + "/result", communityMapPath, Publication.class, - CommunityResult.class, Constants.DUMPTYPE.COMMUNITY.getType()); + .run( + false, sourcePath, workingDir.toString() + "/result", communityMapPath, Publication.class, + CommunityResult.class, Constants.DUMPTYPE.COMMUNITY.getType()); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/result") - .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); + .textFile(workingDir.toString() + "/result") + .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); org.apache.spark.sql.Dataset verificationDataset = spark - .createDataset(tmp.rdd(), Encoders.bean(CommunityResult.class)); + .createDataset(tmp.rdd(), Encoders.bean(CommunityResult.class)); Assertions.assertEquals(1, verificationDataset.count()); Assertions.assertEquals(1, verificationDataset.filter("type = 'publication'").count()); - //the common fields in the result have been already checked. Now checking only + // the common fields in the result have been already checked. Now checking only // community specific fields CommunityResult cr = verificationDataset.first(); @@ -519,15 +630,20 @@ public class DumpJobTest { Assertions.assertEquals("0.9", cr.getContext().get(0).getProvenance().get(0).getTrust()); Assertions.assertEquals(1, cr.getCollectedfrom().size()); - Assertions.assertEquals("10|openaire____::fdc7e0400d8c1634cdaf8051dbae23db", cr.getCollectedfrom().get(0).getKey()); + Assertions + .assertEquals("10|openaire____::fdc7e0400d8c1634cdaf8051dbae23db", cr.getCollectedfrom().get(0).getKey()); Assertions.assertEquals("Pensoft", cr.getCollectedfrom().get(0).getValue()); Assertions.assertEquals(1, cr.getInstance().size()); - Assertions.assertEquals("10|openaire____::fdc7e0400d8c1634cdaf8051dbae23db", cr.getInstance().get(0).getCollectedfrom().getKey()); + Assertions + .assertEquals( + "10|openaire____::fdc7e0400d8c1634cdaf8051dbae23db", + cr.getInstance().get(0).getCollectedfrom().getKey()); Assertions.assertEquals("Pensoft", cr.getInstance().get(0).getCollectedfrom().getValue()); - Assertions.assertEquals("10|openaire____::e707e544b9a5bd23fc27fbfa65eb60dd", cr.getInstance().get(0).getHostedby().getKey()); - Assertions.assertEquals("One Ecosystem",cr.getInstance().get(0).getHostedby().getValue()); - + Assertions + .assertEquals( + "10|openaire____::e707e544b9a5bd23fc27fbfa65eb60dd", cr.getInstance().get(0).getHostedby().getKey()); + Assertions.assertEquals("One Ecosystem", cr.getInstance().get(0).getHostedby().getValue()); } @@ -587,8 +703,6 @@ public class DumpJobTest { Assertions.assertTrue(verificationDataset.filter("type = 'dataset'").count() == 90); - - } @Test @@ -650,7 +764,6 @@ public class DumpJobTest { Assertions.assertEquals(0, verificationDataset.count()); - } @Test @@ -718,7 +831,6 @@ public class DumpJobTest { Assertions.assertEquals(6, verificationDataset.filter("type = 'software'").count()); - } @Test @@ -814,7 +926,6 @@ public class 
DumpJobTest { Assertions.assertEquals(23, verificationDataset.count()); - Assertions.assertEquals(23, verificationDataset.filter("type = 'publication'").count()); verificationDataset.createOrReplaceTempView("check"); @@ -832,7 +943,6 @@ public class DumpJobTest { Assertions.assertTrue(temp.filter("id = '50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8'").count() == 1); - } } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/PrepareResultProjectJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/PrepareResultProjectJobTest.java index c950f1c91..a17861ad7 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/PrepareResultProjectJobTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/PrepareResultProjectJobTest.java @@ -8,16 +8,17 @@ import java.nio.file.Files; import java.nio.file.Path; import org.apache.commons.io.FileUtils; -import org.apache.neethi.Assertion; + import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.ForeachFunction; + import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.junit.jupiter.api.AfterAll; + import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -229,107 +230,98 @@ public class PrepareResultProjectJobTest { public void testMatchValidated() throws Exception { final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultProject/match_validatedRels") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultProject/match_validatedRels") + .getPath(); SparkPrepareResultProject.main(new String[] { - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-outputPath", workingDir.toString() + "/preparedInfo", - "-sourcePath", sourcePath + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString() + "/preparedInfo", + "-sourcePath", sourcePath }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/preparedInfo") - .map(item -> OBJECT_MAPPER.readValue(item, ResultProject.class)); + .textFile(workingDir.toString() + "/preparedInfo") + .map(item -> OBJECT_MAPPER.readValue(item, ResultProject.class)); org.apache.spark.sql.Dataset verificationDataset = spark - .createDataset(tmp.rdd(), Encoders.bean(ResultProject.class)); + .createDataset(tmp.rdd(), Encoders.bean(ResultProject.class)); - Assertions.assertTrue(verificationDataset.count() == 2); + assertEquals(2, verificationDataset.count() ); - Assertions - .assertEquals( - 1, - verificationDataset.filter("resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'").count()); - Assertions - .assertEquals( - 1, - verificationDataset.filter("resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'").count()); + assertEquals( + 1, + verificationDataset.filter("resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'").count()); + assertEquals( + 1, + verificationDataset.filter("resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'").count()); verificationDataset.createOrReplaceTempView("dataset"); - String query = "select resultId, MyT.id project , MyT.title title, MyT.acronym acronym , MyT.provenance.provenance provenance, " + - 
"MyT.validated.validatedByFunder, MyT.validated.validationDate " - + "from dataset " - + "lateral view explode(projectsList) p as MyT "; + String query = "select resultId, MyT.id project , MyT.title title, MyT.acronym acronym , MyT.provenance.provenance provenance, " + + + "MyT.validated.validatedByFunder, MyT.validated.validationDate " + + "from dataset " + + "lateral view explode(projectsList) p as MyT "; org.apache.spark.sql.Dataset resultExplodedProvenance = spark.sql(query); - Assertions.assertEquals(3, resultExplodedProvenance.count()); - Assertions.assertEquals(3, resultExplodedProvenance.filter("validatedByFunder = true").count()); - Assertions - .assertEquals( - 2, - resultExplodedProvenance - .filter("resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'") - .count()); + assertEquals(3, resultExplodedProvenance.count()); + assertEquals(3, resultExplodedProvenance.filter("validatedByFunder = true").count()); + assertEquals( + 2, + resultExplodedProvenance + .filter("resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'") + .count()); - Assertions - .assertEquals( - 1, - resultExplodedProvenance - .filter("resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'") - .count()); + assertEquals( + 1, + resultExplodedProvenance + .filter("resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'") + .count()); - Assertions - .assertEquals( - 2, - resultExplodedProvenance - .filter("project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6'") - .count()); + assertEquals( + 2, + resultExplodedProvenance + .filter("project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6'") + .count()); - Assertions - .assertEquals( - 1, - resultExplodedProvenance - .filter( - "project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6' " + - "and resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb' " + - "and validatedByFunder = true " + - "and validationDate = '2021-08-06'") - .count()); + assertEquals( + 1, + resultExplodedProvenance + .filter( + "project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6' " + + "and resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb' " + + "and validatedByFunder = true " + + "and validationDate = '2021-08-06'") + .count()); - Assertions - .assertEquals( - 1, - resultExplodedProvenance - .filter( - "project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6' " + - "and resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80' " + - "and validatedByFunder = true and validationDate = '2021-08-04'") - .count()); + assertEquals( + 1, + resultExplodedProvenance + .filter( + "project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6' " + + "and resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80' " + + "and validatedByFunder = true and validationDate = '2021-08-04'") + .count()); - Assertions - .assertEquals( - 1, - resultExplodedProvenance - .filter("project = '40|aka_________::03376222b28a3aebf2730ac514818d04'") - .count()); + assertEquals( + 1, + resultExplodedProvenance + .filter("project = '40|aka_________::03376222b28a3aebf2730ac514818d04'") + .count()); - Assertions - .assertEquals( - 1, - resultExplodedProvenance - .filter( - "project = '40|aka_________::03376222b28a3aebf2730ac514818d04' " + - "and resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb' " + - "and validatedByFunder = true and validationDate = '2021-08-05'") - .count()); + assertEquals( + 1, + resultExplodedProvenance + .filter( + "project = '40|aka_________::03376222b28a3aebf2730ac514818d04' " + + "and resultId = 
'50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb' " + + "and validatedByFunder = true and validationDate = '2021-08-05'") + .count()); - Assertions - .assertEquals( - 3, resultExplodedProvenance.filter("provenance = 'sysimport:crosswalk:entityregistry'").count()); + assertEquals( + 3, resultExplodedProvenance.filter("provenance = 'sysimport:crosswalk:entityregistry'").count()); } } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/UpdateProjectInfoTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/UpdateProjectInfoTest.java index 61102232e..9710e4553 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/UpdateProjectInfoTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/UpdateProjectInfoTest.java @@ -8,8 +8,6 @@ import java.util.HashMap; import java.util.logging.Filter; import java.util.stream.Collectors; -import eu.dnetlib.dhp.schema.dump.oaf.community.Funder; -import eu.dnetlib.dhp.schema.dump.oaf.community.Project; import org.apache.commons.io.FileUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; @@ -31,6 +29,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo; import eu.dnetlib.dhp.schema.dump.oaf.Result; import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult; +import eu.dnetlib.dhp.schema.dump.oaf.community.Funder; +import eu.dnetlib.dhp.schema.dump.oaf.community.Project; public class UpdateProjectInfoTest { @@ -142,26 +142,26 @@ public class UpdateProjectInfoTest { } @Test - public void testValidatedRelation() throws Exception{ + public void testValidatedRelation() throws Exception { final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/addProjectInfo") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/addProjectInfo") + .getPath(); SparkUpdateProjectInfo.main(new String[] { - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-preparedInfoPath", sourcePath + "/preparedInfoValidated", - "-outputPath", workingDir.toString() + "/result", - "-sourcePath", sourcePath + "/publication_extendedmodel" + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-preparedInfoPath", sourcePath + "/preparedInfoValidated", + "-outputPath", workingDir.toString() + "/result", + "-sourcePath", sourcePath + "/publication_extendedmodel" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/result") - .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); + .textFile(workingDir.toString() + "/result") + .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); org.apache.spark.sql.Dataset verificationDataset = spark - .createDataset(tmp.rdd(), Encoders.bean(CommunityResult.class)); + .createDataset(tmp.rdd(), Encoders.bean(CommunityResult.class)); verificationDataset.show(false); @@ -169,10 +169,10 @@ public class UpdateProjectInfoTest { verificationDataset.createOrReplaceTempView("dataset"); String query = "select id, MyT.code code, MyT.title title, MyT.funder.name funderName, MyT.funder.shortName funderShortName, " - + - "MyT.funder.jurisdiction funderJurisdiction, MyT.funder.fundingStream fundingStream, MyT.validated " - + "from dataset " + - "lateral view explode(projects) p as MyT "; + + + "MyT.funder.jurisdiction funderJurisdiction, MyT.funder.fundingStream fundingStream, MyT.validated " 
+ + "from dataset " + + "lateral view explode(projects) p as MyT "; org.apache.spark.sql.Dataset resultExplodedProvenance = spark.sql(query); @@ -180,27 +180,34 @@ public class UpdateProjectInfoTest { resultExplodedProvenance.show(false); Assertions - .assertEquals( - 2, - resultExplodedProvenance.filter("id = '50|pensoft_____::00ea4a1cd53806a97d62ea6bf268f2a2'").count()); + .assertEquals( + 2, + resultExplodedProvenance.filter("id = '50|pensoft_____::00ea4a1cd53806a97d62ea6bf268f2a2'").count()); Assertions - .assertEquals( - 1, - resultExplodedProvenance - .filter("id = '50|pensoft_____::00ea4a1cd53806a97d62ea6bf268f2a2' and code = '123455'") - .count()); + .assertEquals( + 1, + resultExplodedProvenance + .filter("id = '50|pensoft_____::00ea4a1cd53806a97d62ea6bf268f2a2' and code = '123455'") + .count()); Assertions - .assertEquals( - 1, - resultExplodedProvenance - .filter("id = '50|pensoft_____::00ea4a1cd53806a97d62ea6bf268f2a2' and code = '119027'") - .count()); + .assertEquals( + 1, + resultExplodedProvenance + .filter("id = '50|pensoft_____::00ea4a1cd53806a97d62ea6bf268f2a2' and code = '119027'") + .count()); Project project = verificationDataset - .map((MapFunction) cr -> cr.getProjects().stream().filter(p -> p.getValidated() != null).collect(Collectors.toList()).get(0) - , Encoders.bean(Project.class)).first(); + .map( + (MapFunction) cr -> cr + .getProjects() + .stream() + .filter(p -> p.getValidated() != null) + .collect(Collectors.toList()) + .get(0), + Encoders.bean(Project.class)) + .first(); Assertions.assertTrue(project.getFunder().getName().equals("Academy of Finland")); Assertions.assertTrue(project.getFunder().getShortName().equals("AKA")); @@ -208,18 +215,22 @@ public class UpdateProjectInfoTest { Assertions.assertTrue(project.getFunder().getFundingStream() == null); Assertions.assertTrue(project.getValidated().getValidationDate().equals("2021-08-06")); - project = verificationDataset - .map((MapFunction) cr -> cr.getProjects().stream().filter(p -> p.getValidated() == null).collect(Collectors.toList()).get(0) - , Encoders.bean(Project.class)).first(); - + .map( + (MapFunction) cr -> cr + .getProjects() + .stream() + .filter(p -> p.getValidated() == null) + .collect(Collectors.toList()) + .get(0), + Encoders.bean(Project.class)) + .first(); Assertions.assertTrue(project.getFunder().getName().equals("European Commission")); Assertions.assertTrue(project.getFunder().getShortName().equals("EC")); Assertions.assertTrue(project.getFunder().getJurisdiction().equals("EU")); Assertions.assertTrue(project.getFunder().getFundingStream().equals("H2020")); - } } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/DumpRelationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/DumpRelationTest.java index a310448a4..fe178795d 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/DumpRelationTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/DumpRelationTest.java @@ -94,7 +94,8 @@ public class DumpRelationTest { verificationDataset.createOrReplaceTempView("table"); - verificationDataset.foreach((ForeachFunction)r -> System.out.println(new ObjectMapper().writeValueAsString(r))); + verificationDataset + .foreach((ForeachFunction) r -> System.out.println(new ObjectMapper().writeValueAsString(r))); Dataset check = spark .sql( @@ -134,13 +135,13 @@ public class DumpRelationTest { public void test2() throws Exception { 
final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/relation/relation_validated") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/relation/relation_validated") + .getPath(); SparkDumpRelationJob.main(new String[] { - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-outputPath", workingDir.toString() + "/relation", - "-sourcePath", sourcePath + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString() + "/relation", + "-sourcePath", sourcePath }); // dumpCommunityProducts.exec(MOCK_IS_LOOK_UP_URL,Boolean.FALSE, workingDir.toString()+"/dataset",sourcePath,"eu.dnetlib.dhp.schema.oaf.Dataset","eu.dnetlib.dhp.schema.dump.oaf.Dataset"); @@ -148,57 +149,58 @@ public class DumpRelationTest { final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/relation") - .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)); + .textFile(workingDir.toString() + "/relation") + .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)); org.apache.spark.sql.Dataset verificationDataset = spark - .createDataset(tmp.rdd(), Encoders.bean(Relation.class)); + .createDataset(tmp.rdd(), Encoders.bean(Relation.class)); verificationDataset.createOrReplaceTempView("table"); - verificationDataset.foreach((ForeachFunction)r -> System.out.println(new ObjectMapper().writeValueAsString(r))); + verificationDataset + .foreach((ForeachFunction) r -> System.out.println(new ObjectMapper().writeValueAsString(r))); Dataset check = spark - .sql( - "SELECT reltype.name, source.id source, source.type stype, target.id target,target.type ttype, provenance.provenance " - + - "from table "); + .sql( + "SELECT reltype.name, source.id source, source.type stype, target.id target,target.type ttype, provenance.provenance " + + + "from table "); Assertions.assertEquals(20, check.filter("name = 'isProvidedBy'").count()); Assertions - .assertEquals( - 20, check - .filter( - "name = 'isProvidedBy' and stype = 'datasource' and ttype = 'organization' and " + - "provenance = 'Harvested'") - .count()); + .assertEquals( + 20, check + .filter( + "name = 'isProvidedBy' and stype = 'datasource' and ttype = 'organization' and " + + "provenance = 'Harvested'") + .count()); Assertions.assertEquals(7, check.filter("name = 'isParticipant'").count()); Assertions - .assertEquals( - 7, check - .filter( - "name = 'isParticipant' and stype = 'organization' and ttype = 'project' " + - "and provenance = 'Harvested'") - .count()); + .assertEquals( + 7, check + .filter( + "name = 'isParticipant' and stype = 'organization' and ttype = 'project' " + + "and provenance = 'Harvested'") + .count()); Assertions.assertEquals(1, check.filter("name = 'isAuthorInstitutionOf'").count()); Assertions - .assertEquals( - 1, check - .filter( - "name = 'isAuthorInstitutionOf' and stype = 'organization' and ttype = 'result' " + - "and provenance = 'Inferred by OpenAIRE'") - .count()); + .assertEquals( + 1, check + .filter( + "name = 'isAuthorInstitutionOf' and stype = 'organization' and ttype = 'result' " + + "and provenance = 'Inferred by OpenAIRE'") + .count()); Assertions.assertEquals(2, check.filter("name = 'isProducedBy'").count()); Assertions - .assertEquals( - 2, check - .filter( - "name = 'isProducedBy' and stype = 'project' and ttype = 'result' " + - "and provenance = 'Harvested' and validated = true " + - "and validationDate = '2021-08-06'") - .count()); + .assertEquals( + 2, check + .filter( + "name = 
'isProducedBy' and stype = 'project' and ttype = 'result' " + + "and provenance = 'Harvested' and validated = true " + + "and validationDate = '2021-08-06'") + .count()); } } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/QueryInformationSystemTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/QueryInformationSystemTest.java index 7bb6b9ea9..08fcd49a8 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/QueryInformationSystemTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/QueryInformationSystemTest.java @@ -6,7 +6,6 @@ import static org.mockito.Mockito.lenient; import java.util.*; import java.util.function.Consumer; -import eu.dnetlib.dhp.schema.common.ModelSupport; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -14,6 +13,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; @@ -529,7 +529,8 @@ class QueryInformationSystemTest { List cInfoList = new ArrayList<>(); final Consumer consumer = ci -> cInfoList.add(ci); queryInformationSystem.execContextRelationQuery(); - queryInformationSystem.getContextRelation(consumer, "contentproviders", ModelSupport.entityIdPrefix.get("datasource")); + queryInformationSystem + .getContextRelation(consumer, "contentproviders", ModelSupport.entityIdPrefix.get("datasource")); Assertions.assertEquals(5, cInfoList.size()); } @@ -540,7 +541,8 @@ class QueryInformationSystemTest { List cInfoList = new ArrayList<>(); final Consumer consumer = ci -> cInfoList.add(ci); queryInformationSystem.execContextRelationQuery(); - queryInformationSystem.getContextRelation(consumer, "contentproviders", ModelSupport.entityIdPrefix.get("datasource")); + queryInformationSystem + .getContextRelation(consumer, "contentproviders", ModelSupport.entityIdPrefix.get("datasource")); cInfoList.forEach(contextInfo -> { switch (contextInfo.getId()) { diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/funderresult/SplitPerFunderTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/funderresult/SplitPerFunderTest.java index ad4ea36c4..477481c08 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/funderresult/SplitPerFunderTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/funderresult/SplitPerFunderTest.java @@ -145,8 +145,8 @@ public class SplitPerFunderTest { // CONICYT 0 tmp = sc - .textFile(workingDir.toString() + "/split/CONICYTF") - .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); + .textFile(workingDir.toString() + "/split/CONICYTF") + .map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class)); Assertions.assertEquals(0, tmp.count()); } diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/IndexRecordTransformerTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/IndexRecordTransformerTest.java index 8daf318be..6229ad19b 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/IndexRecordTransformerTest.java +++ 
b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/IndexRecordTransformerTest.java
@@ -84,7 +84,8 @@ public class IndexRecordTransformerTest {
 
 	@Test
 	public void testForEOSCFutureTraining() throws IOException, TransformerException {
-		final String record = IOUtils.toString(getClass().getResourceAsStream("eosc-future/training-notebooks-seadatanet.xml"));
+		final String record = IOUtils
+			.toString(getClass().getResourceAsStream("eosc-future/training-notebooks-seadatanet.xml"));
 		testRecordTransformation(record);
 	}
 
diff --git a/pom.xml b/pom.xml
index 4d3bcee60..4be425779 100644
--- a/pom.xml
+++ b/pom.xml
@@ -753,7 +753,7 @@
 		<mockito-core.version>3.3.3</mockito-core.version>
 		<mongodb.driver.version>3.4.2</mongodb.driver.version>
 		<vtd.version>[2.12,3.0)</vtd.version>
-		<dhp-schemas.version>[2.7.18]</dhp-schemas.version>
+		<dhp-schemas.version>[2.7.19]</dhp-schemas.version>
 		<dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version>
 		<dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version>
 		<dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version>
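
Note: most hunks in this patch are whitespace-only normalization (tab indentation, one-call-per-line method chaining); the behavioral logic worth reading is the author-pid precedence reformatted in the ResultMapper hunk above: a lone pid is taken as-is, otherwise a unique ORCID wins, otherwise a unique ORCID_PENDING. A minimal, self-contained sketch of that precedence follows; Pid and the two classid constants are illustrative stand-ins, not the dhp-schemas types (StructuredProperty, ModelConstants).

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class AuthorPidPrecedenceSketch {

	// Illustrative stand-ins for ModelConstants.ORCID / ORCID_PENDING.
	static final String ORCID = "orcid";
	static final String ORCID_PENDING = "orcid_pending";

	// Illustrative stand-in for the schema's pid type.
	static class Pid {
		final String classid;
		final String value;

		Pid(String classid, String value) {
			this.classid = classid;
			this.value = value;
		}
	}

	// A single pid is returned outright; otherwise a unique ORCID is
	// preferred, then a unique ORCID_PENDING; with no unique match,
	// no pid is selected for the dumped author.
	static Pid selectAuthorPid(List<Pid> pidList) {
		if (pidList.size() == 1) {
			return pidList.get(0);
		}
		List<Pid> orcid = pidList
			.stream()
			.filter(p -> ORCID.equals(p.classid))
			.collect(Collectors.toList());
		if (orcid.size() == 1) {
			return orcid.get(0);
		}
		orcid = pidList
			.stream()
			.filter(p -> ORCID_PENDING.equals(p.classid))
			.collect(Collectors.toList());
		return orcid.size() == 1 ? orcid.get(0) : null;
	}

	public static void main(String[] args) {
		List<Pid> pids = Arrays
			.asList(new Pid("mag", "2603818067"), new Pid(ORCID, "0000-0001-6651-1178"));
		// With more than one pid, the unique ORCID wins: prints 0000-0001-6651-1178
		System.out.println(selectAuthorPid(pids).value);
	}
}

The related substantive thread is validation carry-through: SparkPrepareResultProject and SparkDumpRelationJob now copy relation.getValidated()/getValidationDate() into the dumped Project/Relation, which is what the DumpRelationTest, PrepareResultProjectJobTest and UpdateProjectInfoTest hunks above assert (validatedByFunder = true, validationDate = '2021-08-06').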