diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java index 16e112c252..01a99da1b6 100644 --- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java +++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java @@ -74,6 +74,7 @@ public class SparkCreateConnectedComponent { } public static long getHashcode(final String id) { - return Hashing.murmur3_128().hashUnencodedChars(id).asLong(); + return Hashing.murmur3_128().hashString(id).asLong(); } + } diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/DatasetJoiner.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/DatasetJoiner.scala index a550bff344..afc33c34a3 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/DatasetJoiner.scala +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/DatasetJoiner.scala @@ -8,22 +8,33 @@ object DatasetJoiner { def startJoin(spark: SparkSession, relPath:String, targetPath:String) { val relation = spark.read.load(relPath) - val relatedPublication = relation.where("target like '50%'").groupBy("source").agg(count("target").as("publication")).select(col("source"). alias("p_source"), col("publication")) - val relatedDataset = relation.where("target like '60%'").groupBy("source").agg(count("target").as("dataset")).select(col("source"). alias("d_source"), col("dataset")) - val relatedUnknown = relation.where("target like '70%'").groupBy("source").agg(count("target").as("unknown")).select(col("source"). alias("u_source"), col("unknown")) + val relatedPublication = relation + .where("target like '50%'") + .groupBy("source") + .agg(count("target").as("publication")) + .select(col("source"). alias("p_source"), col("publication")) + val relatedDataset = relation + .where("target like '60%'") + .groupBy("source") + .agg(count("target").as("dataset")) + .select(col("source"). alias("d_source"), col("dataset")) + val relatedUnknown = relation + .where("target like '70%'") + .groupBy("source") + .agg(count("target").as("unknown")) + .select(col("source"). alias("u_source"), col("unknown")) val firstJoin = relatedPublication - .join(relatedDataset,col("p_source").equalTo(col("d_source")),"full") - .select(coalesce(col("p_source"), col("d_source")).alias("id"), - col("publication"), - col("dataset")) - .join(relatedUnknown, col("u_source").equalTo(col("id")),"full") - .select(coalesce(col("u_source"), col("id")).alias("source"), - coalesce(col("publication"),lit(0)).alias("relatedPublication"), - coalesce(col("dataset"),lit(0)).alias("relatedDataset"), - coalesce(col("unknown"),lit(0)).alias("relatedUnknown") - ) + .join(relatedDataset,col("p_source").equalTo(col("d_source")),"full") + .select( coalesce( col("p_source"), col("d_source")).alias("id"), + col("publication"), + col("dataset")) + .join(relatedUnknown, col("u_source").equalTo(col("id")),"full") + .select( coalesce(col("u_source"), col("id")).alias("source"), + coalesce(col("publication"),lit(0)).alias("relatedPublication"), + coalesce(col("dataset"),lit(0)).alias("relatedDataset"), + coalesce(col("unknown"),lit(0)).alias("relatedUnknown") + ) firstJoin.write.mode("overwrite").save(targetPath) - } }