From 05f269a1c085296471f42554c3d86472e0d78fb4 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 1 Jun 2020 00:32:42 +0200 Subject: [PATCH] kryo based parallel implementation of CreateRelatedEntitiesJob_phase2, now works by OafType; introduced custom aggregator in AdjacencyListBuilderJob --- .../oa/provision/AdjacencyListBuilderJob.java | 132 +----------------- .../CreateRelatedEntitiesJob_phase1.java | 12 +- .../CreateRelatedEntitiesJob_phase2.java | 117 ++++++++++++---- .../dhp/oa/provision/XmlConverterJob.java | 35 ++--- .../dhp/oa/provision/model/JoinedEntity.java | 29 ++-- .../model/ProvisionModelSupport.java | 3 +- ...lEntity.java => RelatedEntityWrapper.java} | 25 +--- .../dhp/oa/provision/model/Tuple2.java | 53 ------- .../oa/provision/utils/XmlRecordFactory.java | 33 +++-- .../dhp/oa/provision/oozie_app/workflow.xml | 46 ++---- 10 files changed, 169 insertions(+), 316 deletions(-) rename dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/{EntityRelEntity.java => RelatedEntityWrapper.java} (56%) delete mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java index 910138988..d9cc03cd5 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java @@ -4,32 +4,23 @@ package eu.dnetlib.dhp.oa.provision; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapGroupsFunction; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.rdd.RDD; import org.apache.spark.sql.*; import org.apache.spark.sql.expressions.Aggregator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; - import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.oa.provision.model.*; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.Oaf; -import scala.Function1; -import scala.Function2; +import scala.Tuple2; import scala.collection.JavaConverters; import scala.collection.Seq; @@ -106,127 +97,6 @@ public class AdjacencyListBuilderJob { log.info("Found paths: {}", String.join(",", paths)); - TypedColumn aggregator = new AdjacencyListAggregator().toColumn(); - spark - .read() - .load(toSeq(paths)) - .as(Encoders.kryo(EntityRelEntity.class)) - .groupByKey( - (MapFunction) value -> value.getEntity().getId(), - Encoders.STRING()) - .agg(aggregator) - .write() - .mode(SaveMode.Overwrite) - .parquet(outputPath); - } - - public static class AdjacencyListAggregator extends Aggregator { - - @Override - public JoinedEntity zero() { - return new JoinedEntity(); - } - - @Override - public JoinedEntity reduce(JoinedEntity j, EntityRelEntity e) { - j.setEntity(e.getEntity()); - if (j.getLinks().size() <= MAX_LINKS) { - 
j.getLinks().add(new Tuple2(e.getRelation(), e.getTarget())); - } - return j; - } - - @Override - public JoinedEntity merge(JoinedEntity j1, JoinedEntity j2) { - j1.getLinks().addAll(j2.getLinks()); - return j1; - } - - @Override - public JoinedEntity finish(JoinedEntity j) { - if (j.getLinks().size() > MAX_LINKS) { - ArrayList links = j - .getLinks() - .stream() - .limit(MAX_LINKS) - .collect(Collectors.toCollection(ArrayList::new)); - j.setLinks(links); - } - return j; - } - - @Override - public Encoder bufferEncoder() { - return Encoders.kryo(JoinedEntity.class); - } - - @Override - public Encoder outputEncoder() { - return Encoders.kryo(JoinedEntity.class); - } - } - - private static void createAdjacencyLists( - SparkSession spark, String inputPath, String outputPath) { - - log.info("Reading joined entities from: {}", inputPath); - spark - .read() - .load(inputPath) - .as(Encoders.bean(EntityRelEntity.class)) - .groupByKey( - (MapFunction) value -> value.getEntity().getId(), - Encoders.STRING()) - .mapGroups( - (MapGroupsFunction) (key, values) -> { - JoinedEntity j = new JoinedEntity(); - List links = new ArrayList<>(); - while (values.hasNext() && links.size() < MAX_LINKS) { - EntityRelEntity curr = values.next(); - if (j.getEntity() == null) { - j.setEntity(curr.getEntity()); - } - links.add(new Tuple2(curr.getRelation(), curr.getTarget())); - } - j.setLinks(links); - return j; - }, - Encoders.bean(JoinedEntity.class)) - .write() - .mode(SaveMode.Overwrite) - .parquet(outputPath); - } - - private static void createAdjacencyListsRDD( - SparkSession spark, String inputPath, String outputPath) { - - log.info("Reading joined entities from: {}", inputPath); - RDD joinedEntities = spark - .read() - .load(inputPath) - .as(Encoders.bean(EntityRelEntity.class)) - .javaRDD() - .mapToPair(re -> { - JoinedEntity je = new JoinedEntity(); - je.setEntity(re.getEntity()); - je.setLinks(Lists.newArrayList()); - if (re.getRelation() != null && re.getTarget() != null) { - je.getLinks().add(new Tuple2(re.getRelation(), re.getTarget())); - } - return new scala.Tuple2<>(re.getEntity().getId(), je); - }) - .reduceByKey((je1, je2) -> { - je1.getLinks().addAll(je2.getLinks()); - return je1; - }) - .map(t -> t._2()) - .rdd(); - - spark - .createDataset(joinedEntities, Encoders.bean(JoinedEntity.class)) - .write() - .mode(SaveMode.Overwrite) - .parquet(outputPath); } private static Seq toSeq(List list) { diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java index ccb20a136..4d2633bc5 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java @@ -2,7 +2,6 @@ package eu.dnetlib.dhp.oa.provision; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*; import java.util.List; import java.util.Objects; @@ -23,8 +22,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity; +import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; import eu.dnetlib.dhp.oa.provision.model.RelatedEntity; +import 
eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper; import eu.dnetlib.dhp.oa.provision.model.SortableRelation; import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.ModelSupport; @@ -91,7 +91,7 @@ public class CreateRelatedEntitiesJob_phase1 { SparkConf conf = new SparkConf(); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + conf.registerKryoClasses(ProvisionModelSupport.getModelClasses()); runWithSparkSession( conf, @@ -120,7 +120,7 @@ public class CreateRelatedEntitiesJob_phase1 { .filter("dataInfo.invisible == false") .map( (MapFunction) value -> asRelatedEntity(value, clazz), - Encoders.bean(RelatedEntity.class)) + Encoders.kryo(RelatedEntity.class)) .map( (MapFunction>) e -> new Tuple2<>(e.getId(), e), Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class))) @@ -129,9 +129,9 @@ public class CreateRelatedEntitiesJob_phase1 { relsByTarget .joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner") .map( - (MapFunction, Tuple2>, EntityRelEntity>) t -> new EntityRelEntity( + (MapFunction, Tuple2>, RelatedEntityWrapper>) t -> new RelatedEntityWrapper( t._1()._2(), t._2()._2()), - Encoders.bean(EntityRelEntity.class)) + Encoders.kryo(RelatedEntityWrapper.class)) .write() .mode(SaveMode.Overwrite) .parquet(outputPath); diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java index 757ab47d3..5ef30d6e1 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java @@ -13,10 +13,9 @@ import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.*; import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.expressions.Aggregator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,9 +25,11 @@ import com.google.common.collect.Lists; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity; +import eu.dnetlib.dhp.oa.provision.model.JoinedEntity; import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; +import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper; import eu.dnetlib.dhp.oa.provision.model.TypedRow; +import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import scala.Tuple2; @@ -75,7 +76,7 @@ public class CreateRelatedEntitiesJob_phase2 { .toString( PrepareRelationsJob.class .getResourceAsStream( - "/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json")); + "/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json")); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); @@ -124,32 +125,84 @@ public class CreateRelatedEntitiesJob_phase2 { int numPartitions, Class entityClazz) { - 
Dataset> entity = readPathEntity(spark, entityPath, entityClazz); - Dataset> relatedEntities = readRelatedEntities( + Dataset> entities = readPathEntity(spark, entityPath, entityClazz); + Dataset> relatedEntities = readRelatedEntities( spark, relatedEntitiesPath, entityClazz); - entity - .joinWith(relatedEntities, entity.col("_1").equalTo(relatedEntities.col("_1")), "left_outer") - .map((MapFunction, Tuple2>, EntityRelEntity>) value -> { - EntityRelEntity re = new EntityRelEntity(); - re.setEntity(getTypedRow(entityClazz.getCanonicalName().toLowerCase(), value._1()._2())); - Optional related = Optional.ofNullable(value._2()).map(Tuple2::_2); - if (related.isPresent()) { - re.setRelation(related.get().getRelation()); - re.setTarget(related.get().getTarget()); - } - return re; - }, Encoders.bean(EntityRelEntity.class)) - .repartition(numPartitions) - .filter( - (FilterFunction) value -> value.getEntity() != null - && StringUtils.isNotBlank(value.getEntity().getId())) + TypedColumn aggregator = new AdjacencyListAggregator().toColumn(); + + entities + .joinWith(relatedEntities, entities.col("_1").equalTo(relatedEntities.col("_1")), "left_outer") + .map((MapFunction, Tuple2>, JoinedEntity>) value -> { + JoinedEntity je = new JoinedEntity(value._1()._2()); + Optional + .ofNullable(value._2()) + .map(Tuple2::_2) + .ifPresent(r -> je.getLinks().add(r)); + return je; + }, Encoders.kryo(JoinedEntity.class)) + .filter(filterEmptyEntityFn()) + .groupByKey( + (MapFunction) value -> value.getEntity().getId(), + Encoders.STRING()) + .agg(aggregator) + .map( + (MapFunction, JoinedEntity>) value -> value._2(), + Encoders.kryo(JoinedEntity.class)) + .filter(filterEmptyEntityFn()) .write() .mode(SaveMode.Overwrite) .parquet(outputPath); } - private static Dataset> readRelatedEntities( + public static class AdjacencyListAggregator extends Aggregator { + + @Override + public JoinedEntity zero() { + return new JoinedEntity(); + } + + @Override + public JoinedEntity reduce(JoinedEntity b, JoinedEntity a) { + return mergeAndGet(b, a); + } + + private JoinedEntity mergeAndGet(JoinedEntity b, JoinedEntity a) { + b + .setEntity( + Optional + .ofNullable(a.getEntity()) + .orElse( + Optional + .ofNullable(b.getEntity()) + .orElse(null))); + b.getLinks().addAll(a.getLinks()); + return b; + } + + @Override + public JoinedEntity merge(JoinedEntity b, JoinedEntity a) { + return mergeAndGet(b, a); + } + + @Override + public JoinedEntity finish(JoinedEntity j) { + return j; + } + + @Override + public Encoder bufferEncoder() { + return Encoders.kryo(JoinedEntity.class); + } + + @Override + public Encoder outputEncoder() { + return Encoders.kryo(JoinedEntity.class); + } + + } + + private static Dataset> readRelatedEntities( SparkSession spark, String inputRelatedEntitiesPath, Class entityClazz) { log.info("Reading related entities from: {}", inputRelatedEntitiesPath); @@ -164,12 +217,12 @@ public class CreateRelatedEntitiesJob_phase2 { return spark .read() .load(toSeq(paths)) - .as(Encoders.bean(EntityRelEntity.class)) - .filter((FilterFunction) e -> e.getRelation().getSource().startsWith(idPrefix)) + .as(Encoders.kryo(RelatedEntityWrapper.class)) + .filter((FilterFunction) e -> e.getRelation().getSource().startsWith(idPrefix)) .map( - (MapFunction>) value -> new Tuple2<>( + (MapFunction>) value -> new Tuple2<>( value.getRelation().getSource(), value), - Encoders.tuple(Encoders.STRING(), Encoders.kryo(EntityRelEntity.class))); + Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntityWrapper.class))); } private static 
Dataset> readPathEntity( @@ -250,6 +303,14 @@ public class CreateRelatedEntitiesJob_phase2 { .anyMatch(c -> "orcid".equals(c.toLowerCase())); } + private static FilterFunction filterEmptyEntityFn() { + return (FilterFunction) v -> Objects.nonNull(v.getEntity()); + /* + * return (FilterFunction) v -> Optional .ofNullable(v.getEntity()) .map(e -> + * StringUtils.isNotBlank(e.getId())) .orElse(false); + */ + } + private static TypedRow getTypedRow(String type, OafEntity entity) throws JsonProcessingException { TypedRow t = new TypedRow(); diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java index a88b28592..a1ed7fd2a 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java @@ -4,6 +4,7 @@ package eu.dnetlib.dhp.oa.provision; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -32,6 +33,8 @@ import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory; import eu.dnetlib.dhp.schema.oaf.*; import scala.Tuple2; +import scala.collection.JavaConverters; +import scala.collection.Seq; /** * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. The @@ -89,6 +92,8 @@ public class XmlConverterJob { log.info("otherDsTypeId: {}", otherDsTypeId); SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ProvisionModelSupport.getModelClasses()); runWithSparkSession( conf, @@ -114,26 +119,18 @@ public class XmlConverterJob { schemaLocation, otherDsTypeId); + final List paths = HdfsSupport + .listFiles(inputPath, spark.sparkContext().hadoopConfiguration()); + + log.info("Found paths: {}", String.join(",", paths)); + spark .read() - .load(inputPath) - .as(Encoders.bean(JoinedEntity.class)) + .load(toSeq(paths)) + .as(Encoders.kryo(JoinedEntity.class)) .map( - (MapFunction) j -> { - if (j.getLinks() != null) { - j - .setLinks( - j - .getLinks() - .stream() - .filter(t -> t.getRelation() != null & t.getRelatedEntity() != null) - .collect(Collectors.toCollection(ArrayList::new))); - } - return j; - }, - Encoders.bean(JoinedEntity.class)) - .map( - (MapFunction>) je -> new Tuple2<>(je.getEntity().getId(), + (MapFunction>) je -> new Tuple2<>( + je.getEntity().getId(), recordFactory.build(je)), Encoders.tuple(Encoders.STRING(), Encoders.STRING())) .javaRDD() @@ -148,6 +145,10 @@ public class XmlConverterJob { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } + private static Seq toSeq(List list) { + return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq(); + } + private static Map prepareAccumulators(SparkContext sc) { Map accumulators = Maps.newHashMap(); accumulators diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java index 7681fa76f..2eb9cf38b 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java 
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java @@ -3,30 +3,39 @@ package eu.dnetlib.dhp.oa.provision.model; import java.io.Serializable; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; -public class JoinedEntity implements Serializable { +import eu.dnetlib.dhp.schema.oaf.OafEntity; - private TypedRow entity; +public class JoinedEntity implements Serializable { - private List links = new ArrayList<>(); + private E entity; + + private List links; public JoinedEntity() { + links = new LinkedList<>(); } - public TypedRow getEntity() { - return entity; - } - - public void setEntity(TypedRow entity) { + public JoinedEntity(E entity) { + this(); this.entity = entity; } - public List getLinks() { + public E getEntity() { + return entity; + } + + public void setEntity(E entity) { + this.entity = entity; + } + + public List getLinks() { return links; } - public void setLinks(List links) { + public void setLinks(List links) { this.links = links; } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java index 3cccce7c4..f9fde14e5 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java @@ -16,10 +16,9 @@ public class ProvisionModelSupport { Lists .newArrayList( TypedRow.class, - EntityRelEntity.class, + RelatedEntityWrapper.class, JoinedEntity.class, RelatedEntity.class, - Tuple2.class, SortableRelation.class)); return modelClasses.toArray(new Class[] {}); } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java similarity index 56% rename from dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java rename to dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java index a6b3c5591..d708b6ed0 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java @@ -5,33 +5,23 @@ import java.io.Serializable; import com.google.common.base.Objects; -public class EntityRelEntity implements Serializable { +public class RelatedEntityWrapper implements Serializable { - private TypedRow entity; private SortableRelation relation; private RelatedEntity target; - public EntityRelEntity() { + public RelatedEntityWrapper() { } - public EntityRelEntity(SortableRelation relation, RelatedEntity target) { + public RelatedEntityWrapper(SortableRelation relation, RelatedEntity target) { this(null, relation, target); } - public EntityRelEntity(TypedRow entity, SortableRelation relation, RelatedEntity target) { - this.entity = entity; + public RelatedEntityWrapper(TypedRow entity, SortableRelation relation, RelatedEntity target) { this.relation = relation; this.target = target; } - public TypedRow getEntity() { - return entity; - } - - public void setEntity(TypedRow entity) { - this.entity = entity; - } - public SortableRelation getRelation() { return 
relation; } @@ -54,14 +44,13 @@ public class EntityRelEntity implements Serializable { return true; if (o == null || getClass() != o.getClass()) return false; - EntityRelEntity that = (EntityRelEntity) o; - return Objects.equal(entity, that.entity) - && Objects.equal(relation, that.relation) + RelatedEntityWrapper that = (RelatedEntityWrapper) o; + return Objects.equal(relation, that.relation) && Objects.equal(target, that.target); } @Override public int hashCode() { - return Objects.hashCode(entity, relation, target); + return Objects.hashCode(relation, target); } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java deleted file mode 100644 index 5ebe9c9eb..000000000 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java +++ /dev/null @@ -1,53 +0,0 @@ - -package eu.dnetlib.dhp.oa.provision.model; - -import java.io.Serializable; -import java.util.Objects; - -import eu.dnetlib.dhp.schema.oaf.Relation; - -public class Tuple2 implements Serializable { - - private Relation relation; - - private RelatedEntity relatedEntity; - - public Tuple2() { - } - - public Tuple2(Relation relation, RelatedEntity relatedEntity) { - this.relation = relation; - this.relatedEntity = relatedEntity; - } - - public Relation getRelation() { - return relation; - } - - public void setRelation(Relation relation) { - this.relation = relation; - } - - public RelatedEntity getRelatedEntity() { - return relatedEntity; - } - - public void setRelatedEntity(RelatedEntity relatedEntity) { - this.relatedEntity = relatedEntity; - } - - @Override - public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - Tuple2 t2 = (Tuple2) o; - return getRelation().equals(t2.getRelation()); - } - - @Override - public int hashCode() { - return Objects.hash(getRelation().hashCode()); - } -} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java index f99298130..d950a816d 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java @@ -85,17 +85,19 @@ public class XmlRecordFactory implements Serializable { final Set contexts = Sets.newHashSet(); - final OafEntity entity = toOafEntity(je.getEntity()); + // final OafEntity entity = toOafEntity(je.getEntity()); + OafEntity entity = je.getEntity(); TemplateFactory templateFactory = new TemplateFactory(); try { - final EntityType type = EntityType.valueOf(je.getEntity().getType()); + + final EntityType type = EntityType.fromClass(entity.getClass()); final List metadata = metadata(type, entity, contexts); // rels has to be processed before the contexts because they enrich the contextMap with // the // funding info. 
- final List relations = je - .getLinks() + final List links = je.getLinks(); + final List relations = links .stream() .filter(link -> !isDuplicate(link)) .map(link -> mapRelation(contexts, templateFactory, type, link)) @@ -975,10 +977,10 @@ public class XmlRecordFactory implements Serializable { metadata.add(XmlSerializationUtils.mapQualifier("datasourcetypeui", dsType)); } - private List mapFields(Tuple2 link, Set contexts) { + private List mapFields(RelatedEntityWrapper link, Set contexts) { final Relation rel = link.getRelation(); - final RelatedEntity re = link.getRelatedEntity(); - final String targetType = link.getRelatedEntity().getType(); + final RelatedEntity re = link.getTarget(); + final String targetType = link.getTarget().getType(); final List metadata = Lists.newArrayList(); switch (EntityType.valueOf(targetType)) { @@ -1089,9 +1091,10 @@ public class XmlRecordFactory implements Serializable { return metadata; } - private String mapRelation(Set contexts, TemplateFactory templateFactory, EntityType type, Tuple2 link) { + private String mapRelation(Set contexts, TemplateFactory templateFactory, EntityType type, + RelatedEntityWrapper link) { final Relation rel = link.getRelation(); - final String targetType = link.getRelatedEntity().getType(); + final String targetType = link.getTarget().getType(); final String scheme = ModelSupport.getScheme(type.toString(), targetType); if (StringUtils.isBlank(scheme)) { @@ -1107,18 +1110,18 @@ public class XmlRecordFactory implements Serializable { private List listChildren( final OafEntity entity, JoinedEntity je, TemplateFactory templateFactory) { - EntityType entityType = EntityType.valueOf(je.getEntity().getType()); + final EntityType entityType = EntityType.fromClass(je.getEntity().getClass()); - List children = je - .getLinks() + final List links = je.getLinks(); + List children = links .stream() .filter(link -> isDuplicate(link)) .map(link -> { - final String targetType = link.getRelatedEntity().getType(); + final String targetType = link.getTarget().getType(); final String name = ModelSupport.getMainType(EntityType.valueOf(targetType)); final HashSet fields = Sets.newHashSet(mapFields(link, null)); return templateFactory - .getChild(name, link.getRelatedEntity().getId(), Lists.newArrayList(fields)); + .getChild(name, link.getTarget().getId(), Lists.newArrayList(fields)); }) .collect(Collectors.toCollection(ArrayList::new)); @@ -1227,7 +1230,7 @@ public class XmlRecordFactory implements Serializable { return children; } - private boolean isDuplicate(Tuple2 link) { + private boolean isDuplicate(RelatedEntityWrapper link) { return REL_SUBTYPE_DEDUP.equalsIgnoreCase(link.getRelation().getSubRelType()); } diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml index 15d352790..0d5121cf1 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml @@ -104,7 +104,6 @@ ${wf:conf('resumeFrom') eq 'prepare_relations'} ${wf:conf('resumeFrom') eq 'fork_join_related_entities'} ${wf:conf('resumeFrom') eq 'fork_join_all_entities'} - ${wf:conf('resumeFrom') eq 'adjancency_lists'} ${wf:conf('resumeFrom') eq 'convert_to_xml'} ${wf:conf('resumeFrom') eq 'to_solr_index'} @@ -373,7 +372,7 @@ 
--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication --inputRelatedEntitiesPath${workingDir}/join_partial --outputPath${workingDir}/join_entities/publication - --numPartitions35000 + --numPartitions30000 @@ -394,7 +393,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=15360 + --conf spark.sql.shuffle.partitions=7680 --conf spark.network.timeout=${sparkNetworkTimeout} --inputEntityPath${inputGraphRootPath}/dataset @@ -422,7 +421,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=15360 + --conf spark.sql.shuffle.partitions=7680 --conf spark.network.timeout=${sparkNetworkTimeout} --inputEntityPath${inputGraphRootPath}/otherresearchproduct @@ -450,7 +449,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=15360 + --conf spark.sql.shuffle.partitions=3840 --conf spark.network.timeout=${sparkNetworkTimeout} --inputEntityPath${inputGraphRootPath}/software @@ -478,7 +477,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=15360 + --conf spark.sql.shuffle.partitions=7680 --conf spark.network.timeout=${sparkNetworkTimeout} --inputEntityPath${inputGraphRootPath}/datasource @@ -506,7 +505,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=15360 + --conf spark.sql.shuffle.partitions=7680 --conf spark.network.timeout=${sparkNetworkTimeout} --inputEntityPath${inputGraphRootPath}/organization @@ -534,7 +533,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=15360 + --conf spark.sql.shuffle.partitions=3840 --conf spark.network.timeout=${sparkNetworkTimeout} --inputEntityPath${inputGraphRootPath}/project @@ -547,32 +546,7 @@ - - - - - yarn - cluster - build_adjacency_lists - eu.dnetlib.dhp.oa.provision.AdjacencyListBuilderJob - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=15360 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputPath${workingDir}/join_entities - --outputPath${workingDir}/joined - - - - + @@ -592,7 +566,7 @@ --conf 
spark.sql.shuffle.partitions=3840 --conf spark.network.timeout=${sparkNetworkTimeout} - --inputPath${workingDir}/joined + --inputPath${workingDir}/join_entities --outputPath${workingDir}/xml --isLookupUrl${isLookupUrl} --otherDsTypeId${otherDsTypeId} @@ -622,7 +596,7 @@ --conf spark.hadoop.mapreduce.reduce.speculative=false --inputPath${workingDir}/xml - --isLookupUrl ${isLookupUrl} + --isLookupUrl${isLookupUrl} --format${format} --batchSize${batchSize}
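
For reference, a minimal, self-contained sketch of the groupByKey + typed Aggregator pattern that CreateRelatedEntitiesJob_phase2 adopts in this patch. ToyRecord, ToyJoined and the in-memory dataset are hypothetical stand-ins, not classes from this codebase; only the Aggregator contract (zero/reduce/merge/finish) and the Kryo encoder wiring mirror the change.

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoder;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.TypedColumn;
    import org.apache.spark.sql.expressions.Aggregator;

    import scala.Tuple2;

    public class AggregatorSketch {

        // hypothetical input type: one (entity id, related link) pair per row
        public static class ToyRecord implements Serializable {
            public String id;
            public String link;
            public ToyRecord() {}
            public ToyRecord(String id, String link) { this.id = id; this.link = link; }
        }

        // hypothetical adjacency-list type, playing the role of JoinedEntity
        public static class ToyJoined implements Serializable {
            public String id;
            public List<String> links = new ArrayList<>();
        }

        // Same shape as the AdjacencyListAggregator introduced by the patch:
        // fold all rows sharing a key into one buffer, merge partial buffers,
        // and use Kryo encoders for both buffer and output.
        public static class ToyAggregator extends Aggregator<ToyRecord, ToyJoined, ToyJoined> {
            @Override public ToyJoined zero() { return new ToyJoined(); }
            @Override public ToyJoined reduce(ToyJoined b, ToyRecord a) {
                if (b.id == null) b.id = a.id;
                if (a.link != null) b.links.add(a.link);
                return b;
            }
            @Override public ToyJoined merge(ToyJoined b1, ToyJoined b2) {
                if (b1.id == null) b1.id = b2.id;
                b1.links.addAll(b2.links);
                return b1;
            }
            @Override public ToyJoined finish(ToyJoined r) { return r; }
            @Override public Encoder<ToyJoined> bufferEncoder() { return Encoders.kryo(ToyJoined.class); }
            @Override public Encoder<ToyJoined> outputEncoder() { return Encoders.kryo(ToyJoined.class); }
        }

        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate();

            Dataset<ToyRecord> records = spark.createDataset(
                Arrays.asList(new ToyRecord("e1", "r1"), new ToyRecord("e1", "r2"), new ToyRecord("e2", "r3")),
                Encoders.kryo(ToyRecord.class));

            TypedColumn<ToyRecord, ToyJoined> agg = new ToyAggregator().toColumn();

            // group by entity id and fold the per-key rows into one joined object,
            // as the patched job does when building JoinedEntity per OafEntity id
            Dataset<ToyJoined> joined = records
                .groupByKey((MapFunction<ToyRecord, String>) r -> r.id, Encoders.STRING())
                .agg(agg)
                .map((MapFunction<Tuple2<String, ToyJoined>, ToyJoined>) t -> t._2(),
                    Encoders.kryo(ToyJoined.class));

            joined.collectAsList().forEach(j -> System.out.println(j.id + " -> " + j.links));
            spark.stop();
        }
    }

The sketch keeps the aggregation inside a single shuffle, which is the reason the patch can drop the separate AdjacencyListBuilderJob step from the workflow and feed the phase2 output directly to XmlConverterJob.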