minor changes: rename and document the relation preparation methods in PrepareRelationsJob, drop unused imports, declare the sparkNetworkTimeout workflow parameter, and raise join_entities numPartitions from 12000 to 24000

Claudio Atzori 2020-05-21 10:00:14 +02:00
parent d7d2a0637f
commit dbfb9c19fe
2 changed files with 59 additions and 32 deletions

PrepareRelationsJob.java

@@ -17,7 +17,6 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.sources.In;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,7 +30,6 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner;
import scala.Int;
import scala.Tuple2;
/**
@@ -112,26 +110,74 @@ public class PrepareRelationsJob {
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
prepareRelationsRDDFromPaths(
prepareRelationsRDD(
spark, inputRelationsPath, outputPath, relationFilter, relPartitions, maxRelations);
});
}
private static void prepareRelationsFromPaths(
SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter) {
/**
* Dataset-based implementation that prepares the graph relations by limiting the number of outgoing links and
* filtering the relation types according to the given criteria.
*
* @param spark the spark session
* @param inputRelationsPath source path for the graph relations
* @param outputPath output path for the processed relations
* @param relationFilter set of relation filters applied to the `relClass` field
* @param maxRelations maximum number of allowed outgoing edges
*/
private static void prepareRelations(
SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter,
int maxRelations) {
readPathRelation(spark, inputRelationsPath)
.filter("dataInfo.deletedbyinference == false")
.filter((FilterFunction<SortableRelation>) rel -> !relationFilter.contains(rel.getRelClass()))
.groupByKey(
(MapFunction<SortableRelation, String>) value -> value.getSource(), Encoders.STRING())
.flatMapGroups(
(FlatMapGroupsFunction<String, SortableRelation, SortableRelation>) (key, values) -> Iterators
.limit(values, MAX_RELS),
.limit(values, maxRelations),
Encoders.bean(SortableRelation.class))
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
}
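	// Usage sketch (hypothetical paths and relClass values, not part of this change): with
	// maxRelations = 2, a source node with three outgoing links keeps only the first two
	// rows of its group:
	//
	//	Set<String> relationFilter = Sets.newHashSet("merges", "isMergedIn");
	//	prepareRelations(spark, "/tmp/graph/relation", "/tmp/graph/relation_prepared", relationFilter, 2);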
/**
* RDD-based implementation that prepares the graph relations by limiting the number of outgoing links and filtering
* the relation types according to the given criteria. Moreover, outgoing links kept within the given limit are
* prioritized according to the weights indicated in eu.dnetlib.dhp.oa.provision.model.SortableRelation.
*
* @param spark the spark session
* @param inputRelationsPath source path for the graph relations
* @param outputPath output path for the processed relations
* @param relationFilter set of relation filters applied to the `relClass` field
* @param relPartitions number of partitions for the relations RDD
* @param maxRelations maximum number of allowed outgoing edges
*/
// TODO work in progress
private static void prepareRelationsRDD(
SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter, int relPartitions,
int maxRelations) {
JavaRDD<SortableRelation> rels = readPathRelationRDD(spark, inputRelationsPath).repartition(relPartitions);
RelationPartitioner partitioner = new RelationPartitioner(rels.getNumPartitions());
// only consider those that are not virtually deleted
RDD<SortableRelation> d = rels
.filter(rel -> !rel.getDataInfo().getDeletedbyinference())
.filter(rel -> !relationFilter.contains(rel.getRelClass()))
.mapToPair(
(PairFunction<SortableRelation, SortableRelation, SortableRelation>) rel -> new Tuple2<>(rel, rel))
.groupByKey(partitioner)
.map(group -> Iterables.limit(group._2(), maxRelations))
.flatMap(group -> group.iterator())
.rdd();
spark
.createDataset(d, Encoders.bean(SortableRelation.class))
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
}
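	// RelationPartitioner is used above but lies outside this diff. A minimal sketch of the
	// general shape such a partitioner could take, assuming it routes each relation by its
	// source id so that all outgoing links of a node land in the same partition (hypothetical
	// name and logic, not the project's actual implementation):
	public static class SourceHashPartitioner extends org.apache.spark.Partitioner {

		private final int numPartitions;

		public SourceHashPartitioner(int numPartitions) {
			this.numPartitions = numPartitions;
		}

		@Override
		public int numPartitions() {
			return numPartitions;
		}

		@Override
		public int getPartition(Object key) {
			// stable, non-negative bucket derived from the relation source identifier
			return Math.abs(((SortableRelation) key).getSource().hashCode() % numPartitions);
		}
	}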
/**
* Reads a Dataset of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline-delimited JSON text
* file,
@@ -150,30 +196,6 @@ public class PrepareRelationsJob {
Encoders.bean(SortableRelation.class));
}
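	// The head of readPathRelation falls between hunks; only its tail is visible above. A
	// plausible reconstruction, assuming one JSON object per line is bound to the bean with
	// a com.fasterxml.jackson.databind.ObjectMapper (hypothetical name, sketch only, not the
	// verbatim source):
	private static Dataset<SortableRelation> readPathRelationSketch(SparkSession spark, String inputPath) {
		return spark
			.read()
			.textFile(inputPath)
			.map(
				(MapFunction<String, SortableRelation>) value -> new ObjectMapper()
					.readValue(value, SortableRelation.class),
				Encoders.bean(SortableRelation.class));
	}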
// TODO work in progress
private static void prepareRelationsRDDFromPaths(
SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter, int relPartitions,
int maxRelations) {
JavaRDD<SortableRelation> rels = readPathRelationRDD(spark, inputRelationsPath).repartition(relPartitions);
// only consider those that are not virtually deleted
RDD<SortableRelation> d = rels
.filter(rel -> !rel.getDataInfo().getDeletedbyinference())
.filter(rel -> !relationFilter.contains(rel.getRelClass()))
.mapToPair(
(PairFunction<SortableRelation, SortableRelation, SortableRelation>) rel -> new Tuple2<>(rel, rel))
.groupByKey(new RelationPartitioner(rels.getNumPartitions()))
.map(group -> Iterables.limit(group._2(), maxRelations))
.flatMap(group -> group.iterator())
.rdd();
spark
.createDataset(d, Encoders.bean(SortableRelation.class))
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
}
private static JavaRDD<SortableRelation> readPathRelationRDD(
SparkSession spark, final String inputPath) {
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
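		// Assumed continuation (the hunk ends here): one JSON object per line, bound via a
		// Jackson ObjectMapper; a sketch, not the verbatim source.
		return sc
			.textFile(inputPath)
			.map(json -> new ObjectMapper().readValue(json, SortableRelation.class));
	}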

workflow.xml

@@ -80,6 +80,11 @@
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
<property>
<name>sparkNetworkTimeout</name>
<description>configures spark.network.timeout</description>
</property>
</parameters>
<global>
@@ -357,7 +362,7 @@
<arg>--inputGraphRootPath</arg><arg>${inputGraphRootPath}</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities</arg>
<arg>--numPartitions</arg><arg>12000</arg>
<arg>--numPartitions</arg><arg>24000</arg>
</spark>
<ok to="adjancency_lists"/>
<error to="Kill"/>
@@ -381,7 +386,7 @@
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputPath</arg> <arg>${workingDir}/join_entities</arg>
<arg>--inputPath</arg><arg>${workingDir}/join_entities</arg>
<arg>--outputPath</arg><arg>${workingDir}/joined</arg>
</spark>
<ok to="convert_to_xml"/>