From 326367eccc30dbecdbe36a3ec980621c4fb805ae Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 26 Jun 2023 16:15:05 +0200 Subject: [PATCH] WIP: various refactors --- .../dnetlib/pace/model/SparkDedupConfig.scala | 9 ++++----- .../eu/dnetlib/pace/util/BlockProcessor.java | 1 - .../dhp/oa/dedup/SparkCreateSimRels.java | 2 -- .../dnetlib/dhp/oa/dedup/SparkDedupTest.java | 20 +++++++++---------- 4 files changed, 14 insertions(+), 18 deletions(-) diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkDedupConfig.scala b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkDedupConfig.scala index def5ebb84a..4a5c4e7aff 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkDedupConfig.scala +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkDedupConfig.scala @@ -29,7 +29,7 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Seria val modelExtractor: (Dataset[String] => Dataset[Row]) = df => { df.withColumn("mapDocument", rowFromJsonUDF.apply(df.col(df.columns(0)))) .withColumn("identifier", new Column("mapDocument.identifier")) - .repartition(numPartitions, new Column("identifier")) + .repartition(new Column("identifier")) .dropDuplicates("identifier") .select("mapDocument.*") } @@ -178,12 +178,11 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Seria val res = relBlocks.filter(col("match").equalTo(true)) .select(col("l.identifier").as("from"), col("r.identifier").as("to")) - .repartition(numPartitions) + .repartition() .dropDuplicates() // res.show(false) - res.union(res.select(col("to").as("from"), col("from").as("to"))) - .select(functions.struct("from", "to")) + res.select(functions.struct("from", "to")) } val processClusters: (Dataset[Row] => Dataset[Row]) = df => { @@ -193,7 +192,7 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Seria df.filter(functions.size(new Column("block")).geq(new Literal(2, DataTypes.IntegerType))) .withColumn("relations", processBlock(df.sqlContext.sparkContext).apply(new Column("block"))) .select(functions.explode(new Column("relations")).as("relation")) - .repartition(numPartitions, new Column("relation")) + .repartition(new Column("relation")) .dropDuplicates("relation") } diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/BlockProcessor.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/BlockProcessor.java index feae7402c2..fae7493834 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/BlockProcessor.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/BlockProcessor.java @@ -145,7 +145,6 @@ public class BlockProcessor { final String type = dedupConf.getWf().getEntityType(); context.emit(type, from, to); - context.emit(type, to, from); } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java index fa9bd39bed..e6e8c745c7 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java @@ -81,8 +81,6 @@ public class SparkCreateSimRels extends AbstractSparkAction { final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity); removeOutputDir(spark, outputPath); - JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - SparkDedupConfig sparkConfig = new SparkDedupConfig(dedupConf, numPartitions); Dataset simRels = spark diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java index dc9f08c735..ef7cc656cf 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java @@ -190,11 +190,11 @@ public class SparkDedupTest implements Serializable { System.out.println("ds_simrel = " + ds_simrel); System.out.println("orp_simrel = " + orp_simrel); - assertEquals(3076, orgs_simrel); - assertEquals(7046, pubs_simrel); - assertEquals(336, sw_simrel); - assertEquals(442, ds_simrel); - assertEquals(6784, orp_simrel); + assertEquals(1538, orgs_simrel); + assertEquals(3523, pubs_simrel); + assertEquals(168, sw_simrel); + assertEquals(221, ds_simrel); + assertEquals(3392, orp_simrel); } @@ -239,10 +239,10 @@ public class SparkDedupTest implements Serializable { .count(); // entities simrels supposed to be equal to the number of previous step (no rels in whitelist) - assertEquals(3076, orgs_simrel); - assertEquals(7046, pubs_simrel); - assertEquals(442, ds_simrel); - assertEquals(6784, orp_simrel); + assertEquals(1538, orgs_simrel); + assertEquals(3523, pubs_simrel); + assertEquals(221, ds_simrel); + assertEquals(3392, orp_simrel); // System.out.println("orgs_simrel = " + orgs_simrel); // System.out.println("pubs_simrel = " + pubs_simrel); // System.out.println("ds_simrel = " + ds_simrel); @@ -272,7 +272,7 @@ public class SparkDedupTest implements Serializable { && rel.getTarget().equalsIgnoreCase(whiteList.get(1).split(WHITELIST_SEPARATOR)[1])) .count() > 0); - assertEquals(338, sw_simrel.count()); + assertEquals(170, sw_simrel.count()); // System.out.println("sw_simrel = " + sw_simrel.count()); }