WIP: various refactors

Claudio Atzori, 2023-06-26 16:15:05 +02:00 (committed by Sandro La Bruzzo)
parent 521dd7f167
commit 326367eccc
4 changed files with 14 additions and 18 deletions

@@ -29,7 +29,7 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Serializable
   val modelExtractor: (Dataset[String] => Dataset[Row]) = df => {
     df.withColumn("mapDocument", rowFromJsonUDF.apply(df.col(df.columns(0))))
       .withColumn("identifier", new Column("mapDocument.identifier"))
-      .repartition(numPartitions, new Column("identifier"))
+      .repartition(new Column("identifier"))
       .dropDuplicates("identifier")
       .select("mapDocument.*")
   }
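
For reference, a minimal stand-alone sketch (not part of this commit) of the difference between the two Dataset.repartition overloads swapped in this hunk: the two-argument form fixes the number of partitions explicitly, while the column-only form falls back to spark.sql.shuffle.partitions. The object and column names below are illustrative only.

import org.apache.spark.sql.{SparkSession, functions => F}

object RepartitionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("repartition-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq("a", "b", "c", "a").toDF("identifier")

    // explicit partition count, hash-partitioned by the column
    val fixed = df.repartition(10, F.col("identifier"))
    // partition count taken from spark.sql.shuffle.partitions (200 by default)
    val defaulted = df.repartition(F.col("identifier"))

    println(fixed.rdd.getNumPartitions)     // 10
    println(defaulted.rdd.getNumPartitions) // typically 200

    spark.stop()
  }
}
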
@@ -178,12 +178,11 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Serializable
     val res = relBlocks.filter(col("match").equalTo(true))
       .select(col("l.identifier").as("from"), col("r.identifier").as("to"))
-      .repartition(numPartitions)
+      .repartition()
       .dropDuplicates()
     // res.show(false)
-    res.union(res.select(col("to").as("from"), col("from").as("to")))
-      .select(functions.struct("from", "to"))
+    res.select(functions.struct("from", "to"))
   }

   val processClusters: (Dataset[Row] => Dataset[Row]) = df => {
@@ -193,7 +192,7 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Serializable
     df.filter(functions.size(new Column("block")).geq(new Literal(2, DataTypes.IntegerType)))
       .withColumn("relations", processBlock(df.sqlContext.sparkContext).apply(new Column("block")))
       .select(functions.explode(new Column("relations")).as("relation"))
-      .repartition(numPartitions, new Column("relation"))
+      .repartition(new Column("relation"))
       .dropDuplicates("relation")
   }
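
A minimal, self-contained sketch (not from the commit) of the explode-then-dropDuplicates pattern that processClusters relies on above: an array column of (from, to) pairs is flattened into one row per relation and deduplicated on the whole struct. Names such as ExplodeRelationsSketch and the sample data are assumptions for illustration.

import org.apache.spark.sql.{SparkSession, functions => F}

object ExplodeRelationsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("explode-sketch").getOrCreate()
    import spark.implicits._

    // each row carries an array of (from, to) pairs, mirroring the "relations" column above
    val blocks = Seq(
      Seq(("a", "b"), ("a", "b"), ("b", "c")),
      Seq(("a", "b"))
    ).toDF("relations")

    blocks
      .select(F.explode(F.col("relations")).as("relation")) // one row per relation struct
      .dropDuplicates("relation")                           // deduplicate on the whole struct
      .show(false)

    spark.stop()
  }
}
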

@@ -145,7 +145,6 @@ public class BlockProcessor {
        final String type = dedupConf.getWf().getEntityType();
        context.emit(type, from, to);
-       context.emit(type, to, from);
    }
}

@@ -81,8 +81,6 @@ public class SparkCreateSimRels extends AbstractSparkAction {
        final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity);
        removeOutputDir(spark, outputPath);
-       JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
-       SparkDedupConfig sparkConfig = new SparkDedupConfig(dedupConf, numPartitions);
        Dataset<?> simRels = spark

@@ -190,11 +190,11 @@ public class SparkDedupTest implements Serializable {
        System.out.println("ds_simrel = " + ds_simrel);
        System.out.println("orp_simrel = " + orp_simrel);
-       assertEquals(3076, orgs_simrel);
-       assertEquals(7046, pubs_simrel);
-       assertEquals(336, sw_simrel);
-       assertEquals(442, ds_simrel);
-       assertEquals(6784, orp_simrel);
+       assertEquals(1538, orgs_simrel);
+       assertEquals(3523, pubs_simrel);
+       assertEquals(168, sw_simrel);
+       assertEquals(221, ds_simrel);
+       assertEquals(3392, orp_simrel);
    }
@@ -239,10 +239,10 @@ public class SparkDedupTest implements Serializable {
            .count();
        // entities simrels supposed to be equal to the number of previous step (no rels in whitelist)
-       assertEquals(3076, orgs_simrel);
-       assertEquals(7046, pubs_simrel);
-       assertEquals(442, ds_simrel);
-       assertEquals(6784, orp_simrel);
+       assertEquals(1538, orgs_simrel);
+       assertEquals(3523, pubs_simrel);
+       assertEquals(221, ds_simrel);
+       assertEquals(3392, orp_simrel);
        // System.out.println("orgs_simrel = " + orgs_simrel);
        // System.out.println("pubs_simrel = " + pubs_simrel);
        // System.out.println("ds_simrel = " + ds_simrel);
@@ -272,7 +272,7 @@ public class SparkDedupTest implements Serializable {
            && rel.getTarget().equalsIgnoreCase(whiteList.get(1).split(WHITELIST_SEPARATOR)[1]))
            .count() > 0);
-       assertEquals(338, sw_simrel.count());
+       assertEquals(170, sw_simrel.count());
        // System.out.println("sw_simrel = " + sw_simrel.count());
    }