WIP: prepare relation job

This commit is contained in:
Claudio Atzori 2020-06-24 19:01:15 +02:00
parent 0e723d378b
commit 46e76affeb
2 changed files with 57 additions and 27 deletions

View File

@ -3,7 +3,9 @@ package eu.dnetlib.dhp.oa.provision;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.*; import java.util.*;
import java.util.function.Supplier;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
@ -18,7 +20,9 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Splitter; import com.google.common.base.Splitter;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
@ -59,6 +63,21 @@ public class PrepareRelationsJob {
public static final int DEFAULT_NUM_PARTITIONS = 3000; public static final int DEFAULT_NUM_PARTITIONS = 3000;
private static final Map<String, Integer> weights = Maps.newHashMap();
static {
weights.put("outcome", 0);
weights.put("supplement", 1);
weights.put("affiliation", 2);
weights.put("relationship", 3);
weights.put("publicationDataset", 4);
weights.put("similarity", 5);
weights.put("provision", 6);
weights.put("participation", 7);
weights.put("dedup", 8);
}
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils String jsonConfiguration = IOUtils
.toString( .toString(
@ -132,11 +151,15 @@ public class PrepareRelationsJob {
.filter(rel -> !relationFilter.contains(rel.getRelClass())) .filter(rel -> !relationFilter.contains(rel.getRelClass()))
// group by SOURCE and apply limit // group by SOURCE and apply limit
.groupBy(r -> SortableRelationKey.create(r, r.getSource())) .groupBy(r -> SortableRelationKey.create(r, r.getSource()))
.repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions)) .repartitionAndSortWithinPartitions(
new RelationPartitioner(relPartitions),
(SerializableComparator<SortableRelationKey>) (o1, o2) -> compare(o1, o2))
.flatMap(t -> Iterables.limit(t._2(), maxRelations).iterator()) .flatMap(t -> Iterables.limit(t._2(), maxRelations).iterator())
// group by TARGET and apply limit // group by TARGET and apply limit
.groupBy(r -> SortableRelationKey.create(r, r.getTarget())) .groupBy(r -> SortableRelationKey.create(r, r.getTarget()))
.repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions)) .repartitionAndSortWithinPartitions(
new RelationPartitioner(relPartitions),
(SerializableComparator<SortableRelationKey>) (o1, o2) -> compare(o1, o2))
.flatMap(t -> Iterables.limit(t._2(), maxRelations).iterator()) .flatMap(t -> Iterables.limit(t._2(), maxRelations).iterator())
.rdd(); .rdd();
@ -147,6 +170,24 @@ public class PrepareRelationsJob {
.parquet(outputPath); .parquet(outputPath);
} }
private static int compare(SortableRelationKey o1, SortableRelationKey o2) {
final Integer w1 = Optional.ofNullable(weights.get(o1.getSubRelType())).orElse(Integer.MAX_VALUE);
final Integer w2 = Optional.ofNullable(weights.get(o2.getSubRelType())).orElse(Integer.MAX_VALUE);
return ComparisonChain
.start()
.compare(w1, w2)
.compare(o1.getSource(), o2.getSource())
.compare(o1.getTarget(), o2.getTarget())
.result();
}
@FunctionalInterface
public interface SerializableComparator<T> extends Comparator<T>, Serializable {
@Override
int compare(T o1, T o2);
}
/** /**
* Reads a JavaRDD of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text * Reads a JavaRDD of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text
* file, * file,

View File

@ -5,27 +5,13 @@ import java.io.Serializable;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import com.google.common.base.Objects;
import com.google.common.collect.ComparisonChain; import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;
public class SortableRelationKey implements Comparable<SortableRelationKey>, Serializable { public class SortableRelationKey implements Serializable {
private static final Map<String, Integer> weights = Maps.newHashMap();
static {
weights.put("outcome", 0);
weights.put("supplement", 1);
weights.put("affiliation", 2);
weights.put("relationship", 3);
weights.put("publicationDataset", 4);
weights.put("similarity", 5);
weights.put("provision", 6);
weights.put("participation", 7);
weights.put("dedup", 8);
}
private String groupingKey; private String groupingKey;
@ -49,15 +35,18 @@ public class SortableRelationKey implements Comparable<SortableRelationKey>, Ser
} }
@Override @Override
public int compareTo(SortableRelationKey o) { public boolean equals(Object o) {
final Integer wt = Optional.ofNullable(weights.get(getSubRelType())).orElse(Integer.MAX_VALUE); if (this == o)
final Integer wo = Optional.ofNullable(weights.get(o.getSubRelType())).orElse(Integer.MAX_VALUE); return true;
return ComparisonChain if (o == null || getClass() != o.getClass())
.start() return false;
.compare(wt, wo) SortableRelationKey that = (SortableRelationKey) o;
.compare(getSource(), o.getSource()) return Objects.equal(getGroupingKey(), that.getGroupingKey());
.compare(getTarget(), o.getTarget()) }
.result();
@Override
public int hashCode() {
return Objects.hashCode(getGroupingKey());
} }
public void setSource(String source) { public void setSource(String source) {