From f4bd2b5619a513baa476787fa10f559ccbc341f3 Mon Sep 17 00:00:00 2001
From: "michele.artini"
Date: Tue, 4 May 2021 10:26:14 +0200
Subject: [PATCH] revert file SparkDedupTest.java

Restore the previous formatting of SparkDedupTest.java: resource paths and
argument arrays are wrapped onto their own lines, the JUnit imports are
collapsed into a wildcard import, and the redundant `final` modifiers on
local variables and parameters are dropped.
---
 .../dnetlib/dhp/oa/dedup/SparkDedupTest.java  | 250 +++++++++---------
 1 file changed, 131 insertions(+), 119 deletions(-)

diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
index a95e28c43..bf4913056 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
@@ -12,6 +12,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.net.URISyntaxException;
 import java.nio.file.Paths;
+import java.util.List;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
@@ -26,13 +27,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.MethodOrderer;
-import org.junit.jupiter.api.Order;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.api.*;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.Mockito;
@@ -102,7 +97,8 @@ public class SparkDedupTest implements Serializable {
 			IOUtils
 				.toString(
 					SparkDedupTest.class
-						.getResourceAsStream("/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml")));
+						.getResourceAsStream(
+							"/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml")));
 
 		lenient()
 			.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("organization")))
@@ -110,7 +106,8 @@
 			.thenReturn(
 				IOUtils
 					.toString(
 						SparkDedupTest.class
-							.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
+							.getResourceAsStream(
+								"/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
 
 		lenient()
 			.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("publication")))
@@ -118,7 +115,8 @@
 			.thenReturn(
 				IOUtils
 					.toString(
 						SparkDedupTest.class
-							.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json")));
+							.getResourceAsStream(
+								"/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json")));
 
 		lenient()
 			.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("software")))
@@ -126,7 +124,8 @@
 			.thenReturn(
 				IOUtils
 					.toString(
 						SparkDedupTest.class
-							.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json")));
+							.getResourceAsStream(
+								"/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json")));
 
 		lenient()
 			.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("dataset")))
@@ -134,7 +133,8 @@
 			.thenReturn(
 				IOUtils
 					.toString(
 						SparkDedupTest.class
-							.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json")));
+							.getResourceAsStream(
+								"/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json")));
 
 		lenient()
 			.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("otherresearchproduct")))
