diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java index 58875eec07..8549839fca 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java @@ -24,7 +24,7 @@ abstract class AbstractSparkAction implements Serializable { public ArgumentApplicationParser parser; //parameters for the spark action public SparkSession spark; //the spark session - public AbstractSparkAction(ArgumentApplicationParser parser, SparkSession spark) throws Exception { + public AbstractSparkAction(ArgumentApplicationParser parser, SparkSession spark) { this.parser = parser; this.spark = spark; diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateConnectedComponent.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java similarity index 91% rename from dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateConnectedComponent.java rename to dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java index d0ef580342..1425422f80 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateConnectedComponent.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java @@ -16,7 +16,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.graphx.Edge; import org.apache.spark.rdd.RDD; @@ -31,11 +30,11 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -public class SparkCreateConnectedComponent extends AbstractSparkAction { +public class SparkCreateMergeRels extends AbstractSparkAction { - private static final Log log = LogFactory.getLog(SparkCreateConnectedComponent.class); + private static final Log log = LogFactory.getLog(SparkCreateMergeRels.class); - public SparkCreateConnectedComponent(ArgumentApplicationParser parser, SparkSession spark) throws Exception { + public SparkCreateMergeRels(ArgumentApplicationParser parser, SparkSession spark) throws Exception { super(parser, spark); } @@ -45,7 +44,7 @@ public class SparkCreateConnectedComponent extends AbstractSparkAction { SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"))); parser.parseArgument(args); - new SparkCreateConnectedComponent(parser, getSparkSession(parser)).run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); + new SparkCreateMergeRels(parser, getSparkSession(parser)).run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); } @Override diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml index 25596bc2ff..3c73fefb6a 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml @@ -86,7 +86,7 @@ yarn cluster Create Merge Relations - eu.dnetlib.dhp.oa.dedup.SparkCreateConnectedComponent + eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels dhp-dedup-openaire-${projectVersion}.jar --executor-memory ${sparkExecutorMemory} diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java index 3d7f574608..0a43003968 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java @@ -22,6 +22,7 @@ import java.io.Serializable; import java.net.URISyntaxException; import java.nio.file.Paths; +import static java.nio.file.Files.createTempDirectory; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.lenient; @@ -36,16 +37,20 @@ public class SparkDedupTest implements Serializable { private static JavaSparkContext jsc; private static String testGraphBasePath; - private final static String testOutputBasePath = "/tmp/test_dedup_workflow"; + private static String testOutputBasePath = "/tmp/test_dedup_workflow"; private final static String testActionSetId = "test-orchestrator"; - private final static String testDedupGraphBasePath = "/tmp/test_dedup_workflow/dedup_graph"; + private static String testDedupGraphBasePath = "/tmp/test_dedup_workflow/dedup_graph"; @BeforeAll private static void cleanUp() throws IOException, URISyntaxException { testGraphBasePath = Paths.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/entities").toURI()).toFile().getAbsolutePath(); + testOutputBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-").toAbsolutePath().toString(); + testDedupGraphBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-").toAbsolutePath().toString(); + FileUtils.deleteDirectory(new File(testOutputBasePath)); + FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); spark = SparkSession .builder() @@ -54,7 +59,7 @@ public class SparkDedupTest implements Serializable { .config(new SparkConf()) .getOrCreate(); - jsc = new JavaSparkContext(spark.sparkContext()); + jsc = JavaSparkContext.fromSparkContext(spark.sparkContext()); } @@ -69,7 +74,6 @@ public class SparkDedupTest implements Serializable { lenient().when(isLookUpService.getResourceProfileByQuery(Mockito.contains("publication"))) .thenReturn(IOUtils.toString(SparkDedupTest.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json"))); - } @Test @@ -97,11 +101,11 @@ public class SparkDedupTest implements Serializable { @Test @Order(2) - public void createCCTest() throws Exception { + public void createMergeRelsTest() throws Exception { ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils.toString( - SparkCreateConnectedComponent.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"))); + SparkCreateMergeRels.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"))); parser.parseArgument(new String[]{ "-mt", "local[*]", "-i", testGraphBasePath, @@ -109,7 +113,7 @@ public class SparkDedupTest implements Serializable { "-la", "lookupurl", "-w", testOutputBasePath}); - new SparkCreateConnectedComponent(parser, spark).run(isLookUpService); + new SparkCreateMergeRels(parser, spark).run(isLookUpService); long orgs_mergerel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel").count(); long pubs_mergerel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel").count(); @@ -185,8 +189,6 @@ public class SparkDedupTest implements Serializable { assertEquals(mergedOrgs, deletedOrgs); assertEquals(mergedPubs, deletedPubs); - - //TODO check the size of other entities not deduplicated } @Test @@ -211,6 +213,12 @@ public class SparkDedupTest implements Serializable { } + @AfterAll + public static void finalCleanUp() throws IOException { + FileUtils.deleteDirectory(new File(testOutputBasePath)); + FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); + } + public boolean isDeletedByInference(String s) { return s.contains("\"deletedbyinference\":true"); }