enrichment steps #38

Merged
claudio.atzori merged 334 commits from miriam.baglioni/dnet-hadoop:master into enrichment_wfs 2020-08-11 16:40:26 +02:00
1 changed files with 85 additions and 9 deletions
Showing only changes of commit 821be1f8b6 - Show all commits

View File

@ -4,20 +4,20 @@ package eu.dnetlib.dhp.oa.provision;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.rdd.RDD; import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.*;
import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.expressions.Aggregator;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -25,10 +25,11 @@ import com.google.common.collect.Lists;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity; import eu.dnetlib.dhp.oa.provision.model.*;
import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
import eu.dnetlib.dhp.oa.provision.model.Tuple2;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import scala.Function1;
import scala.Function2;
/** /**
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. The * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. The
@ -82,17 +83,92 @@ public class AdjacencyListBuilderJob {
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses()); List<Class<?>> modelClasses = Arrays.asList(ModelSupport.getOafModelClasses());
modelClasses
.addAll(
Lists
.newArrayList(
TypedRow.class,
EntityRelEntity.class,
JoinedEntity.class,
RelatedEntity.class,
Tuple2.class,
SortableRelation.class));
conf.registerKryoClasses(modelClasses.toArray(new Class[] {}));
runWithSparkSession( runWithSparkSession(
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
removeOutputDir(spark, outputPath); removeOutputDir(spark, outputPath);
createAdjacencyListsRDD(spark, inputPath, outputPath); createAdjacencyListsKryo(spark, inputPath, outputPath);
}); });
} }
private static void createAdjacencyListsKryo(
SparkSession spark, String inputPath, String outputPath) {
TypedColumn<EntityRelEntity, JoinedEntity> aggregator = new AdjacencyListAggregator().toColumn();
log.info("Reading joined entities from: {}", inputPath);
spark
.read()
.load(inputPath)
.as(Encoders.kryo(EntityRelEntity.class))
.groupByKey(
(MapFunction<EntityRelEntity, String>) value -> value.getEntity().getId(),
Encoders.STRING())
.agg(aggregator)
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
}
public static class AdjacencyListAggregator extends Aggregator<EntityRelEntity, JoinedEntity, JoinedEntity> {
@Override
public JoinedEntity zero() {
return new JoinedEntity();
}
@Override
public JoinedEntity reduce(JoinedEntity j, EntityRelEntity e) {
j.setEntity(e.getEntity());
if (j.getLinks().size() <= MAX_LINKS) {
j.getLinks().add(new Tuple2(e.getRelation(), e.getTarget()));
}
return j;
}
@Override
public JoinedEntity merge(JoinedEntity j1, JoinedEntity j2) {
j1.getLinks().addAll(j2.getLinks());
return j1;
}
@Override
public JoinedEntity finish(JoinedEntity j) {
if (j.getLinks().size() > MAX_LINKS) {
ArrayList<Tuple2> links = j
.getLinks()
.stream()
.limit(MAX_LINKS)
.collect(Collectors.toCollection(ArrayList::new));
j.setLinks(links);
}
return j;
}
@Override
public Encoder<JoinedEntity> bufferEncoder() {
return Encoders.kryo(JoinedEntity.class);
}
@Override
public Encoder<JoinedEntity> outputEncoder() {
return Encoders.kryo(JoinedEntity.class);
}
}
private static void createAdjacencyLists( private static void createAdjacencyLists(
SparkSession spark, String inputPath, String outputPath) { SparkSession spark, String inputPath, String outputPath) {