forked from D-Net/dnet-hadoop
reintroduced RDD-based relation cut-off procedure
parent f3bc8aed31
commit 0bdfbb0a57
PrepareRelationsJob.java

@@ -3,7 +3,9 @@ package eu.dnetlib.dhp.oa.provision;
 
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
+import java.util.HashSet;
 import java.util.Optional;
+import java.util.Set;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
@@ -15,17 +17,21 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.sources.In;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.HdfsSupport;
 import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
 import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner;
+import scala.Int;
 import scala.Tuple2;
 
 /**
@@ -58,6 +64,8 @@ public class PrepareRelationsJob {
 
     public static final int MAX_RELS = 100;
 
+    public static final int DEFAULT_NUM_PARTITIONS = 3000;
+
     public static void main(String[] args) throws Exception {
         String jsonConfiguration = IOUtils
             .toString(
@@ -79,6 +87,24 @@ public class PrepareRelationsJob {
         String outputPath = parser.get("outputPath");
         log.info("outputPath: {}", outputPath);
 
+        int relPartitions = Optional
+            .ofNullable(parser.get("relPartitions"))
+            .map(Integer::valueOf)
+            .orElse(DEFAULT_NUM_PARTITIONS);
+        log.info("relPartitions: {}", relPartitions);
+
+        Set<String> relationFilter = Optional
+            .ofNullable(parser.get("relationFilter"))
+            .map(s -> Sets.newHashSet(Splitter.on(",").split(s)))
+            .orElse(new HashSet<>());
+        log.info("relationFilter: {}", relationFilter);
+
+        int maxRelations = Optional
+            .ofNullable(parser.get("maxRelations"))
+            .map(Integer::valueOf)
+            .orElse(MAX_RELS);
+        log.info("maxRelations: {}", maxRelations);
+
         SparkConf conf = new SparkConf();
 
         runWithSparkSession(
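All three new arguments are optional: each is read through Optional.ofNullable and falls back to a default (DEFAULT_NUM_PARTITIONS, an empty set, MAX_RELS) when it is missing. As a standalone illustration of how the comma-separated --relationFilter value becomes a Set, here is a minimal sketch; the class name and the sample relClass values ("similarity", "participation", borrowed from the weights table further down) are only examples, not part of the commit.

import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

import com.google.common.base.Splitter;
import com.google.common.collect.Sets;

// Hypothetical helper class, for illustration only.
public class RelationFilterParsingSketch {
    public static void main(String[] args) {
        // stands in for parser.get("relationFilter"); it would be null when the argument is omitted
        String raw = "similarity,participation";

        Set<String> relationFilter = Optional
            .ofNullable(raw)
            .map(s -> Sets.newHashSet(Splitter.on(",").split(s)))
            .orElse(new HashSet<>());

        // prints the two relClasses (set order is not guaranteed); relations carrying these
        // relClasses are later dropped by the new filter in prepareRelationsRDDFromPaths
        System.out.println(relationFilter);
    }
}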
@@ -86,12 +112,13 @@ public class PrepareRelationsJob {
             isSparkSessionManaged,
             spark -> {
                 removeOutputDir(spark, outputPath);
-                prepareRelationsFromPaths(spark, inputRelationsPath, outputPath);
+                prepareRelationsRDDFromPaths(
+                    spark, inputRelationsPath, outputPath, relationFilter, relPartitions, maxRelations);
             });
     }
 
     private static void prepareRelationsFromPaths(
-        SparkSession spark, String inputRelationsPath, String outputPath) {
+        SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter) {
         readPathRelation(spark, inputRelationsPath)
             .filter("dataInfo.deletedbyinference == false")
             .groupByKey(
@@ -125,20 +152,19 @@
 
     // TODO work in progress
     private static void prepareRelationsRDDFromPaths(
-        SparkSession spark, String inputRelationsPath, String outputPath, int numPartitions) {
-        JavaRDD<SortableRelation> rels = readPathRelationRDD(spark, inputRelationsPath).repartition(numPartitions);
+        SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter, int relPartitions,
+        int maxRelations) {
+        JavaRDD<SortableRelation> rels = readPathRelationRDD(spark, inputRelationsPath).repartition(relPartitions);
 
+        // only consider those that are not virtually deleted
         RDD<SortableRelation> d = rels
-            .filter(rel -> !rel.getDataInfo().getDeletedbyinference()) // only
-            // consider
-            // those
-            // that are not virtually
-            // deleted
+            .filter(rel -> !rel.getDataInfo().getDeletedbyinference())
+            .filter(rel -> !relationFilter.contains(rel.getRelClass()))
             .mapToPair(
                 (PairFunction<SortableRelation, SortableRelation, SortableRelation>) rel -> new Tuple2<>(rel, rel))
             .groupByKey(new RelationPartitioner(rels.getNumPartitions()))
-            .map(p -> Iterables.limit(p._2(), MAX_RELS))
-            .flatMap(p -> p.iterator())
+            .map(group -> Iterables.limit(group._2(), maxRelations))
+            .flatMap(group -> group.iterator())
             .rdd();
 
         spark
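The cut-off itself is the Iterables.limit call above: all relations grouped under the same key arrive as one Iterable, and only the first maxRelations of them survive before flatMap re-emits them. A minimal, Spark-free sketch of just that truncation step follows; the class name and the data are made up for illustration.

import java.util.Arrays;
import java.util.List;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

// Hypothetical class, for illustration only.
public class CutOffSketch {
    public static void main(String[] args) {
        int maxRelations = 2; // plays the role of the --maxRelations argument
        // stands in for one group of relations sharing the same grouping key
        List<String> group = Arrays.asList("rel-1", "rel-2", "rel-3", "rel-4");

        // Iterables.limit truncates the group lazily; everything past maxRelations is dropped
        List<String> capped = Lists.newArrayList(Iterables.limit(group, maxRelations));

        System.out.println(capped); // [rel-1, rel-2]
    }
}

Which relations end up at the front of a group, and therefore survive, is expected to be driven by SortableRelation's ordering (see the weights change below); since the method is still marked // TODO work in progress, read this as a sketch of intent rather than a finished guarantee.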
SortableRelation.java

@@ -16,10 +16,10 @@ public class SortableRelation extends Relation implements Comparable<Relation>,
     static {
         weights.put("outcome", 0);
         weights.put("supplement", 1);
-        weights.put("publicationDataset", 2);
+        weights.put("affiliation", 2);
         weights.put("relationship", 3);
-        weights.put("similarity", 4);
-        weights.put("affiliation", 5);
+        weights.put("publicationDataset", 4);
+        weights.put("similarity", 5);
 
         weights.put("provision", 6);
         weights.put("participation", 7);
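The weights map assigns a priority to each relClass, lower values first; the commit promotes "affiliation" from weight 5 to 2 and pushes "publicationDataset" and "similarity" down to 4 and 5. Assuming SortableRelation's compareTo consults this map (the class implements Comparable<Relation>), relations would order as in the following standalone sketch; the class name and the comparator here are illustrative, not the commit's code.

import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical class, for illustration only.
public class RelClassOrderingSketch {
    public static void main(String[] args) {
        // same weights as in the new version of the static initializer above
        Map<String, Integer> weights = new HashMap<>();
        weights.put("outcome", 0);
        weights.put("supplement", 1);
        weights.put("affiliation", 2);
        weights.put("relationship", 3);
        weights.put("publicationDataset", 4);
        weights.put("similarity", 5);
        weights.put("provision", 6);
        weights.put("participation", 7);

        // lower weight sorts first; unknown relClasses go last
        Comparator<String> byWeight = Comparator.comparing(rc -> weights.getOrDefault(rc, Integer.MAX_VALUE));

        List<String> ordered = Stream
            .of("participation", "similarity", "affiliation", "outcome")
            .sorted(byWeight)
            .collect(Collectors.toList());

        System.out.println(ordered); // [outcome, affiliation, similarity, participation]
    }
}

Combined with the per-group cap shown earlier, this reordering makes affiliation relations more likely to survive the cut-off than similarity or publicationDataset relations.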
Job parameter definitions (JSON)

@@ -21,6 +21,18 @@
     "paramName": "rp",
     "paramLongName": "relPartitions",
     "paramDescription": "number or partitions for the relations Dataset",
-    "paramRequired": true
+    "paramRequired": false
+  },
+  {
+    "paramName": "rf",
+    "paramLongName": "relationFilter",
+    "paramDescription": "filter applied reading relations (by relClass)",
+    "paramRequired": false
+  },
+  {
+    "paramName": "mr",
+    "paramLongName": "maxRelations",
+    "paramDescription": "maximum number of relations applied reading relations (by relClass)",
+    "paramRequired": false
   }
 ]
Oozie workflow definition (XML)

@@ -102,7 +102,9 @@
             </spark-opts>
             <arg>--inputRelationsPath</arg><arg>${inputGraphRootPath}/relation</arg>
             <arg>--outputPath</arg><arg>${workingDir}/relation</arg>
-            <arg>--relPartitions</arg><arg>3000</arg>
+            <arg>--relPartitions</arg><arg>${relPartitions}</arg>
+            <arg>--relationFilter</arg><arg>${relationFilter}</arg>
+            <arg>--maxRelations</arg><arg>${maxRelations}</arg>
         </spark>
         <ok to="fork_join_related_entities"/>
         <error to="Kill"/>
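Note (an assumption about how this is wired up, not something visible in the diff): since the Spark action now references ${relPartitions}, ${relationFilter} and ${maxRelations} instead of a hard-coded partition count, values for these need to reach the workflow, for example through a parameters section with defaults or through the job properties supplied at submission time; otherwise Oozie's EL resolution of the undefined variables would fail.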