@@ -142,51 +142,54 @@
 			.thenReturn(
 				IOUtils
 					.toString(
 						SparkDedupTest.class
-							.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json")));
.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json"))); + .getResourceAsStream( + "/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json"))); } @Test @Order(1) public void createSimRelsTest() throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser( + ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils .toString( SparkCreateSimRels.class - .getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json"))); + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json"))); parser - .parseArgument(new String[] { - "-i", testGraphBasePath, - "-asi", testActionSetId, - "-la", "lookupurl", - "-w", testOutputBasePath, - "-np", "50" - }); + .parseArgument( + new String[] { + "-i", testGraphBasePath, + "-asi", testActionSetId, + "-la", "lookupurl", + "-w", testOutputBasePath, + "-np", "50" + }); new SparkCreateSimRels(parser, spark).run(isLookUpService); - final long orgs_simrel = spark + long orgs_simrel = spark .read() .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization")) .count(); - final long pubs_simrel = spark + long pubs_simrel = spark .read() .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "publication")) .count(); - final long sw_simrel = spark + long sw_simrel = spark .read() .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "software")) .count(); - final long ds_simrel = spark + long ds_simrel = spark .read() .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "dataset")) .count(); - final long orp_simrel = spark + long orp_simrel = spark .read() .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "otherresearchproduct")) .count(); @@ -202,29 +205,31 @@ public class SparkDedupTest implements Serializable { @Order(2) public void cutMergeRelsTest() throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser( + ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils .toString( SparkCreateMergeRels.class - .getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"))); + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"))); parser - .parseArgument(new String[] { - "-i", - testGraphBasePath, - "-asi", - testActionSetId, - "-la", - "lookupurl", - "-w", - testOutputBasePath, - "-cc", - "3" - }); + .parseArgument( + new String[] { + "-i", + testGraphBasePath, + "-asi", + testActionSetId, + "-la", + "lookupurl", + "-w", + testOutputBasePath, + "-cc", + "3" + }); new SparkCreateMergeRels(parser, spark).run(isLookUpService); - final long orgs_mergerel = spark + long orgs_mergerel = spark .read() .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") .as(Encoders.bean(Relation.class)) @@ -235,7 +240,7 @@ public class SparkDedupTest implements Serializable { .where("cnt > 3") .count(); - final long pubs_mergerel = spark + long pubs_mergerel = spark .read() .load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel") .as(Encoders.bean(Relation.class)) @@ -245,7 +250,7 @@ public class SparkDedupTest implements Serializable { .select("source", "cnt") .where("cnt > 3") .count(); - final long sw_mergerel = spark + long sw_mergerel = spark .read() .load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel") .as(Encoders.bean(Relation.class)) @@ -256,7 +261,7 @@ public class SparkDedupTest implements Serializable { .where("cnt > 3") .count(); - 
-		final long ds_mergerel = spark
+		long ds_mergerel = spark
 			.read()
 			.load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")
 			.as(Encoders.bean(Relation.class))
@@ -267,7 +272,7 @@
 			.where("cnt > 3")
 			.count();
 
-		final long orp_mergerel = spark
+		long orp_mergerel = spark
 			.read()
 			.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel")
 			.as(Encoders.bean(Relation.class))
@@ -296,44 +301,46 @@
 	@Order(3)
 	public void createMergeRelsTest() throws Exception {
 
-		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+		ArgumentApplicationParser parser = new ArgumentApplicationParser(
 			IOUtils
 				.toString(
 					SparkCreateMergeRels.class
-						.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
+						.getResourceAsStream(
+							"/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
 
 		parser
-			.parseArgument(new String[] {
-				"-i",
-				testGraphBasePath,
-				"-asi",
-				testActionSetId,
-				"-la",
-				"lookupurl",
-				"-w",
-				testOutputBasePath
-			});
+			.parseArgument(
+				new String[] {
+					"-i",
+					testGraphBasePath,
+					"-asi",
+					testActionSetId,
+					"-la",
+					"lookupurl",
+					"-w",
+					testOutputBasePath
+				});
 
 		new SparkCreateMergeRels(parser, spark).run(isLookUpService);
 
-		final long orgs_mergerel = spark
+		long orgs_mergerel = spark
 			.read()
 			.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
 			.count();
-		final long pubs_mergerel = spark
+		long pubs_mergerel = spark
 			.read()
 			.load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel")
 			.count();
-		final long sw_mergerel = spark
+		long sw_mergerel = spark
 			.read()
 			.load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")
 			.count();
-		final long ds_mergerel = spark
+		long ds_mergerel = spark
 			.read()
 			.load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")
 			.count();
-		final long orp_mergerel = spark
+		long orp_mergerel = spark
 			.read()
 			.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel")
 			.count();
@@ -350,39 +357,40 @@
 	@Order(4)
 	public void createDedupRecordTest() throws Exception {
 
-		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+		ArgumentApplicationParser parser = new ArgumentApplicationParser(
 			IOUtils
 				.toString(
 					SparkCreateDedupRecord.class
-						.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json")));
+						.getResourceAsStream(
+							"/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json")));
 
 		parser
-			.parseArgument(new String[] {
-				"-i",
-				testGraphBasePath,
-				"-asi",
-				testActionSetId,
-				"-la",
-				"lookupurl",
-				"-w",
-				testOutputBasePath
-			});
+			.parseArgument(
+				new String[] {
+					"-i",
+					testGraphBasePath,
+					"-asi",
+					testActionSetId,
+					"-la",
+					"lookupurl",
+					"-w",
+					testOutputBasePath
+				});
 
 		new SparkCreateDedupRecord(parser, spark).run(isLookUpService);
 
-		final long orgs_deduprecord = jsc
+		long orgs_deduprecord = jsc
 			.textFile(testOutputBasePath + "/" + testActionSetId + "/organization_deduprecord")
 			.count();
-		final long pubs_deduprecord = jsc
+		long pubs_deduprecord = jsc
 			.textFile(testOutputBasePath + "/" + testActionSetId + "/publication_deduprecord")
 			.count();
-		final long sw_deduprecord = jsc
+		long sw_deduprecord = jsc
 			.textFile(testOutputBasePath + "/" + testActionSetId + "/software_deduprecord")
 			.count();
-		final long ds_deduprecord = jsc
-			.textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_deduprecord")
"/dataset_deduprecord") - .count(); - final long orp_deduprecord = jsc - .textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_deduprecord") + long ds_deduprecord = jsc.textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_deduprecord").count(); + long orp_deduprecord = jsc + .textFile( + testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_deduprecord") .count(); assertEquals(85, orgs_deduprecord); @@ -396,27 +404,29 @@ public class SparkDedupTest implements Serializable { @Order(5) public void updateEntityTest() throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser( + ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils .toString( SparkUpdateEntity.class - .getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json"))); + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json"))); parser - .parseArgument(new String[] { - "-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath - }); + .parseArgument( + new String[] { + "-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath + }); new SparkUpdateEntity(parser, spark).run(isLookUpService); - final long organizations = jsc.textFile(testDedupGraphBasePath + "/organization").count(); - final long publications = jsc.textFile(testDedupGraphBasePath + "/publication").count(); - final long projects = jsc.textFile(testDedupGraphBasePath + "/project").count(); - final long datasource = jsc.textFile(testDedupGraphBasePath + "/datasource").count(); - final long softwares = jsc.textFile(testDedupGraphBasePath + "/software").count(); - final long dataset = jsc.textFile(testDedupGraphBasePath + "/dataset").count(); - final long otherresearchproduct = jsc.textFile(testDedupGraphBasePath + "/otherresearchproduct").count(); + long organizations = jsc.textFile(testDedupGraphBasePath + "/organization").count(); + long publications = jsc.textFile(testDedupGraphBasePath + "/publication").count(); + long projects = jsc.textFile(testDedupGraphBasePath + "/project").count(); + long datasource = jsc.textFile(testDedupGraphBasePath + "/datasource").count(); + long softwares = jsc.textFile(testDedupGraphBasePath + "/software").count(); + long dataset = jsc.textFile(testDedupGraphBasePath + "/dataset").count(); + long otherresearchproduct = jsc.textFile(testDedupGraphBasePath + "/otherresearchproduct").count(); - final long mergedOrgs = spark + long mergedOrgs = spark .read() .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") .as(Encoders.bean(Relation.class)) @@ -426,7 +436,7 @@ public class SparkDedupTest implements Serializable { .distinct() .count(); - final long mergedPubs = spark + long mergedPubs = spark .read() .load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel") .as(Encoders.bean(Relation.class)) @@ -436,7 +446,7 @@ public class SparkDedupTest implements Serializable { .distinct() .count(); - final long mergedSw = spark + long mergedSw = spark .read() .load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel") .as(Encoders.bean(Relation.class)) @@ -446,7 +456,7 @@ public class SparkDedupTest implements Serializable { .distinct() .count(); - final long mergedDs = spark + long mergedDs = spark .read() .load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel") .as(Encoders.bean(Relation.class)) @@ -456,7 +466,7 @@ public class SparkDedupTest implements Serializable { .distinct() .count(); - final 
+		long mergedOrp = spark
 			.read()
 			.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel")
 			.as(Encoders.bean(Relation.class))
@@ -474,27 +484,27 @@
 		assertEquals(896, publications);
 		assertEquals(389, dataset);
 		assertEquals(517, otherresearchproduct);
 
-		final long deletedOrgs = jsc
+		long deletedOrgs = jsc
 			.textFile(testDedupGraphBasePath + "/organization")
 			.filter(this::isDeletedByInference)
 			.count();
 
-		final long deletedPubs = jsc
+		long deletedPubs = jsc
 			.textFile(testDedupGraphBasePath + "/publication")
 			.filter(this::isDeletedByInference)
 			.count();
 
-		final long deletedSw = jsc
+		long deletedSw = jsc
 			.textFile(testDedupGraphBasePath + "/software")
 			.filter(this::isDeletedByInference)
 			.count();
 
-		final long deletedDs = jsc
+		long deletedDs = jsc
 			.textFile(testDedupGraphBasePath + "/dataset")
 			.filter(this::isDeletedByInference)
 			.count();
 
-		final long deletedOrp = jsc
+		long deletedOrp = jsc
 			.textFile(testDedupGraphBasePath + "/otherresearchproduct")
 			.filter(this::isDeletedByInference)
 			.count();
@@ -510,19 +520,21 @@
 	@Order(6)
 	public void propagateRelationTest() throws Exception {
 
-		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+		ArgumentApplicationParser parser = new ArgumentApplicationParser(
 			IOUtils
 				.toString(
 					SparkPropagateRelation.class
-						.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json")));
+						.getResourceAsStream(
+							"/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json")));
 
 		parser
-			.parseArgument(new String[] {
-				"-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath
-			});
+			.parseArgument(
+				new String[] {
+					"-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath
+				});
 
 		new SparkPropagateRelation(parser, spark).run(isLookUpService);
 
-		final long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
+		long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
 
 		assertEquals(4862, relations);
@@ -536,9 +548,10 @@
 			.select(mergeRels.col("target"))
 			.distinct()
 			.toJavaRDD()
-			.mapToPair((PairFunction) r -> new Tuple2<>(r.getString(0), "d"));
+			.mapToPair(
+				(PairFunction) r -> new Tuple2(r.getString(0), "d"));
 
-		final JavaRDD toCheck = jsc
+		JavaRDD toCheck = jsc
 			.textFile(testDedupGraphBasePath + "/relation")
 			.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json))
 			.join(mergedIds)
 			.map(t -> t._2()._2())
 			.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.target", json), json))
 			.join(mergedIds)
 			.map(t -> t._2()._1());
 
-		final long deletedbyinference = toCheck.filter(this::isDeletedByInference).count();
-		final long updated = toCheck.count();
+		long deletedbyinference = toCheck.filter(this::isDeletedByInference).count();
+		long updated = toCheck.count();
 
 		assertEquals(updated, deletedbyinference);
 	}
@@ -560,8 +573,8 @@
 		testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2);
 	}
 
-	private void testUniqueness(final String path, final int expected_total, final int expected_unique) {
-		final Dataset rel = spark
+	private void testUniqueness(String path, int expected_total, int expected_unique) {
+		Dataset rel = spark
 			.read()
 			.textFile(getClass().getResource(path).getPath())
 			.map(
@@ -578,8 +591,7 @@
 		FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
 	}
 
-	public boolean isDeletedByInference(final String s) {
+	public boolean isDeletedByInference(String s) {
 		return s.contains("\"deletedbyinference\":true");
 	}
-
 }