forked from antonis.lempesis/dnet-hadoop
added RDD based adjacency list creation procedure
This commit is contained in:
parent
f057dcdf65
commit
8047d16dd9
|
@ -9,14 +9,20 @@ import java.util.Optional;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
|
import org.apache.spark.api.java.function.Function2;
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
import org.apache.spark.api.java.function.MapGroupsFunction;
|
import org.apache.spark.api.java.function.MapGroupsFunction;
|
||||||
|
import org.apache.spark.api.java.function.PairFunction;
|
||||||
|
import org.apache.spark.rdd.RDD;
|
||||||
import org.apache.spark.sql.Encoders;
|
import org.apache.spark.sql.Encoders;
|
||||||
import org.apache.spark.sql.SaveMode;
|
import org.apache.spark.sql.SaveMode;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||||
import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity;
|
import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity;
|
||||||
|
@ -83,7 +89,7 @@ public class AdjacencyListBuilderJob {
|
||||||
isSparkSessionManaged,
|
isSparkSessionManaged,
|
||||||
spark -> {
|
spark -> {
|
||||||
removeOutputDir(spark, outputPath);
|
removeOutputDir(spark, outputPath);
|
||||||
createAdjacencyLists(spark, inputPath, outputPath);
|
createAdjacencyListsRDD(spark, inputPath, outputPath);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -118,6 +124,38 @@ public class AdjacencyListBuilderJob {
|
||||||
.parquet(outputPath);
|
.parquet(outputPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void createAdjacencyListsRDD(
|
||||||
|
SparkSession spark, String inputPath, String outputPath) {
|
||||||
|
|
||||||
|
log.info("Reading joined entities from: {}", inputPath);
|
||||||
|
RDD<JoinedEntity> joinedEntities = spark
|
||||||
|
.read()
|
||||||
|
.load(inputPath)
|
||||||
|
.as(Encoders.bean(EntityRelEntity.class))
|
||||||
|
.javaRDD()
|
||||||
|
.mapToPair(re -> {
|
||||||
|
JoinedEntity je = new JoinedEntity();
|
||||||
|
je.setEntity(re.getEntity());
|
||||||
|
je.setLinks(Lists.newArrayList());
|
||||||
|
if (re.getRelation() != null && re.getTarget() != null) {
|
||||||
|
je.getLinks().add(new Tuple2(re.getRelation(), re.getTarget()));
|
||||||
|
}
|
||||||
|
return new scala.Tuple2<>(re.getEntity().getId(), je);
|
||||||
|
})
|
||||||
|
.reduceByKey((je1, je2) -> {
|
||||||
|
je1.getLinks().addAll(je2.getLinks());
|
||||||
|
return je1;
|
||||||
|
})
|
||||||
|
.map(t -> t._2())
|
||||||
|
.rdd();
|
||||||
|
|
||||||
|
spark
|
||||||
|
.createDataset(joinedEntities, Encoders.bean(JoinedEntity.class))
|
||||||
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.parquet(outputPath);
|
||||||
|
}
|
||||||
|
|
||||||
private static void removeOutputDir(SparkSession spark, String path) {
|
private static void removeOutputDir(SparkSession spark, String path) {
|
||||||
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
|
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue