1
0
Fork 0

WIP: prepare relation job

This commit is contained in:
Claudio Atzori 2020-06-25 12:22:18 +02:00
parent 6933ec11fb
commit e62333192c
2 changed files with 27 additions and 33 deletions

View File

@ -1,42 +1,33 @@
package eu.dnetlib.dhp.oa.provision; package eu.dnetlib.dhp.oa.provision;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Splitter;
import java.io.Serializable; import com.google.common.collect.Iterables;
import java.util.*; import com.google.common.collect.Sets;
import java.util.function.Supplier; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import javax.annotation.Nullable; import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey;
import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner;
import eu.dnetlib.dhp.schema.oaf.Relation;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.rdd.RDD; import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey;
import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2; import scala.Tuple2;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
/** /**
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. The * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. The
* operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, and * operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, and
@ -136,32 +127,35 @@ public class PrepareRelationsJob {
SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter, int maxRelations, SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter, int maxRelations,
int relPartitions) { int relPartitions) {
RDD<Relation> cappedRels = readPathRelationRDD(spark, inputRelationsPath) // group by SOURCE and apply limit
RDD<Relation> bySource = readPathRelationRDD(spark, inputRelationsPath)
.filter(rel -> rel.getDataInfo().getDeletedbyinference() == false) .filter(rel -> rel.getDataInfo().getDeletedbyinference() == false)
.filter(rel -> relationFilter.contains(rel.getRelClass()) == false) .filter(rel -> relationFilter.contains(rel.getRelClass()) == false)
// group by SOURCE and apply limit
.mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, r.getSource()), r)) .mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, r.getSource()), r))
.repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions)) .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions))
.groupBy(Tuple2::_1) .groupBy(Tuple2::_1)
.map(Tuple2::_2) .map(Tuple2::_2)
.map(t -> Iterables.filter(t, input -> input._1().getSubRelType().equals("outcome"))) .map(t -> Iterables.limit(t, maxRelations))
.flatMap(Iterable::iterator) .flatMap(Iterable::iterator)
.map(Tuple2::_2) .map(Tuple2::_2)
.rdd();
// group by TARGET and apply limit // group by TARGET and apply limit
RDD<Relation> byTarget = readPathRelationRDD(spark, inputRelationsPath)
.filter(rel -> rel.getDataInfo().getDeletedbyinference() == false)
.filter(rel -> relationFilter.contains(rel.getRelClass()) == false)
.mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, r.getTarget()), r)) .mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, r.getTarget()), r))
.repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions)) .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions))
.groupBy(Tuple2::_1) .groupBy(Tuple2::_1)
.map(Tuple2::_2) .map(Tuple2::_2)
.map(t -> Iterables.filter(t, input -> input._1().getSubRelType().equals("outcome"))) .map(t -> Iterables.limit(t, maxRelations))
// .map(t -> Iterables.limit(t, maxRelations))
.flatMap(Iterable::iterator) .flatMap(Iterable::iterator)
.map(Tuple2::_2) .map(Tuple2::_2)
.rdd(); .rdd();
spark spark
.createDataset(cappedRels, Encoders.bean(Relation.class)) .createDataset(bySource.union(byTarget), Encoders.bean(Relation.class))
.repartition(relPartitions)
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.parquet(outputPath); .parquet(outputPath);

View File

@ -50,7 +50,7 @@ public class SortableRelationKey implements Comparable<SortableRelationKey>, Ser
if (o == null || getClass() != o.getClass()) if (o == null || getClass() != o.getClass())
return false; return false;
SortableRelationKey that = (SortableRelationKey) o; SortableRelationKey that = (SortableRelationKey) o;
return Objects.equal(getGroupingKey(), that.getGroupingKey()); return getGroupingKey().equals(that.getGroupingKey());
} }
@Override @Override