kryo based parallel implementation of CreateRelatedEntitiesJob_phase2, now works by OafType; introduced custom aggregator in AdjacencyListBuilderJob

This commit is contained in:
Claudio Atzori 2020-06-01 00:32:42 +02:00
parent 6f5f498c78
commit 05f269a1c0
10 changed files with 169 additions and 316 deletions

View File

@ -4,32 +4,23 @@ package eu.dnetlib.dhp.oa.provision;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.*; import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.Aggregator; import org.apache.spark.sql.expressions.Aggregator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.*; import eu.dnetlib.dhp.oa.provision.model.*;
import eu.dnetlib.dhp.schema.common.ModelSupport; import scala.Tuple2;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import scala.Function1;
import scala.Function2;
import scala.collection.JavaConverters; import scala.collection.JavaConverters;
import scala.collection.Seq; import scala.collection.Seq;
@ -106,127 +97,6 @@ public class AdjacencyListBuilderJob {
log.info("Found paths: {}", String.join(",", paths)); log.info("Found paths: {}", String.join(",", paths));
TypedColumn<EntityRelEntity, JoinedEntity> aggregator = new AdjacencyListAggregator().toColumn();
spark
.read()
.load(toSeq(paths))
.as(Encoders.kryo(EntityRelEntity.class))
.groupByKey(
(MapFunction<EntityRelEntity, String>) value -> value.getEntity().getId(),
Encoders.STRING())
.agg(aggregator)
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
}
public static class AdjacencyListAggregator extends Aggregator<EntityRelEntity, JoinedEntity, JoinedEntity> {
@Override
public JoinedEntity zero() {
return new JoinedEntity();
}
@Override
public JoinedEntity reduce(JoinedEntity j, EntityRelEntity e) {
j.setEntity(e.getEntity());
if (j.getLinks().size() <= MAX_LINKS) {
j.getLinks().add(new Tuple2(e.getRelation(), e.getTarget()));
}
return j;
}
@Override
public JoinedEntity merge(JoinedEntity j1, JoinedEntity j2) {
j1.getLinks().addAll(j2.getLinks());
return j1;
}
@Override
public JoinedEntity finish(JoinedEntity j) {
if (j.getLinks().size() > MAX_LINKS) {
ArrayList<Tuple2> links = j
.getLinks()
.stream()
.limit(MAX_LINKS)
.collect(Collectors.toCollection(ArrayList::new));
j.setLinks(links);
}
return j;
}
@Override
public Encoder<JoinedEntity> bufferEncoder() {
return Encoders.kryo(JoinedEntity.class);
}
@Override
public Encoder<JoinedEntity> outputEncoder() {
return Encoders.kryo(JoinedEntity.class);
}
}
private static void createAdjacencyLists(
SparkSession spark, String inputPath, String outputPath) {
log.info("Reading joined entities from: {}", inputPath);
spark
.read()
.load(inputPath)
.as(Encoders.bean(EntityRelEntity.class))
.groupByKey(
(MapFunction<EntityRelEntity, String>) value -> value.getEntity().getId(),
Encoders.STRING())
.mapGroups(
(MapGroupsFunction<String, EntityRelEntity, JoinedEntity>) (key, values) -> {
JoinedEntity j = new JoinedEntity();
List<Tuple2> links = new ArrayList<>();
while (values.hasNext() && links.size() < MAX_LINKS) {
EntityRelEntity curr = values.next();
if (j.getEntity() == null) {
j.setEntity(curr.getEntity());
}
links.add(new Tuple2(curr.getRelation(), curr.getTarget()));
}
j.setLinks(links);
return j;
},
Encoders.bean(JoinedEntity.class))
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
}
private static void createAdjacencyListsRDD(
SparkSession spark, String inputPath, String outputPath) {
log.info("Reading joined entities from: {}", inputPath);
RDD<JoinedEntity> joinedEntities = spark
.read()
.load(inputPath)
.as(Encoders.bean(EntityRelEntity.class))
.javaRDD()
.mapToPair(re -> {
JoinedEntity je = new JoinedEntity();
je.setEntity(re.getEntity());
je.setLinks(Lists.newArrayList());
if (re.getRelation() != null && re.getTarget() != null) {
je.getLinks().add(new Tuple2(re.getRelation(), re.getTarget()));
}
return new scala.Tuple2<>(re.getEntity().getId(), je);
})
.reduceByKey((je1, je2) -> {
je1.getLinks().addAll(je2.getLinks());
return je1;
})
.map(t -> t._2())
.rdd();
spark
.createDataset(joinedEntities, Encoders.bean(JoinedEntity.class))
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
} }
private static Seq<String> toSeq(List<String> list) { private static Seq<String> toSeq(List<String> list) {

View File

@ -2,7 +2,6 @@
package eu.dnetlib.dhp.oa.provision; package eu.dnetlib.dhp.oa.provision;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
@ -23,8 +22,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity; import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntity; import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper;
import eu.dnetlib.dhp.oa.provision.model.SortableRelation; import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
@ -91,7 +91,7 @@ public class CreateRelatedEntitiesJob_phase1 {
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses()); conf.registerKryoClasses(ProvisionModelSupport.getModelClasses());
runWithSparkSession( runWithSparkSession(
conf, conf,
@ -120,7 +120,7 @@ public class CreateRelatedEntitiesJob_phase1 {
.filter("dataInfo.invisible == false") .filter("dataInfo.invisible == false")
.map( .map(
(MapFunction<E, RelatedEntity>) value -> asRelatedEntity(value, clazz), (MapFunction<E, RelatedEntity>) value -> asRelatedEntity(value, clazz),
Encoders.bean(RelatedEntity.class)) Encoders.kryo(RelatedEntity.class))
.map( .map(
(MapFunction<RelatedEntity, Tuple2<String, RelatedEntity>>) e -> new Tuple2<>(e.getId(), e), (MapFunction<RelatedEntity, Tuple2<String, RelatedEntity>>) e -> new Tuple2<>(e.getId(), e),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class))) Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class)))
@ -129,9 +129,9 @@ public class CreateRelatedEntitiesJob_phase1 {
relsByTarget relsByTarget
.joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner") .joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner")
.map( .map(
(MapFunction<Tuple2<Tuple2<String, SortableRelation>, Tuple2<String, RelatedEntity>>, EntityRelEntity>) t -> new EntityRelEntity( (MapFunction<Tuple2<Tuple2<String, SortableRelation>, Tuple2<String, RelatedEntity>>, RelatedEntityWrapper>) t -> new RelatedEntityWrapper(
t._1()._2(), t._2()._2()), t._1()._2(), t._2()._2()),
Encoders.bean(EntityRelEntity.class)) Encoders.kryo(RelatedEntityWrapper.class))
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.parquet(outputPath); .parquet(outputPath);

View File

@ -13,10 +13,9 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.expressions.Aggregator;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -26,9 +25,11 @@ import com.google.common.collect.Lists;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity; import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper;
import eu.dnetlib.dhp.oa.provision.model.TypedRow; import eu.dnetlib.dhp.oa.provision.model.TypedRow;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2; import scala.Tuple2;
@ -124,32 +125,84 @@ public class CreateRelatedEntitiesJob_phase2 {
int numPartitions, int numPartitions,
Class<E> entityClazz) { Class<E> entityClazz) {
Dataset<Tuple2<String, E>> entity = readPathEntity(spark, entityPath, entityClazz); Dataset<Tuple2<String, E>> entities = readPathEntity(spark, entityPath, entityClazz);
Dataset<Tuple2<String, EntityRelEntity>> relatedEntities = readRelatedEntities( Dataset<Tuple2<String, RelatedEntityWrapper>> relatedEntities = readRelatedEntities(
spark, relatedEntitiesPath, entityClazz); spark, relatedEntitiesPath, entityClazz);
entity TypedColumn<JoinedEntity, JoinedEntity> aggregator = new AdjacencyListAggregator().toColumn();
.joinWith(relatedEntities, entity.col("_1").equalTo(relatedEntities.col("_1")), "left_outer")
.map((MapFunction<Tuple2<Tuple2<String, E>, Tuple2<String, EntityRelEntity>>, EntityRelEntity>) value -> { entities
EntityRelEntity re = new EntityRelEntity(); .joinWith(relatedEntities, entities.col("_1").equalTo(relatedEntities.col("_1")), "left_outer")
re.setEntity(getTypedRow(entityClazz.getCanonicalName().toLowerCase(), value._1()._2())); .map((MapFunction<Tuple2<Tuple2<String, E>, Tuple2<String, RelatedEntityWrapper>>, JoinedEntity>) value -> {
Optional<EntityRelEntity> related = Optional.ofNullable(value._2()).map(Tuple2::_2); JoinedEntity je = new JoinedEntity(value._1()._2());
if (related.isPresent()) { Optional
re.setRelation(related.get().getRelation()); .ofNullable(value._2())
re.setTarget(related.get().getTarget()); .map(Tuple2::_2)
} .ifPresent(r -> je.getLinks().add(r));
return re; return je;
}, Encoders.bean(EntityRelEntity.class)) }, Encoders.kryo(JoinedEntity.class))
.repartition(numPartitions) .filter(filterEmptyEntityFn())
.filter( .groupByKey(
(FilterFunction<EntityRelEntity>) value -> value.getEntity() != null (MapFunction<JoinedEntity, String>) value -> value.getEntity().getId(),
&& StringUtils.isNotBlank(value.getEntity().getId())) Encoders.STRING())
.agg(aggregator)
.map(
(MapFunction<Tuple2<String, JoinedEntity>, JoinedEntity>) value -> value._2(),
Encoders.kryo(JoinedEntity.class))
.filter(filterEmptyEntityFn())
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.parquet(outputPath); .parquet(outputPath);
} }
private static <E extends OafEntity> Dataset<Tuple2<String, EntityRelEntity>> readRelatedEntities( public static class AdjacencyListAggregator extends Aggregator<JoinedEntity, JoinedEntity, JoinedEntity> {
@Override
public JoinedEntity zero() {
return new JoinedEntity();
}
@Override
public JoinedEntity reduce(JoinedEntity b, JoinedEntity a) {
return mergeAndGet(b, a);
}
private JoinedEntity mergeAndGet(JoinedEntity b, JoinedEntity a) {
b
.setEntity(
Optional
.ofNullable(a.getEntity())
.orElse(
Optional
.ofNullable(b.getEntity())
.orElse(null)));
b.getLinks().addAll(a.getLinks());
return b;
}
@Override
public JoinedEntity merge(JoinedEntity b, JoinedEntity a) {
return mergeAndGet(b, a);
}
@Override
public JoinedEntity finish(JoinedEntity j) {
return j;
}
@Override
public Encoder<JoinedEntity> bufferEncoder() {
return Encoders.kryo(JoinedEntity.class);
}
@Override
public Encoder<JoinedEntity> outputEncoder() {
return Encoders.kryo(JoinedEntity.class);
}
}
private static <E extends OafEntity> Dataset<Tuple2<String, RelatedEntityWrapper>> readRelatedEntities(
SparkSession spark, String inputRelatedEntitiesPath, Class<E> entityClazz) { SparkSession spark, String inputRelatedEntitiesPath, Class<E> entityClazz) {
log.info("Reading related entities from: {}", inputRelatedEntitiesPath); log.info("Reading related entities from: {}", inputRelatedEntitiesPath);
@ -164,12 +217,12 @@ public class CreateRelatedEntitiesJob_phase2 {
return spark return spark
.read() .read()
.load(toSeq(paths)) .load(toSeq(paths))
.as(Encoders.bean(EntityRelEntity.class)) .as(Encoders.kryo(RelatedEntityWrapper.class))
.filter((FilterFunction<EntityRelEntity>) e -> e.getRelation().getSource().startsWith(idPrefix)) .filter((FilterFunction<RelatedEntityWrapper>) e -> e.getRelation().getSource().startsWith(idPrefix))
.map( .map(
(MapFunction<EntityRelEntity, Tuple2<String, EntityRelEntity>>) value -> new Tuple2<>( (MapFunction<RelatedEntityWrapper, Tuple2<String, RelatedEntityWrapper>>) value -> new Tuple2<>(
value.getRelation().getSource(), value), value.getRelation().getSource(), value),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(EntityRelEntity.class))); Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntityWrapper.class)));
} }
private static <E extends OafEntity> Dataset<Tuple2<String, E>> readPathEntity( private static <E extends OafEntity> Dataset<Tuple2<String, E>> readPathEntity(
@ -250,6 +303,14 @@ public class CreateRelatedEntitiesJob_phase2 {
.anyMatch(c -> "orcid".equals(c.toLowerCase())); .anyMatch(c -> "orcid".equals(c.toLowerCase()));
} }
private static FilterFunction<JoinedEntity> filterEmptyEntityFn() {
return (FilterFunction<JoinedEntity>) v -> Objects.nonNull(v.getEntity());
/*
* return (FilterFunction<JoinedEntity>) v -> Optional .ofNullable(v.getEntity()) .map(e ->
* StringUtils.isNotBlank(e.getId())) .orElse(false);
*/
}
private static TypedRow getTypedRow(String type, OafEntity entity) private static TypedRow getTypedRow(String type, OafEntity entity)
throws JsonProcessingException { throws JsonProcessingException {
TypedRow t = new TypedRow(); TypedRow t = new TypedRow();

View File

@ -4,6 +4,7 @@ package eu.dnetlib.dhp.oa.provision;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -32,6 +33,8 @@ import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory; import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2; import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.Seq;
/** /**
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. The * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. The
@ -89,6 +92,8 @@ public class XmlConverterJob {
log.info("otherDsTypeId: {}", otherDsTypeId); log.info("otherDsTypeId: {}", otherDsTypeId);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ProvisionModelSupport.getModelClasses());
runWithSparkSession( runWithSparkSession(
conf, conf,
@ -114,26 +119,18 @@ public class XmlConverterJob {
schemaLocation, schemaLocation,
otherDsTypeId); otherDsTypeId);
final List<String> paths = HdfsSupport
.listFiles(inputPath, spark.sparkContext().hadoopConfiguration());
log.info("Found paths: {}", String.join(",", paths));
spark spark
.read() .read()
.load(inputPath) .load(toSeq(paths))
.as(Encoders.bean(JoinedEntity.class)) .as(Encoders.kryo(JoinedEntity.class))
.map( .map(
(MapFunction<JoinedEntity, JoinedEntity>) j -> { (MapFunction<JoinedEntity, Tuple2<String, String>>) je -> new Tuple2<>(
if (j.getLinks() != null) { je.getEntity().getId(),
j
.setLinks(
j
.getLinks()
.stream()
.filter(t -> t.getRelation() != null & t.getRelatedEntity() != null)
.collect(Collectors.toCollection(ArrayList::new)));
}
return j;
},
Encoders.bean(JoinedEntity.class))
.map(
(MapFunction<JoinedEntity, Tuple2<String, String>>) je -> new Tuple2<>(je.getEntity().getId(),
recordFactory.build(je)), recordFactory.build(je)),
Encoders.tuple(Encoders.STRING(), Encoders.STRING())) Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
.javaRDD() .javaRDD()
@ -148,6 +145,10 @@ public class XmlConverterJob {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
} }
private static Seq<String> toSeq(List<String> list) {
return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq();
}
private static Map<String, LongAccumulator> prepareAccumulators(SparkContext sc) { private static Map<String, LongAccumulator> prepareAccumulators(SparkContext sc) {
Map<String, LongAccumulator> accumulators = Maps.newHashMap(); Map<String, LongAccumulator> accumulators = Maps.newHashMap();
accumulators accumulators

View File

@ -3,30 +3,39 @@ package eu.dnetlib.dhp.oa.provision.model;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List; import java.util.List;
public class JoinedEntity implements Serializable { import eu.dnetlib.dhp.schema.oaf.OafEntity;
private TypedRow entity; public class JoinedEntity<E extends OafEntity> implements Serializable {
private List<Tuple2> links = new ArrayList<>(); private E entity;
private List<RelatedEntityWrapper> links;
public JoinedEntity() { public JoinedEntity() {
links = new LinkedList<>();
} }
public TypedRow getEntity() { public JoinedEntity(E entity) {
return entity; this();
}
public void setEntity(TypedRow entity) {
this.entity = entity; this.entity = entity;
} }
public List<Tuple2> getLinks() { public E getEntity() {
return entity;
}
public void setEntity(E entity) {
this.entity = entity;
}
public List<RelatedEntityWrapper> getLinks() {
return links; return links;
} }
public void setLinks(List<Tuple2> links) { public void setLinks(List<RelatedEntityWrapper> links) {
this.links = links; this.links = links;
} }
} }

View File

@ -16,10 +16,9 @@ public class ProvisionModelSupport {
Lists Lists
.newArrayList( .newArrayList(
TypedRow.class, TypedRow.class,
EntityRelEntity.class, RelatedEntityWrapper.class,
JoinedEntity.class, JoinedEntity.class,
RelatedEntity.class, RelatedEntity.class,
Tuple2.class,
SortableRelation.class)); SortableRelation.class));
return modelClasses.toArray(new Class[] {}); return modelClasses.toArray(new Class[] {});
} }

View File

@ -5,33 +5,23 @@ import java.io.Serializable;
import com.google.common.base.Objects; import com.google.common.base.Objects;
public class EntityRelEntity implements Serializable { public class RelatedEntityWrapper implements Serializable {
private TypedRow entity;
private SortableRelation relation; private SortableRelation relation;
private RelatedEntity target; private RelatedEntity target;
public EntityRelEntity() { public RelatedEntityWrapper() {
} }
public EntityRelEntity(SortableRelation relation, RelatedEntity target) { public RelatedEntityWrapper(SortableRelation relation, RelatedEntity target) {
this(null, relation, target); this(null, relation, target);
} }
public EntityRelEntity(TypedRow entity, SortableRelation relation, RelatedEntity target) { public RelatedEntityWrapper(TypedRow entity, SortableRelation relation, RelatedEntity target) {
this.entity = entity;
this.relation = relation; this.relation = relation;
this.target = target; this.target = target;
} }
public TypedRow getEntity() {
return entity;
}
public void setEntity(TypedRow entity) {
this.entity = entity;
}
public SortableRelation getRelation() { public SortableRelation getRelation() {
return relation; return relation;
} }
@ -54,14 +44,13 @@ public class EntityRelEntity implements Serializable {
return true; return true;
if (o == null || getClass() != o.getClass()) if (o == null || getClass() != o.getClass())
return false; return false;
EntityRelEntity that = (EntityRelEntity) o; RelatedEntityWrapper that = (RelatedEntityWrapper) o;
return Objects.equal(entity, that.entity) return Objects.equal(relation, that.relation)
&& Objects.equal(relation, that.relation)
&& Objects.equal(target, that.target); && Objects.equal(target, that.target);
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hashCode(entity, relation, target); return Objects.hashCode(relation, target);
} }
} }

View File

@ -1,53 +0,0 @@
package eu.dnetlib.dhp.oa.provision.model;
import java.io.Serializable;
import java.util.Objects;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class Tuple2 implements Serializable {
private Relation relation;
private RelatedEntity relatedEntity;
public Tuple2() {
}
public Tuple2(Relation relation, RelatedEntity relatedEntity) {
this.relation = relation;
this.relatedEntity = relatedEntity;
}
public Relation getRelation() {
return relation;
}
public void setRelation(Relation relation) {
this.relation = relation;
}
public RelatedEntity getRelatedEntity() {
return relatedEntity;
}
public void setRelatedEntity(RelatedEntity relatedEntity) {
this.relatedEntity = relatedEntity;
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
Tuple2 t2 = (Tuple2) o;
return getRelation().equals(t2.getRelation());
}
@Override
public int hashCode() {
return Objects.hash(getRelation().hashCode());
}
}

View File

@ -85,17 +85,19 @@ public class XmlRecordFactory implements Serializable {
final Set<String> contexts = Sets.newHashSet(); final Set<String> contexts = Sets.newHashSet();
final OafEntity entity = toOafEntity(je.getEntity()); // final OafEntity entity = toOafEntity(je.getEntity());
OafEntity entity = je.getEntity();
TemplateFactory templateFactory = new TemplateFactory(); TemplateFactory templateFactory = new TemplateFactory();
try { try {
final EntityType type = EntityType.valueOf(je.getEntity().getType());
final EntityType type = EntityType.fromClass(entity.getClass());
final List<String> metadata = metadata(type, entity, contexts); final List<String> metadata = metadata(type, entity, contexts);
// rels has to be processed before the contexts because they enrich the contextMap with // rels has to be processed before the contexts because they enrich the contextMap with
// the // the
// funding info. // funding info.
final List<String> relations = je final List<RelatedEntityWrapper> links = je.getLinks();
.getLinks() final List<String> relations = links
.stream() .stream()
.filter(link -> !isDuplicate(link)) .filter(link -> !isDuplicate(link))
.map(link -> mapRelation(contexts, templateFactory, type, link)) .map(link -> mapRelation(contexts, templateFactory, type, link))
@ -975,10 +977,10 @@ public class XmlRecordFactory implements Serializable {
metadata.add(XmlSerializationUtils.mapQualifier("datasourcetypeui", dsType)); metadata.add(XmlSerializationUtils.mapQualifier("datasourcetypeui", dsType));
} }
private List<String> mapFields(Tuple2 link, Set<String> contexts) { private List<String> mapFields(RelatedEntityWrapper link, Set<String> contexts) {
final Relation rel = link.getRelation(); final Relation rel = link.getRelation();
final RelatedEntity re = link.getRelatedEntity(); final RelatedEntity re = link.getTarget();
final String targetType = link.getRelatedEntity().getType(); final String targetType = link.getTarget().getType();
final List<String> metadata = Lists.newArrayList(); final List<String> metadata = Lists.newArrayList();
switch (EntityType.valueOf(targetType)) { switch (EntityType.valueOf(targetType)) {
@ -1089,9 +1091,10 @@ public class XmlRecordFactory implements Serializable {
return metadata; return metadata;
} }
private String mapRelation(Set<String> contexts, TemplateFactory templateFactory, EntityType type, Tuple2 link) { private String mapRelation(Set<String> contexts, TemplateFactory templateFactory, EntityType type,
RelatedEntityWrapper link) {
final Relation rel = link.getRelation(); final Relation rel = link.getRelation();
final String targetType = link.getRelatedEntity().getType(); final String targetType = link.getTarget().getType();
final String scheme = ModelSupport.getScheme(type.toString(), targetType); final String scheme = ModelSupport.getScheme(type.toString(), targetType);
if (StringUtils.isBlank(scheme)) { if (StringUtils.isBlank(scheme)) {
@ -1107,18 +1110,18 @@ public class XmlRecordFactory implements Serializable {
private List<String> listChildren( private List<String> listChildren(
final OafEntity entity, JoinedEntity je, TemplateFactory templateFactory) { final OafEntity entity, JoinedEntity je, TemplateFactory templateFactory) {
EntityType entityType = EntityType.valueOf(je.getEntity().getType()); final EntityType entityType = EntityType.fromClass(je.getEntity().getClass());
List<String> children = je final List<RelatedEntityWrapper> links = je.getLinks();
.getLinks() List<String> children = links
.stream() .stream()
.filter(link -> isDuplicate(link)) .filter(link -> isDuplicate(link))
.map(link -> { .map(link -> {
final String targetType = link.getRelatedEntity().getType(); final String targetType = link.getTarget().getType();
final String name = ModelSupport.getMainType(EntityType.valueOf(targetType)); final String name = ModelSupport.getMainType(EntityType.valueOf(targetType));
final HashSet<String> fields = Sets.newHashSet(mapFields(link, null)); final HashSet<String> fields = Sets.newHashSet(mapFields(link, null));
return templateFactory return templateFactory
.getChild(name, link.getRelatedEntity().getId(), Lists.newArrayList(fields)); .getChild(name, link.getTarget().getId(), Lists.newArrayList(fields));
}) })
.collect(Collectors.toCollection(ArrayList::new)); .collect(Collectors.toCollection(ArrayList::new));
@ -1227,7 +1230,7 @@ public class XmlRecordFactory implements Serializable {
return children; return children;
} }
private boolean isDuplicate(Tuple2 link) { private boolean isDuplicate(RelatedEntityWrapper link) {
return REL_SUBTYPE_DEDUP.equalsIgnoreCase(link.getRelation().getSubRelType()); return REL_SUBTYPE_DEDUP.equalsIgnoreCase(link.getRelation().getSubRelType());
} }

View File

@ -104,7 +104,6 @@
<case to="prepare_relations">${wf:conf('resumeFrom') eq 'prepare_relations'}</case> <case to="prepare_relations">${wf:conf('resumeFrom') eq 'prepare_relations'}</case>
<case to="fork_join_related_entities">${wf:conf('resumeFrom') eq 'fork_join_related_entities'}</case> <case to="fork_join_related_entities">${wf:conf('resumeFrom') eq 'fork_join_related_entities'}</case>
<case to="fork_join_all_entities">${wf:conf('resumeFrom') eq 'fork_join_all_entities'}</case> <case to="fork_join_all_entities">${wf:conf('resumeFrom') eq 'fork_join_all_entities'}</case>
<case to="adjancency_lists">${wf:conf('resumeFrom') eq 'adjancency_lists'}</case>
<case to="convert_to_xml">${wf:conf('resumeFrom') eq 'convert_to_xml'}</case> <case to="convert_to_xml">${wf:conf('resumeFrom') eq 'convert_to_xml'}</case>
<case to="to_solr_index">${wf:conf('resumeFrom') eq 'to_solr_index'}</case> <case to="to_solr_index">${wf:conf('resumeFrom') eq 'to_solr_index'}</case>
<default to="prepare_relations"/> <default to="prepare_relations"/>
@ -373,7 +372,7 @@
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg> <arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/publication</arg> <arg>--outputPath</arg><arg>${workingDir}/join_entities/publication</arg>
<arg>--numPartitions</arg><arg>35000</arg> <arg>--numPartitions</arg><arg>30000</arg>
</spark> </spark>
<ok to="wait_join_phase2"/> <ok to="wait_join_phase2"/>
<error to="Kill"/> <error to="Kill"/>
@ -394,7 +393,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15360 --conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/dataset</arg> <arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/dataset</arg>
@ -422,7 +421,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15360 --conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/otherresearchproduct</arg> <arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/otherresearchproduct</arg>
@ -450,7 +449,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15360 --conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/software</arg> <arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/software</arg>
@ -478,7 +477,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15360 --conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/datasource</arg> <arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/datasource</arg>
@ -506,7 +505,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15360 --conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/organization</arg> <arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/organization</arg>
@ -534,7 +533,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15360 --conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/project</arg> <arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/project</arg>
@ -547,32 +546,7 @@
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<join name="wait_join_phase2" to="adjancency_lists"/> <join name="wait_join_phase2" to="convert_to_xml"/>
<action name="adjancency_lists">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>build_adjacency_lists</name>
<class>eu.dnetlib.dhp.oa.provision.AdjacencyListBuilderJob</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15360
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/join_entities</arg>
<arg>--outputPath</arg><arg>${workingDir}/joined</arg>
</spark>
<ok to="convert_to_xml"/>
<error to="Kill"/>
</action>
<action name="convert_to_xml"> <action name="convert_to_xml">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
@ -592,7 +566,7 @@
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/joined</arg> <arg>--inputPath</arg><arg>${workingDir}/join_entities</arg>
<arg>--outputPath</arg><arg>${workingDir}/xml</arg> <arg>--outputPath</arg><arg>${workingDir}/xml</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--otherDsTypeId</arg><arg>${otherDsTypeId}</arg> <arg>--otherDsTypeId</arg><arg>${otherDsTypeId}</arg>