diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkWhitelistSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkWhitelistSimRels.java index fa7d335700..7d91e47cc9 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkWhitelistSimRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkWhitelistSimRels.java @@ -1,3 +1,4 @@ + package eu.dnetlib.dhp.oa.dedup; import java.io.IOException; @@ -5,10 +6,7 @@ import java.util.Optional; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.sql.Dataset; @@ -21,7 +19,6 @@ import org.slf4j.LoggerFactory; import org.xml.sax.SAXException; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.oa.dedup.model.Block; import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.utils.ISLookupClientFactory; @@ -31,121 +28,127 @@ import eu.dnetlib.pace.config.DedupConfig; import eu.dnetlib.pace.model.MapDocument; import eu.dnetlib.pace.util.MapDocumentUtil; import scala.Tuple2; -import scala.Tuple3; public class SparkWhitelistSimRels extends AbstractSparkAction { - private static final Logger log = LoggerFactory.getLogger(SparkCreateSimRels.class); + private static final Logger log = LoggerFactory.getLogger(SparkCreateSimRels.class); - private static final String WHITELIST_SEPARATOR = "####"; + private static final String WHITELIST_SEPARATOR = "####"; - public SparkWhitelistSimRels(ArgumentApplicationParser parser, SparkSession spark) { - super(parser, spark); - } + public SparkWhitelistSimRels(ArgumentApplicationParser parser, SparkSession spark) { + super(parser, spark); + } - public static void main(String[] args) throws Exception { - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkCreateSimRels.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/whitelistSimRels_parameters.json"))); - parser.parseArgument(args); + public static void main(String[] args) throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCreateSimRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/whitelistSimRels_parameters.json"))); + parser.parseArgument(args); - SparkConf conf = new SparkConf(); - new SparkWhitelistSimRels(parser, getSparkSession(conf)) - .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); - } + SparkConf conf = new SparkConf(); + new SparkWhitelistSimRels(parser, getSparkSession(conf)) + .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); + } - @Override - public void run(ISLookUpService isLookUpService) - throws DocumentException, IOException, ISLookUpException, SAXException { + @Override + public void run(ISLookUpService isLookUpService) + throws DocumentException, IOException, ISLookUpException, SAXException { - // read oozie parameters - final String graphBasePath = parser.get("graphBasePath"); - final String isLookUpUrl = parser.get("isLookUpUrl"); - final String actionSetId = parser.get("actionSetId"); - final String workingPath = parser.get("workingPath"); - final int numPartitions = Optional - .ofNullable(parser.get("numPartitions")) - .map(Integer::valueOf) - .orElse(NUM_PARTITIONS); - final String whiteListPath = parser.get("whiteListPath"); + // read oozie parameters + final String graphBasePath = parser.get("graphBasePath"); + final String isLookUpUrl = parser.get("isLookUpUrl"); + final String actionSetId = parser.get("actionSetId"); + final String workingPath = parser.get("workingPath"); + final int numPartitions = Optional + .ofNullable(parser.get("numPartitions")) + .map(Integer::valueOf) + .orElse(NUM_PARTITIONS); + final String whiteListPath = parser.get("whiteListPath"); - log.info("numPartitions: '{}'", numPartitions); - log.info("graphBasePath: '{}'", graphBasePath); - log.info("isLookUpUrl: '{}'", isLookUpUrl); - log.info("actionSetId: '{}'", actionSetId); - log.info("workingPath: '{}'", workingPath); - log.info("whiteListPath: '{}'", whiteListPath); + log.info("numPartitions: '{}'", numPartitions); + log.info("graphBasePath: '{}'", graphBasePath); + log.info("isLookUpUrl: '{}'", isLookUpUrl); + log.info("actionSetId: '{}'", actionSetId); + log.info("workingPath: '{}'", workingPath); + log.info("whiteListPath: '{}'", whiteListPath); - JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - //file format: source####target - Dataset> whiteListRels = spark.createDataset(sc - .textFile(whiteListPath) - //check if the line is in the correct format: id1####id2 - .filter(s -> s.contains(WHITELIST_SEPARATOR) && s.split(WHITELIST_SEPARATOR).length == 2) - .map(s -> new Tuple2<>(s.split(WHITELIST_SEPARATOR)[0], s.split(WHITELIST_SEPARATOR)[1])) - .rdd(), - Encoders.tuple(Encoders.STRING(), Encoders.STRING())); + // file format: source####target + Dataset> whiteListRels = spark + .createDataset( + sc + .textFile(whiteListPath) + // check if the line is in the correct format: id1####id2 + .filter(s -> s.contains(WHITELIST_SEPARATOR) && s.split(WHITELIST_SEPARATOR).length == 2) + .map(s -> new Tuple2<>(s.split(WHITELIST_SEPARATOR)[0], s.split(WHITELIST_SEPARATOR)[1])) + .rdd(), + Encoders.tuple(Encoders.STRING(), Encoders.STRING())); - // for each dedup configuration - for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) { + // for each dedup configuration + for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) { - final String entity = dedupConf.getWf().getEntityType(); - final String subEntity = dedupConf.getWf().getSubEntityValue(); - log.info("Adding whitelist simrels for: '{}'", subEntity); + final String entity = dedupConf.getWf().getEntityType(); + final String subEntity = dedupConf.getWf().getSubEntityValue(); + log.info("Adding whitelist simrels for: '{}'", subEntity); - final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity); + final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity); - Dataset> entities = spark.createDataset(sc - .textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)) - .repartition(numPartitions) - .mapToPair( - (PairFunction) s -> { - MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s); - return new Tuple2<>(d.getIdentifier(), "present"); - }) - .rdd(), - Encoders.tuple(Encoders.STRING(), Encoders.STRING())); + Dataset> entities = spark + .createDataset( + sc + .textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)) + .repartition(numPartitions) + .mapToPair( + (PairFunction) s -> { + MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s); + return new Tuple2<>(d.getIdentifier(), "present"); + }) + .rdd(), + Encoders.tuple(Encoders.STRING(), Encoders.STRING())); - Dataset> whiteListRels1 = whiteListRels - .joinWith(entities, whiteListRels.col("_1").equalTo(entities.col("_1")), "inner") - .map((MapFunction, Tuple2>, Tuple2>) Tuple2::_1, Encoders.tuple(Encoders.STRING(), Encoders.STRING())); + Dataset> whiteListRels1 = whiteListRels + .joinWith(entities, whiteListRels.col("_1").equalTo(entities.col("_1")), "inner") + .map( + (MapFunction, Tuple2>, Tuple2>) Tuple2::_1, + Encoders.tuple(Encoders.STRING(), Encoders.STRING())); - Dataset> whiteListRels2 = whiteListRels1 - .joinWith(entities, whiteListRels1.col("_2").equalTo(entities.col("_1")), "inner") - .map((MapFunction, Tuple2>, Tuple2>) Tuple2::_1, Encoders.tuple(Encoders.STRING(), Encoders.STRING())); + Dataset> whiteListRels2 = whiteListRels1 + .joinWith(entities, whiteListRels1.col("_2").equalTo(entities.col("_1")), "inner") + .map( + (MapFunction, Tuple2>, Tuple2>) Tuple2::_1, + Encoders.tuple(Encoders.STRING(), Encoders.STRING())); - Dataset whiteListSimRels = whiteListRels2 - .map((MapFunction, Relation>) - r -> createSimRel(r._1(), r._2(), entity), - Encoders.bean(Relation.class) - ); + Dataset whiteListSimRels = whiteListRels2 + .map( + (MapFunction, Relation>) r -> createSimRel(r._1(), r._2(), entity), + Encoders.bean(Relation.class)); - saveParquet(whiteListSimRels, outputPath, SaveMode.Append); - } - } + saveParquet(whiteListSimRels, outputPath, SaveMode.Append); + } + } - private Relation createSimRel(String source, String target, String entity) { - final Relation r = new Relation(); - r.setSource(source); - r.setTarget(target); - r.setSubRelType("dedupSimilarity"); - r.setRelClass("isSimilarTo"); - r.setDataInfo(new DataInfo()); + private Relation createSimRel(String source, String target, String entity) { + final Relation r = new Relation(); + r.setSource(source); + r.setTarget(target); + r.setSubRelType("dedupSimilarity"); + r.setRelClass("isSimilarTo"); + r.setDataInfo(new DataInfo()); - switch (entity) { - case "result": - r.setRelType("resultResult"); - break; - case "organization": - r.setRelType("organizationOrganization"); - break; - default: - throw new IllegalArgumentException("unmanaged entity type: " + entity); - } - return r; - } + switch (entity) { + case "result": + r.setRelType("resultResult"); + break; + case "organization": + r.setRelType("organizationOrganization"); + break; + default: + throw new IllegalArgumentException("unmanaged entity type: " + entity); + } + return r; + } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/UpdateOpenorgsJob.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/UpdateOpenorgsJob.java index f23c54e2e0..d094fb72b7 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/UpdateOpenorgsJob.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/UpdateOpenorgsJob.java @@ -1,7 +1,8 @@ + package eu.dnetlib.dhp.oa.dedup; -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import java.util.concurrent.TimeUnit; + import org.apache.commons.io.IOUtils; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; @@ -10,16 +11,21 @@ import org.apache.http.impl.client.HttpClients; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.TimeUnit; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; public class UpdateOpenorgsJob { - private static final Logger log = LoggerFactory.getLogger(UpdateOpenorgsJob.class); + private static final Logger log = LoggerFactory.getLogger(UpdateOpenorgsJob.class); public static void main(String[] args) throws Exception { - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/updateOpenorgsJob_parameters.json"))); - parser.parseArgument(args); + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCreateSimRels.class + .getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/updateOpenorgsJob_parameters.json"))); + parser.parseArgument(args); final String apiUrl = parser.get("apiUrl"); final int delay = Integer.parseInt(parser.get("delay")); @@ -28,17 +34,17 @@ public class UpdateOpenorgsJob { log.info("delay: '{}'", delay); APIResponse res = httpCall(apiUrl); - while(res!=null && res.getStatus().equals(ImportStatus.RUNNING)){ + while (res != null && res.getStatus().equals(ImportStatus.RUNNING)) { TimeUnit.MINUTES.sleep(delay); res = httpCall(apiUrl + "/status"); } - if (res==null) { + if (res == null) { log.error("Openorgs Update FAILED: No response"); throw new RuntimeException("Openorgs Update FAILED: No response"); } - if (res.getStatus()==null || !res.getStatus().equals(ImportStatus.SUCCESS)) { + if (res.getStatus() == null || !res.getStatus().equals(ImportStatus.SUCCESS)) { log.error("Openorgs Update FAILED: '{}' - '{}'", res.getStatus(), res.getMessage()); throw new RuntimeException(res.getMessage()); } @@ -107,9 +113,5 @@ class APIResponse { } enum ImportStatus { - SUCCESS, - FAILED, - RUNNING, - NOT_LAUNCHED, - NOT_YET_STARTED + SUCCESS, FAILED, RUNNING, NOT_LAUNCHED, NOT_YET_STARTED } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml index c7c6c92571..6947019e8b 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml @@ -267,6 +267,12 @@ + + + oozie.launcher.mapreduce.user.classpath.first + true + + eu.dnetlib.dhp.oa.dedup.UpdateOpenorgsJob --apiUrl${apiUrl} --delay5 diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java index fa03f93a6b..549988767f 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java @@ -48,634 +48,640 @@ import scala.Tuple2; @TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class SparkDedupTest implements Serializable { - @Mock(serializable = true) - ISLookUpService isLookUpService; - - private static SparkSession spark; - private static JavaSparkContext jsc; - - private static String testGraphBasePath; - private static String testOutputBasePath; - private static String testDedupGraphBasePath; - private static final String testActionSetId = "test-orchestrator"; - private static String whitelistPath; - private static List whiteList; - - private static String WHITELIST_SEPARATOR = "####"; - - @BeforeAll - public static void cleanUp() throws IOException, URISyntaxException { - - testGraphBasePath = Paths - .get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/entities").toURI()) - .toFile() - .getAbsolutePath(); - testOutputBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-") - .toAbsolutePath() - .toString(); - - testDedupGraphBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-") - .toAbsolutePath() - .toString(); - - whitelistPath = Paths - .get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/whitelist.simrels.txt").toURI()) - .toFile() - .getAbsolutePath(); - whiteList = IOUtils.readLines(new FileReader(whitelistPath)); - - FileUtils.deleteDirectory(new File(testOutputBasePath)); - FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); - - final SparkConf conf = new SparkConf(); - conf.set("spark.sql.shuffle.partitions", "200"); - spark = SparkSession - .builder() - .appName(SparkDedupTest.class.getSimpleName()) - .master("local[*]") - .config(conf) - .getOrCreate(); - - jsc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - - } - - @BeforeEach - public void setUp() throws IOException, ISLookUpException { - - lenient() - .when(isLookUpService.getResourceProfileByQuery(Mockito.contains(testActionSetId))) - .thenReturn( - IOUtils - .toString( - SparkDedupTest.class - .getResourceAsStream( - "/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml"))); - - lenient() - .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("organization"))) - .thenReturn( - IOUtils - .toString( - SparkDedupTest.class - .getResourceAsStream( - "/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json"))); - - lenient() - .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("publication"))) - .thenReturn( - IOUtils - .toString( - SparkDedupTest.class - .getResourceAsStream( - "/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json"))); - - lenient() - .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("software"))) - .thenReturn( - IOUtils - .toString( - SparkDedupTest.class - .getResourceAsStream( - "/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json"))); - - lenient() - .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("dataset"))) - .thenReturn( - IOUtils - .toString( - SparkDedupTest.class - .getResourceAsStream( - "/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json"))); - - lenient() - .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("otherresearchproduct"))) - .thenReturn( - IOUtils - .toString( - SparkDedupTest.class - .getResourceAsStream( - "/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json"))); - } - - @Test - @Order(1) - void createSimRelsTest() throws Exception { - - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkCreateSimRels.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json"))); - - parser - .parseArgument( - new String[]{ - "-i", testGraphBasePath, - "-asi", testActionSetId, - "-la", "lookupurl", - "-w", testOutputBasePath, - "-np", "50" - }); - - new SparkCreateSimRels(parser, spark).run(isLookUpService); - - long orgs_simrel = spark - .read() - .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization")) - .count(); - - long pubs_simrel = spark - .read() - .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "publication")) - .count(); - - long sw_simrel = spark - .read() - .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "software")) - .count(); - - long ds_simrel = spark - .read() - .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "dataset")) - .count(); - - long orp_simrel = spark - .read() - .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "otherresearchproduct")) - .count(); - - assertEquals(3082, orgs_simrel); - assertEquals(7036, pubs_simrel); - assertEquals(336, sw_simrel); - assertEquals(442, ds_simrel); - assertEquals(6750, orp_simrel); - } - - @Test - @Order(2) - void whitelistSimRelsTest() throws Exception { - - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkWhitelistSimRels.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/whitelistSimRels_parameters.json"))); - - parser - .parseArgument( - new String[]{ - "-i", testGraphBasePath, - "-asi", testActionSetId, - "-la", "lookupurl", - "-w", testOutputBasePath, - "-np", "50", - "-wl", whitelistPath - }); - - new SparkWhitelistSimRels(parser, spark).run(isLookUpService); - - long orgs_simrel = spark - .read() - .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization")) - .count(); - - long pubs_simrel = spark - .read() - .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "publication")) - .count(); - - long ds_simrel = spark - .read() - .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "dataset")) - .count(); - - long orp_simrel = spark - .read() - .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "otherresearchproduct")) - .count(); - - //entities simrels supposed to be equal to the number of previous step (no rels in whitelist) - assertEquals(3082, orgs_simrel); - assertEquals(7036, pubs_simrel); - assertEquals(442, ds_simrel); - assertEquals(6750, orp_simrel); - - //entities simrels to be different from the number of previous step (new simrels in the whitelist) - Dataset sw_simrel = spark - .read() - .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "software")); - - //check if the first relation in the whitelist exists - assertTrue(sw_simrel - .as(Encoders.bean(Relation.class)) - .toJavaRDD() - .filter(rel -> - rel.getSource().equalsIgnoreCase(whiteList.get(0).split(WHITELIST_SEPARATOR)[0]) && rel.getTarget().equalsIgnoreCase(whiteList.get(0).split(WHITELIST_SEPARATOR)[1])).count() > 0); - //check if the second relation in the whitelist exists - assertTrue(sw_simrel - .as(Encoders.bean(Relation.class)) - .toJavaRDD() - .filter(rel -> - rel.getSource().equalsIgnoreCase(whiteList.get(1).split(WHITELIST_SEPARATOR)[0]) && rel.getTarget().equalsIgnoreCase(whiteList.get(1).split(WHITELIST_SEPARATOR)[1])).count() > 0); - - assertEquals(338, sw_simrel.count()); - - } - - @Test - @Order(3) - void cutMergeRelsTest() throws Exception { - - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkCreateMergeRels.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"))); - - parser - .parseArgument( - new String[]{ - "-i", - testGraphBasePath, - "-asi", - testActionSetId, - "-la", - "lookupurl", - "-w", - testOutputBasePath, - "-cc", - "3" - }); - - new SparkCreateMergeRels(parser, spark).run(isLookUpService); - - long orgs_mergerel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") - .as(Encoders.bean(Relation.class)) - .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges")) - .groupBy("source") - .agg(count("target").alias("cnt")) - .select("source", "cnt") - .where("cnt > 3") - .count(); - - long pubs_mergerel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel") - .as(Encoders.bean(Relation.class)) - .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges")) - .groupBy("source") - .agg(count("target").alias("cnt")) - .select("source", "cnt") - .where("cnt > 3") - .count(); - long sw_mergerel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel") - .as(Encoders.bean(Relation.class)) - .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges")) - .groupBy("source") - .agg(count("target").alias("cnt")) - .select("source", "cnt") - .where("cnt > 3") - .count(); - - long ds_mergerel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel") - .as(Encoders.bean(Relation.class)) - .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges")) - .groupBy("source") - .agg(count("target").alias("cnt")) - .select("source", "cnt") - .where("cnt > 3") - .count(); - - long orp_mergerel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel") - .as(Encoders.bean(Relation.class)) - .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges")) - .groupBy("source") - .agg(count("target").alias("cnt")) - .select("source", "cnt") - .where("cnt > 3") - .count(); - - assertEquals(0, orgs_mergerel); - assertEquals(0, pubs_mergerel); - assertEquals(0, sw_mergerel); - assertEquals(0, ds_mergerel); - assertEquals(0, orp_mergerel); - - FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")); - FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel")); - FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")); - FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")); - FileUtils - .deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel")); - } - - @Test - @Order(4) - void createMergeRelsTest() throws Exception { - - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkCreateMergeRels.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"))); - - parser - .parseArgument( - new String[]{ - "-i", - testGraphBasePath, - "-asi", - testActionSetId, - "-la", - "lookupurl", - "-w", - testOutputBasePath - }); - - new SparkCreateMergeRels(parser, spark).run(isLookUpService); - - long orgs_mergerel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") - .count(); - long pubs_mergerel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel") - .count(); - long sw_mergerel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel") - .count(); - long ds_mergerel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel") - .count(); - - long orp_mergerel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel") - .count(); - - assertEquals(1272, orgs_mergerel); - assertEquals(1438, pubs_mergerel); - assertEquals(286, sw_mergerel); - assertEquals(472, ds_mergerel); - assertEquals(718, orp_mergerel); - - } - - @Test - @Order(5) - void createDedupRecordTest() throws Exception { - - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkCreateDedupRecord.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json"))); - parser - .parseArgument( - new String[]{ - "-i", - testGraphBasePath, - "-asi", - testActionSetId, - "-la", - "lookupurl", - "-w", - testOutputBasePath - }); - - new SparkCreateDedupRecord(parser, spark).run(isLookUpService); - - long orgs_deduprecord = jsc - .textFile(testOutputBasePath + "/" + testActionSetId + "/organization_deduprecord") - .count(); - long pubs_deduprecord = jsc - .textFile(testOutputBasePath + "/" + testActionSetId + "/publication_deduprecord") - .count(); - long sw_deduprecord = jsc - .textFile(testOutputBasePath + "/" + testActionSetId + "/software_deduprecord") - .count(); - long ds_deduprecord = jsc.textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_deduprecord").count(); - long orp_deduprecord = jsc - .textFile( - testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_deduprecord") - .count(); - - assertEquals(85, orgs_deduprecord); - assertEquals(65, pubs_deduprecord); - assertEquals(49, sw_deduprecord); - assertEquals(97, ds_deduprecord); - assertEquals(89, orp_deduprecord); - } - - @Test - @Order(6) - void updateEntityTest() throws Exception { - - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkUpdateEntity.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json"))); - parser - .parseArgument( - new String[]{ - "-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath - }); - - new SparkUpdateEntity(parser, spark).run(isLookUpService); - - long organizations = jsc.textFile(testDedupGraphBasePath + "/organization").count(); - long publications = jsc.textFile(testDedupGraphBasePath + "/publication").count(); - long projects = jsc.textFile(testDedupGraphBasePath + "/project").count(); - long datasource = jsc.textFile(testDedupGraphBasePath + "/datasource").count(); - long softwares = jsc.textFile(testDedupGraphBasePath + "/software").count(); - long dataset = jsc.textFile(testDedupGraphBasePath + "/dataset").count(); - long otherresearchproduct = jsc.textFile(testDedupGraphBasePath + "/otherresearchproduct").count(); - - long mergedOrgs = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") - .as(Encoders.bean(Relation.class)) - .where("relClass=='merges'") - .javaRDD() - .map(Relation::getTarget) - .distinct() - .count(); - - long mergedPubs = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel") - .as(Encoders.bean(Relation.class)) - .where("relClass=='merges'") - .javaRDD() - .map(Relation::getTarget) - .distinct() - .count(); - - long mergedSw = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel") - .as(Encoders.bean(Relation.class)) - .where("relClass=='merges'") - .javaRDD() - .map(Relation::getTarget) - .distinct() - .count(); - - long mergedDs = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel") - .as(Encoders.bean(Relation.class)) - .where("relClass=='merges'") - .javaRDD() - .map(Relation::getTarget) - .distinct() - .count(); - - long mergedOrp = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel") - .as(Encoders.bean(Relation.class)) - .where("relClass=='merges'") - .javaRDD() - .map(Relation::getTarget) - .distinct() - .count(); - - assertEquals(896, publications); - assertEquals(838, organizations); - assertEquals(100, projects); - assertEquals(100, datasource); - assertEquals(198, softwares); - assertEquals(389, dataset); - assertEquals(517, otherresearchproduct); - - long deletedOrgs = jsc - .textFile(testDedupGraphBasePath + "/organization") - .filter(this::isDeletedByInference) - .count(); - - long deletedPubs = jsc - .textFile(testDedupGraphBasePath + "/publication") - .filter(this::isDeletedByInference) - .count(); - - long deletedSw = jsc - .textFile(testDedupGraphBasePath + "/software") - .filter(this::isDeletedByInference) - .count(); - - long deletedDs = jsc - .textFile(testDedupGraphBasePath + "/dataset") - .filter(this::isDeletedByInference) - .count(); - - long deletedOrp = jsc - .textFile(testDedupGraphBasePath + "/otherresearchproduct") - .filter(this::isDeletedByInference) - .count(); - - assertEquals(mergedOrgs, deletedOrgs); - assertEquals(mergedPubs, deletedPubs); - assertEquals(mergedSw, deletedSw); - assertEquals(mergedDs, deletedDs); - assertEquals(mergedOrp, deletedOrp); - } - - @Test - @Order(7) - void propagateRelationTest() throws Exception { - - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkPropagateRelation.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json"))); - parser - .parseArgument( - new String[]{ - "-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath - }); - - new SparkPropagateRelation(parser, spark).run(isLookUpService); - - long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count(); - - assertEquals(4860, relations); - - // check deletedbyinference - final Dataset mergeRels = spark - .read() - .load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*")) - .as(Encoders.bean(Relation.class)); - final JavaPairRDD mergedIds = mergeRels - .where("relClass == 'merges'") - .select(mergeRels.col("target")) - .distinct() - .toJavaRDD() - .mapToPair( - (PairFunction) r -> new Tuple2(r.getString(0), "d")); - - JavaRDD toCheck = jsc - .textFile(testDedupGraphBasePath + "/relation") - .mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json)) - .join(mergedIds) - .map(t -> t._2()._1()) - .mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.target", json), json)) - .join(mergedIds) - .map(t -> t._2()._1()); - - long deletedbyinference = toCheck.filter(this::isDeletedByInference).count(); - long updated = toCheck.count(); - - assertEquals(updated, deletedbyinference); - } - - @Test - @Order(8) - void testRelations() throws Exception { - testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10); - testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2); - } - - private void testUniqueness(String path, int expected_total, int expected_unique) { - Dataset rel = spark - .read() - .textFile(getClass().getResource(path).getPath()) - .map( - (MapFunction) s -> new ObjectMapper().readValue(s, Relation.class), - Encoders.bean(Relation.class)); - - assertEquals(expected_total, rel.count()); - assertEquals(expected_unique, rel.distinct().count()); - } - - @AfterAll - public static void finalCleanUp() throws IOException { - FileUtils.deleteDirectory(new File(testOutputBasePath)); - FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); - } - - public boolean isDeletedByInference(String s) { - return s.contains("\"deletedbyinference\":true"); - } + @Mock(serializable = true) + ISLookUpService isLookUpService; + + private static SparkSession spark; + private static JavaSparkContext jsc; + + private static String testGraphBasePath; + private static String testOutputBasePath; + private static String testDedupGraphBasePath; + private static final String testActionSetId = "test-orchestrator"; + private static String whitelistPath; + private static List whiteList; + + private static String WHITELIST_SEPARATOR = "####"; + + @BeforeAll + public static void cleanUp() throws IOException, URISyntaxException { + + testGraphBasePath = Paths + .get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/entities").toURI()) + .toFile() + .getAbsolutePath(); + testOutputBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-") + .toAbsolutePath() + .toString(); + + testDedupGraphBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-") + .toAbsolutePath() + .toString(); + + whitelistPath = Paths + .get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/whitelist.simrels.txt").toURI()) + .toFile() + .getAbsolutePath(); + whiteList = IOUtils.readLines(new FileReader(whitelistPath)); + + FileUtils.deleteDirectory(new File(testOutputBasePath)); + FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); + + final SparkConf conf = new SparkConf(); + conf.set("spark.sql.shuffle.partitions", "200"); + spark = SparkSession + .builder() + .appName(SparkDedupTest.class.getSimpleName()) + .master("local[*]") + .config(conf) + .getOrCreate(); + + jsc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + } + + @BeforeEach + public void setUp() throws IOException, ISLookUpException { + + lenient() + .when(isLookUpService.getResourceProfileByQuery(Mockito.contains(testActionSetId))) + .thenReturn( + IOUtils + .toString( + SparkDedupTest.class + .getResourceAsStream( + "/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml"))); + + lenient() + .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("organization"))) + .thenReturn( + IOUtils + .toString( + SparkDedupTest.class + .getResourceAsStream( + "/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json"))); + + lenient() + .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("publication"))) + .thenReturn( + IOUtils + .toString( + SparkDedupTest.class + .getResourceAsStream( + "/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json"))); + + lenient() + .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("software"))) + .thenReturn( + IOUtils + .toString( + SparkDedupTest.class + .getResourceAsStream( + "/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json"))); + + lenient() + .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("dataset"))) + .thenReturn( + IOUtils + .toString( + SparkDedupTest.class + .getResourceAsStream( + "/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json"))); + + lenient() + .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("otherresearchproduct"))) + .thenReturn( + IOUtils + .toString( + SparkDedupTest.class + .getResourceAsStream( + "/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json"))); + } + + @Test + @Order(1) + void createSimRelsTest() throws Exception { + + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCreateSimRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json"))); + + parser + .parseArgument( + new String[] { + "-i", testGraphBasePath, + "-asi", testActionSetId, + "-la", "lookupurl", + "-w", testOutputBasePath, + "-np", "50" + }); + + new SparkCreateSimRels(parser, spark).run(isLookUpService); + + long orgs_simrel = spark + .read() + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization")) + .count(); + + long pubs_simrel = spark + .read() + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "publication")) + .count(); + + long sw_simrel = spark + .read() + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "software")) + .count(); + + long ds_simrel = spark + .read() + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "dataset")) + .count(); + + long orp_simrel = spark + .read() + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "otherresearchproduct")) + .count(); + + assertEquals(3082, orgs_simrel); + assertEquals(7036, pubs_simrel); + assertEquals(336, sw_simrel); + assertEquals(442, ds_simrel); + assertEquals(6750, orp_simrel); + } + + @Test + @Order(2) + void whitelistSimRelsTest() throws Exception { + + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkWhitelistSimRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/whitelistSimRels_parameters.json"))); + + parser + .parseArgument( + new String[] { + "-i", testGraphBasePath, + "-asi", testActionSetId, + "-la", "lookupurl", + "-w", testOutputBasePath, + "-np", "50", + "-wl", whitelistPath + }); + + new SparkWhitelistSimRels(parser, spark).run(isLookUpService); + + long orgs_simrel = spark + .read() + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization")) + .count(); + + long pubs_simrel = spark + .read() + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "publication")) + .count(); + + long ds_simrel = spark + .read() + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "dataset")) + .count(); + + long orp_simrel = spark + .read() + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "otherresearchproduct")) + .count(); + + // entities simrels supposed to be equal to the number of previous step (no rels in whitelist) + assertEquals(3082, orgs_simrel); + assertEquals(7036, pubs_simrel); + assertEquals(442, ds_simrel); + assertEquals(6750, orp_simrel); + + // entities simrels to be different from the number of previous step (new simrels in the whitelist) + Dataset sw_simrel = spark + .read() + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "software")); + + // check if the first relation in the whitelist exists + assertTrue( + sw_simrel + .as(Encoders.bean(Relation.class)) + .toJavaRDD() + .filter( + rel -> rel.getSource().equalsIgnoreCase(whiteList.get(0).split(WHITELIST_SEPARATOR)[0]) + && rel.getTarget().equalsIgnoreCase(whiteList.get(0).split(WHITELIST_SEPARATOR)[1])) + .count() > 0); + // check if the second relation in the whitelist exists + assertTrue( + sw_simrel + .as(Encoders.bean(Relation.class)) + .toJavaRDD() + .filter( + rel -> rel.getSource().equalsIgnoreCase(whiteList.get(1).split(WHITELIST_SEPARATOR)[0]) + && rel.getTarget().equalsIgnoreCase(whiteList.get(1).split(WHITELIST_SEPARATOR)[1])) + .count() > 0); + + assertEquals(338, sw_simrel.count()); + + } + + @Test + @Order(3) + void cutMergeRelsTest() throws Exception { + + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCreateMergeRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"))); + + parser + .parseArgument( + new String[] { + "-i", + testGraphBasePath, + "-asi", + testActionSetId, + "-la", + "lookupurl", + "-w", + testOutputBasePath, + "-cc", + "3" + }); + + new SparkCreateMergeRels(parser, spark).run(isLookUpService); + + long orgs_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") + .as(Encoders.bean(Relation.class)) + .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges")) + .groupBy("source") + .agg(count("target").alias("cnt")) + .select("source", "cnt") + .where("cnt > 3") + .count(); + + long pubs_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel") + .as(Encoders.bean(Relation.class)) + .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges")) + .groupBy("source") + .agg(count("target").alias("cnt")) + .select("source", "cnt") + .where("cnt > 3") + .count(); + long sw_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel") + .as(Encoders.bean(Relation.class)) + .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges")) + .groupBy("source") + .agg(count("target").alias("cnt")) + .select("source", "cnt") + .where("cnt > 3") + .count(); + + long ds_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel") + .as(Encoders.bean(Relation.class)) + .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges")) + .groupBy("source") + .agg(count("target").alias("cnt")) + .select("source", "cnt") + .where("cnt > 3") + .count(); + + long orp_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel") + .as(Encoders.bean(Relation.class)) + .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges")) + .groupBy("source") + .agg(count("target").alias("cnt")) + .select("source", "cnt") + .where("cnt > 3") + .count(); + + assertEquals(0, orgs_mergerel); + assertEquals(0, pubs_mergerel); + assertEquals(0, sw_mergerel); + assertEquals(0, ds_mergerel); + assertEquals(0, orp_mergerel); + + FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")); + FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel")); + FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")); + FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")); + FileUtils + .deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel")); + } + + @Test + @Order(4) + void createMergeRelsTest() throws Exception { + + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCreateMergeRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"))); + + parser + .parseArgument( + new String[] { + "-i", + testGraphBasePath, + "-asi", + testActionSetId, + "-la", + "lookupurl", + "-w", + testOutputBasePath + }); + + new SparkCreateMergeRels(parser, spark).run(isLookUpService); + + long orgs_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") + .count(); + long pubs_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel") + .count(); + long sw_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel") + .count(); + long ds_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel") + .count(); + + long orp_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel") + .count(); + + assertEquals(1272, orgs_mergerel); + assertEquals(1438, pubs_mergerel); + assertEquals(286, sw_mergerel); + assertEquals(472, ds_mergerel); + assertEquals(718, orp_mergerel); + + } + + @Test + @Order(5) + void createDedupRecordTest() throws Exception { + + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCreateDedupRecord.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json"))); + parser + .parseArgument( + new String[] { + "-i", + testGraphBasePath, + "-asi", + testActionSetId, + "-la", + "lookupurl", + "-w", + testOutputBasePath + }); + + new SparkCreateDedupRecord(parser, spark).run(isLookUpService); + + long orgs_deduprecord = jsc + .textFile(testOutputBasePath + "/" + testActionSetId + "/organization_deduprecord") + .count(); + long pubs_deduprecord = jsc + .textFile(testOutputBasePath + "/" + testActionSetId + "/publication_deduprecord") + .count(); + long sw_deduprecord = jsc + .textFile(testOutputBasePath + "/" + testActionSetId + "/software_deduprecord") + .count(); + long ds_deduprecord = jsc.textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_deduprecord").count(); + long orp_deduprecord = jsc + .textFile( + testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_deduprecord") + .count(); + + assertEquals(85, orgs_deduprecord); + assertEquals(65, pubs_deduprecord); + assertEquals(49, sw_deduprecord); + assertEquals(97, ds_deduprecord); + assertEquals(89, orp_deduprecord); + } + + @Test + @Order(6) + void updateEntityTest() throws Exception { + + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkUpdateEntity.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json"))); + parser + .parseArgument( + new String[] { + "-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath + }); + + new SparkUpdateEntity(parser, spark).run(isLookUpService); + + long organizations = jsc.textFile(testDedupGraphBasePath + "/organization").count(); + long publications = jsc.textFile(testDedupGraphBasePath + "/publication").count(); + long projects = jsc.textFile(testDedupGraphBasePath + "/project").count(); + long datasource = jsc.textFile(testDedupGraphBasePath + "/datasource").count(); + long softwares = jsc.textFile(testDedupGraphBasePath + "/software").count(); + long dataset = jsc.textFile(testDedupGraphBasePath + "/dataset").count(); + long otherresearchproduct = jsc.textFile(testDedupGraphBasePath + "/otherresearchproduct").count(); + + long mergedOrgs = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") + .as(Encoders.bean(Relation.class)) + .where("relClass=='merges'") + .javaRDD() + .map(Relation::getTarget) + .distinct() + .count(); + + long mergedPubs = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel") + .as(Encoders.bean(Relation.class)) + .where("relClass=='merges'") + .javaRDD() + .map(Relation::getTarget) + .distinct() + .count(); + + long mergedSw = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel") + .as(Encoders.bean(Relation.class)) + .where("relClass=='merges'") + .javaRDD() + .map(Relation::getTarget) + .distinct() + .count(); + + long mergedDs = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel") + .as(Encoders.bean(Relation.class)) + .where("relClass=='merges'") + .javaRDD() + .map(Relation::getTarget) + .distinct() + .count(); + + long mergedOrp = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel") + .as(Encoders.bean(Relation.class)) + .where("relClass=='merges'") + .javaRDD() + .map(Relation::getTarget) + .distinct() + .count(); + + assertEquals(896, publications); + assertEquals(838, organizations); + assertEquals(100, projects); + assertEquals(100, datasource); + assertEquals(198, softwares); + assertEquals(389, dataset); + assertEquals(517, otherresearchproduct); + + long deletedOrgs = jsc + .textFile(testDedupGraphBasePath + "/organization") + .filter(this::isDeletedByInference) + .count(); + + long deletedPubs = jsc + .textFile(testDedupGraphBasePath + "/publication") + .filter(this::isDeletedByInference) + .count(); + + long deletedSw = jsc + .textFile(testDedupGraphBasePath + "/software") + .filter(this::isDeletedByInference) + .count(); + + long deletedDs = jsc + .textFile(testDedupGraphBasePath + "/dataset") + .filter(this::isDeletedByInference) + .count(); + + long deletedOrp = jsc + .textFile(testDedupGraphBasePath + "/otherresearchproduct") + .filter(this::isDeletedByInference) + .count(); + + assertEquals(mergedOrgs, deletedOrgs); + assertEquals(mergedPubs, deletedPubs); + assertEquals(mergedSw, deletedSw); + assertEquals(mergedDs, deletedDs); + assertEquals(mergedOrp, deletedOrp); + } + + @Test + @Order(7) + void propagateRelationTest() throws Exception { + + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkPropagateRelation.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json"))); + parser + .parseArgument( + new String[] { + "-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath + }); + + new SparkPropagateRelation(parser, spark).run(isLookUpService); + + long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count(); + + assertEquals(4860, relations); + + // check deletedbyinference + final Dataset mergeRels = spark + .read() + .load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*")) + .as(Encoders.bean(Relation.class)); + final JavaPairRDD mergedIds = mergeRels + .where("relClass == 'merges'") + .select(mergeRels.col("target")) + .distinct() + .toJavaRDD() + .mapToPair( + (PairFunction) r -> new Tuple2(r.getString(0), "d")); + + JavaRDD toCheck = jsc + .textFile(testDedupGraphBasePath + "/relation") + .mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json)) + .join(mergedIds) + .map(t -> t._2()._1()) + .mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.target", json), json)) + .join(mergedIds) + .map(t -> t._2()._1()); + + long deletedbyinference = toCheck.filter(this::isDeletedByInference).count(); + long updated = toCheck.count(); + + assertEquals(updated, deletedbyinference); + } + + @Test + @Order(8) + void testRelations() throws Exception { + testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10); + testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2); + } + + private void testUniqueness(String path, int expected_total, int expected_unique) { + Dataset rel = spark + .read() + .textFile(getClass().getResource(path).getPath()) + .map( + (MapFunction) s -> new ObjectMapper().readValue(s, Relation.class), + Encoders.bean(Relation.class)); + + assertEquals(expected_total, rel.count()); + assertEquals(expected_unique, rel.distinct().count()); + } + + @AfterAll + public static void finalCleanUp() throws IOException { + FileUtils.deleteDirectory(new File(testOutputBasePath)); + FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); + } + + public boolean isDeletedByInference(String s) { + return s.contains("\"deletedbyinference\":true"); + } }