forked from D-Net/dnet-hadoop
reformat code
This commit is contained in:
parent
a4b6a51168
commit
7cef698f36
|
@ -74,6 +74,7 @@ public class SparkCreateConnectedComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static long getHashcode(final String id) {
|
public static long getHashcode(final String id) {
|
||||||
return Hashing.murmur3_128().hashUnencodedChars(id).asLong();
|
return Hashing.murmur3_128().hashString(id).asLong();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,22 +8,33 @@ object DatasetJoiner {
|
||||||
def startJoin(spark: SparkSession, relPath:String, targetPath:String) {
|
def startJoin(spark: SparkSession, relPath:String, targetPath:String) {
|
||||||
val relation = spark.read.load(relPath)
|
val relation = spark.read.load(relPath)
|
||||||
|
|
||||||
val relatedPublication = relation.where("target like '50%'").groupBy("source").agg(count("target").as("publication")).select(col("source"). alias("p_source"), col("publication"))
|
val relatedPublication = relation
|
||||||
val relatedDataset = relation.where("target like '60%'").groupBy("source").agg(count("target").as("dataset")).select(col("source"). alias("d_source"), col("dataset"))
|
.where("target like '50%'")
|
||||||
val relatedUnknown = relation.where("target like '70%'").groupBy("source").agg(count("target").as("unknown")).select(col("source"). alias("u_source"), col("unknown"))
|
.groupBy("source")
|
||||||
|
.agg(count("target").as("publication"))
|
||||||
|
.select(col("source"). alias("p_source"), col("publication"))
|
||||||
|
val relatedDataset = relation
|
||||||
|
.where("target like '60%'")
|
||||||
|
.groupBy("source")
|
||||||
|
.agg(count("target").as("dataset"))
|
||||||
|
.select(col("source"). alias("d_source"), col("dataset"))
|
||||||
|
val relatedUnknown = relation
|
||||||
|
.where("target like '70%'")
|
||||||
|
.groupBy("source")
|
||||||
|
.agg(count("target").as("unknown"))
|
||||||
|
.select(col("source"). alias("u_source"), col("unknown"))
|
||||||
val firstJoin = relatedPublication
|
val firstJoin = relatedPublication
|
||||||
.join(relatedDataset,col("p_source").equalTo(col("d_source")),"full")
|
.join(relatedDataset,col("p_source").equalTo(col("d_source")),"full")
|
||||||
.select(coalesce(col("p_source"), col("d_source")).alias("id"),
|
.select( coalesce( col("p_source"), col("d_source")).alias("id"),
|
||||||
col("publication"),
|
col("publication"),
|
||||||
col("dataset"))
|
col("dataset"))
|
||||||
.join(relatedUnknown, col("u_source").equalTo(col("id")),"full")
|
.join(relatedUnknown, col("u_source").equalTo(col("id")),"full")
|
||||||
.select(coalesce(col("u_source"), col("id")).alias("source"),
|
.select( coalesce(col("u_source"), col("id")).alias("source"),
|
||||||
coalesce(col("publication"),lit(0)).alias("relatedPublication"),
|
coalesce(col("publication"),lit(0)).alias("relatedPublication"),
|
||||||
coalesce(col("dataset"),lit(0)).alias("relatedDataset"),
|
coalesce(col("dataset"),lit(0)).alias("relatedDataset"),
|
||||||
coalesce(col("unknown"),lit(0)).alias("relatedUnknown")
|
coalesce(col("unknown"),lit(0)).alias("relatedUnknown")
|
||||||
)
|
)
|
||||||
firstJoin.write.mode("overwrite").save(targetPath)
|
firstJoin.write.mode("overwrite").save(targetPath)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue