From c8f4b95464197958dab5aad0693b36c7360b1623 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 6 Apr 2020 08:59:58 +0200 Subject: [PATCH] dataset based provision WIP --- .../oa/provision/AdjacencyListBuilderJob.java | 91 +++++------------- .../CreateRelatedEntitiesJob_phase1.java | 8 +- .../CreateRelatedEntitiesJob_phase2.java | 94 +++++++++++++------ .../dhp/oa/provision/PrepareRelationsJob.java | 46 ++++----- .../dhp/oa/provision/model/JoinedEntity.java | 18 +--- .../dnetlib/dhp/oa/provision/model/Links.java | 6 +- .../oa/provision/model/SortableRelation.java | 3 +- .../dhp/oa/provision/model/Tuple2.java | 3 + .../oa/provision/utils/GraphMappingUtils.java | 6 +- .../oa/provision/utils/XmlRecordFactory.java | 66 +++++++++---- .../input_params_prepare_relations.json | 6 ++ ...input_params_related_entities_pahase2.json | 8 +- .../dhp/oa/provision/oozie_app/workflow.xml | 49 ++++++---- 13 files changed, 224 insertions(+), 180 deletions(-) diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java index dcb3ac171..291a44858 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java @@ -9,19 +9,22 @@ import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils; import eu.dnetlib.dhp.schema.oaf.*; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.api.java.function.*; import org.apache.spark.rdd.RDD; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.expressions.Encode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Tuple2; import java.io.IOException; +import java.util.Iterator; import java.util.Optional; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*; @@ -57,6 +60,7 @@ public class AdjacencyListBuilderJob { private static final Logger log = LoggerFactory.getLogger(AdjacencyListBuilderJob.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public static final int MAX_LINKS = 100; public static void main(String[] args) throws Exception { @@ -92,72 +96,27 @@ public class AdjacencyListBuilderJob { private static void createAdjacencyLists(SparkSession spark, String inputPath, String outputPath) { - RDD joined = spark.read() + log.info("Reading joined entities from: {}", inputPath); + spark.read() .load(inputPath) - .as(Encoders.kryo(EntityRelEntity.class)) - .javaRDD() - .map(e -> getJoinedEntity(e)) - .mapToPair(e -> new Tuple2<>(e.getEntity().getId(), e)) - .reduceByKey((j1, j2) -> getJoinedEntity(j1, j2)) - .map(Tuple2::_2) - .rdd(); - - spark.createDataset(joined, Encoders.bean(JoinedEntity.class)) + .as(Encoders.bean(EntityRelEntity.class)) + .groupByKey((MapFunction) value -> value.getEntity().getId(), Encoders.STRING()) + .mapGroups((MapGroupsFunction) (key, 
values) -> { + JoinedEntity j = new JoinedEntity(); + Links links = new Links(); + while (values.hasNext() && links.size() < MAX_LINKS) { + EntityRelEntity curr = values.next(); + if (j.getEntity() == null) { + j.setEntity(curr.getEntity()); + } + links.add(new Tuple2(curr.getRelation(), curr.getTarget())); + } + j.setLinks(links); + return j; + }, Encoders.bean(JoinedEntity.class)) .write() .mode(SaveMode.Overwrite) .parquet(outputPath); - - } - - private static JoinedEntity getJoinedEntity(JoinedEntity j1, JoinedEntity j2) { - JoinedEntity je = new JoinedEntity(); - je.setEntity(je.getEntity()); - je.setType(j1.getType()); - - Links links = new Links(); - links.addAll(j1.getLinks()); - links.addAll(j2.getLinks()); - - return je; - } - - private static JoinedEntity getJoinedEntity(EntityRelEntity e) { - JoinedEntity j = new JoinedEntity(); - j.setEntity(toOafEntity(e.getEntity())); - j.setType(EntityType.valueOf(e.getEntity().getType())); - Links links = new Links(); - links.add(new eu.dnetlib.dhp.oa.provision.model.Tuple2(e.getRelation(), e.getTarget())); - j.setLinks(links); - return j; - } - - private static OafEntity toOafEntity(TypedRow typedRow) { - return parseOaf(typedRow.getOaf(), typedRow.getType()); - } - - private static OafEntity parseOaf(final String json, final String type) { - try { - switch (GraphMappingUtils.EntityType.valueOf(type)) { - case publication: - return OBJECT_MAPPER.readValue(json, Publication.class); - case dataset: - return OBJECT_MAPPER.readValue(json, Dataset.class); - case otherresearchproduct: - return OBJECT_MAPPER.readValue(json, OtherResearchProduct.class); - case software: - return OBJECT_MAPPER.readValue(json, Software.class); - case datasource: - return OBJECT_MAPPER.readValue(json, Datasource.class); - case organization: - return OBJECT_MAPPER.readValue(json, Organization.class); - case project: - return OBJECT_MAPPER.readValue(json, Project.class); - default: - throw new IllegalArgumentException("invalid type: " + type); - } - } catch (IOException e) { - throw new IllegalArgumentException(e); - } } private static void removeOutputDir(SparkSession spark, String path) { diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java index 0b153f826..0e3a5e472 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java @@ -42,17 +42,15 @@ import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*; * for each entity type E_i * join (R.target = E_i.id), * map E_i as RelatedEntity T_i, extracting only the necessary information beforehand to produce [R - T_i] - * save the tuples [R - T_i] in append mode * * 3) CreateRelatedEntitiesJob_phase2: * prepare tuples [source entity - relation - target entity] (S - R - T): * create the union of the each entity type, hash by id (S) * for each [R - T_i] produced in phase1 * join S.id = [R - T_i].source to produce (S_i - R - T_i) - * save in append mode * * 4) AdjacencyListBuilderJob: - * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mappnig the result as JoinedEntity + * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity * * 5) XmlConverterJob: * convert the 
JoinedEntities as XML records @@ -121,8 +119,8 @@ public class CreateRelatedEntitiesJob_phase1 { t -> new EntityRelEntity(t._1()._2(), GraphMappingUtils.asRelatedEntity(t._2()._2(), entityClazz)), Encoders.bean(EntityRelEntity.class)) .write() - .mode(SaveMode.Append) - .parquet(outputPath); + .mode(SaveMode.Overwrite) + .parquet(outputPath + "/" + EntityType.fromClass(entityClazz)); } private static Dataset readPathEntity(SparkSession spark, String inputEntityPath, Class entityClazz) { diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java index 85a9113f2..645883f6f 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java @@ -1,5 +1,6 @@ package eu.dnetlib.dhp.oa.provision; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; @@ -8,7 +9,10 @@ import eu.dnetlib.dhp.oa.provision.model.TypedRow; import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils; import eu.dnetlib.dhp.schema.oaf.*; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -17,7 +21,10 @@ import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Tuple2; +import scala.collection.JavaConverters; +import scala.collection.Seq; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.function.Function; @@ -68,7 +75,7 @@ public class CreateRelatedEntitiesJob_phase2 { String jsonConfiguration = IOUtils.toString( PrepareRelationsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase1.json")); + .getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json")); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); @@ -81,14 +88,14 @@ public class CreateRelatedEntitiesJob_phase2 { String inputRelatedEntitiesPath = parser.get("inputRelatedEntitiesPath"); log.info("inputRelatedEntitiesPath: {}", inputRelatedEntitiesPath); - String inputGraphPath = parser.get("inputGraphPath"); - log.info("inputGraphPath: {}", inputGraphPath); + String inputGraphRootPath = parser.get("inputGraphRootPath"); + log.info("inputGraphRootPath: {}", inputGraphRootPath); String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); - String graphTableClassName = parser.get("graphTableClassName"); - log.info("graphTableClassName: {}", graphTableClassName); + int numPartitions = Integer.parseInt(parser.get("numPartitions")); + log.info("numPartitions: {}", numPartitions); SparkConf conf = new SparkConf(); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); @@ -97,14 +104,14 @@ public class CreateRelatedEntitiesJob_phase2 { runWithSparkSession(conf, 
isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - joinAllEntities(spark, inputRelatedEntitiesPath, inputGraphPath, outputPath); + joinAllEntities(spark, inputRelatedEntitiesPath, inputGraphRootPath, outputPath, numPartitions); }); } - private static void joinAllEntities(SparkSession spark, String inputRelatedEntitiesPath, String inputGraphPath, String outputPath) { + private static void joinAllEntities(SparkSession spark, String inputRelatedEntitiesPath, String inputGraphRootPath, String outputPath, int numPartitions) { + Dataset> entities = readAllEntities(spark, inputGraphRootPath, numPartitions); Dataset> relsBySource = readRelatedEntities(spark, inputRelatedEntitiesPath); - Dataset> entities = readAllEntities(spark, inputGraphPath); entities .joinWith(relsBySource, entities.col("_1").equalTo(relsBySource.col("_1")), "left_outer") @@ -118,51 +125,76 @@ public class CreateRelatedEntitiesJob_phase2 { } return re; }, Encoders.bean(EntityRelEntity.class)) + .repartition(numPartitions) + .filter((FilterFunction) value -> value.getEntity() != null && StringUtils.isNotBlank(value.getEntity().getId())) .write() .mode(SaveMode.Overwrite) .parquet(outputPath); } - private static Dataset> readAllEntities(SparkSession spark, String inputGraphPath) { - return GraphMappingUtils.entityTypes.entrySet() - .stream() - .map((Function, Dataset>) - e -> readPathEntity(spark, inputGraphPath + "/" + e.getKey().name(), e.getValue()) - .map((MapFunction) entity -> { - TypedRow t = new TypedRow(); - t.setType(e.getKey().name()); - t.setDeleted(entity.getDataInfo().getDeletedbyinference()); - t.setId(entity.getId()); - t.setOaf(OBJECT_MAPPER.writeValueAsString(entity)); - return t; - }, Encoders.bean(TypedRow.class))) - .reduce(spark.emptyDataset(Encoders.bean(TypedRow.class)), Dataset::union) + private static Dataset> readAllEntities(SparkSession spark, String inputGraphPath, int numPartitions) { + Dataset publication = readPathEntity(spark, inputGraphPath + "/publication", Publication.class); + Dataset dataset = readPathEntity(spark, inputGraphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); + Dataset other = readPathEntity(spark, inputGraphPath + "/otherresearchproduct", OtherResearchProduct.class); + Dataset software = readPathEntity(spark, inputGraphPath + "/software", Software.class); + Dataset datasource = readPathEntity(spark, inputGraphPath + "/datasource", Datasource.class); + Dataset organization = readPathEntity(spark, inputGraphPath + "/organization", Organization.class); + Dataset project = readPathEntity(spark, inputGraphPath + "/project", Project.class); + + return publication + .union(dataset) + .union(other) + .union(software) + .union(datasource) + .union(organization) + .union(project) .map((MapFunction>) - value -> new Tuple2<>(value.getId(), value), - Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class))); + value -> new Tuple2<>(value.getId(), value), + Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class))) + .repartition(numPartitions); } private static Dataset> readRelatedEntities(SparkSession spark, String inputRelatedEntitiesPath) { + + log.info("Reading related entities from: {}", inputRelatedEntitiesPath); + + final List paths = HdfsSupport.listFiles(inputRelatedEntitiesPath, spark.sparkContext().hadoopConfiguration()); + + log.info("Found paths: {}", String.join(",", paths)); + return spark.read() - .load(inputRelatedEntitiesPath) - .as(Encoders.kryo(EntityRelEntity.class)) + .load(toSeq(paths)) + 
.as(Encoders.bean(EntityRelEntity.class)) .map((MapFunction>) - value -> new Tuple2<>(value.getRelation().getSource(), value), - Encoders.tuple(Encoders.STRING(), Encoders.kryo(EntityRelEntity.class))); + value -> new Tuple2<>(value.getRelation().getSource(), value), + Encoders.tuple(Encoders.STRING(), Encoders.kryo(EntityRelEntity.class))); } - - private static Dataset readPathEntity(SparkSession spark, String inputEntityPath, Class entityClazz) { + private static Dataset readPathEntity(SparkSession spark, String inputEntityPath, Class entityClazz) { log.info("Reading Graph table from: {}", inputEntityPath); return spark .read() .textFile(inputEntityPath) - .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, entityClazz), Encoders.bean(entityClazz)); + .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, entityClazz), Encoders.bean(entityClazz)) + .map((MapFunction) value -> getTypedRow(StringUtils.substringAfterLast(inputEntityPath, "/"), value), Encoders.bean(TypedRow.class)); + } + + private static TypedRow getTypedRow(String type, OafEntity entity) throws JsonProcessingException { + TypedRow t = new TypedRow(); + t.setType(type); + t.setDeleted(entity.getDataInfo().getDeletedbyinference()); + t.setId(entity.getId()); + t.setOaf(OBJECT_MAPPER.writeValueAsString(entity)); + return t; } private static void removeOutputDir(SparkSession spark, String path) { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } + private static Seq toSeq(List list) { + return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq(); + } + } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java index 19599b52c..4c324a4c4 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java @@ -1,25 +1,30 @@ package eu.dnetlib.dhp.oa.provision; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.oa.provision.model.SortableRelation; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.FlatMapGroupsFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.rdd.RDD; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Tuple2; +import scala.math.Ordering; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Optional; @@ -78,34 +83,24 @@ public class PrepareRelationsJob { String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); + int numPartitions = 
Integer.parseInt(parser.get("relPartitions")); + log.info("relPartitions: {}", numPartitions); + SparkConf conf = new SparkConf(); runWithSparkSession(conf, isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - prepareRelationsFromPaths(spark, inputRelationsPath, outputPath); + prepareRelationsFromPaths(spark, inputRelationsPath, outputPath, numPartitions); }); } - private static void prepareRelationsFromPaths(SparkSession spark, String inputRelationsPath, String outputPath) { - RDD rels = readPathRelation(spark, inputRelationsPath) - .filter((FilterFunction) r -> r.getDataInfo().getDeletedbyinference() == false) - .javaRDD() - .mapToPair((PairFunction>) rel -> new Tuple2<>( - rel.getSource(), - Lists.newArrayList(rel))) - .reduceByKey((v1, v2) -> { - v1.addAll(v2); - v1.sort(SortableRelation::compareTo); - if (v1.size() > MAX_RELS) { - return v1.subList(0, MAX_RELS); - } - return new ArrayList<>(v1.subList(0, MAX_RELS)); - }) - .flatMap(r -> r._2().iterator()) - .rdd(); - - spark.createDataset(rels, Encoders.bean(SortableRelation.class)) + private static void prepareRelationsFromPaths(SparkSession spark, String inputRelationsPath, String outputPath, int numPartitions) { + readPathRelation(spark, inputRelationsPath) + .filter((FilterFunction) value -> value.getDataInfo().getDeletedbyinference() == false) + .groupByKey((MapFunction) value -> value.getSource(), Encoders.STRING()) + .flatMapGroups((FlatMapGroupsFunction) (key, values) -> Iterators.limit(values, MAX_RELS), Encoders.bean(SortableRelation.class)) + .repartition(numPartitions) .write() .mode(SaveMode.Overwrite) .parquet(outputPath); @@ -121,8 +116,7 @@ public class PrepareRelationsJob { private static Dataset readPathRelation(SparkSession spark, final String inputPath) { return spark.read() .textFile(inputPath) - .map((MapFunction) s -> OBJECT_MAPPER.readValue(s, SortableRelation.class), - Encoders.bean(SortableRelation.class)); + .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, SortableRelation.class), Encoders.bean(SortableRelation.class)); } private static void removeOutputDir(SparkSession spark, String path) { diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java index 4dd434804..8d1c79798 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java @@ -1,31 +1,21 @@ package eu.dnetlib.dhp.oa.provision.model; -import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils; -import eu.dnetlib.dhp.schema.oaf.OafEntity; - import java.io.Serializable; public class JoinedEntity implements Serializable { - private GraphMappingUtils.EntityType type; - - private OafEntity entity; + private TypedRow entity; private Links links; - public GraphMappingUtils.EntityType getType() { - return type; + public JoinedEntity() { } - public void setType(GraphMappingUtils.EntityType type) { - this.type = type; - } - - public OafEntity getEntity() { + public TypedRow getEntity() { return entity; } - public void setEntity(OafEntity entity) { + public void setEntity(TypedRow entity) { this.entity = entity; } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Links.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Links.java index 
4ea194876..f23d96190 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Links.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Links.java @@ -1,6 +1,10 @@ package eu.dnetlib.dhp.oa.provision.model; +import java.io.Serializable; import java.util.HashSet; -public class Links extends HashSet { +public class Links extends HashSet implements Serializable { + + public Links() { + } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelation.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelation.java index 430779c72..b294a6633 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelation.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelation.java @@ -4,9 +4,10 @@ import com.google.common.collect.ComparisonChain; import com.google.common.collect.Maps; import eu.dnetlib.dhp.schema.oaf.Relation; +import java.io.Serializable; import java.util.Map; -public class SortableRelation extends Relation implements Comparable { +public class SortableRelation extends Relation implements Comparable, Serializable { private final static Map weights = Maps.newHashMap(); diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java index f1e2c652c..942acaea1 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java @@ -11,6 +11,9 @@ public class Tuple2 implements Serializable { private RelatedEntity relatedEntity; + public Tuple2() { + } + public Tuple2(Relation relation, RelatedEntity relatedEntity) { this.relation = relation; this.relatedEntity = relatedEntity; diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/GraphMappingUtils.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/GraphMappingUtils.java index 8418db8e6..b65c88201 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/GraphMappingUtils.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/GraphMappingUtils.java @@ -102,11 +102,11 @@ public class GraphMappingUtils { entityMapping.get(EntityType.valueOf(targetType)).name()); } - public static String getMainType(final String type) { - return entityMapping.get(EntityType.valueOf(type)).name(); + public static String getMainType(final EntityType type) { + return entityMapping.get(type).name(); } - public static boolean isResult(String type) { + public static boolean isResult(EntityType type) { return MainEntityType.result.name().equals(getMainType(type)); } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java index f2b3aa2e7..9c339d41c 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java @@ -1,5 +1,6 @@ package 
eu.dnetlib.dhp.oa.provision.utils; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Joiner; import com.google.common.base.Splitter; import com.google.common.collect.Lists; @@ -48,6 +49,8 @@ public class XmlRecordFactory implements Serializable { private boolean indent = false; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public XmlRecordFactory( final ContextMapper contextMapper, final boolean indent, final String schemaLocation, final String otherDatasourceTypesUForUI) { @@ -72,22 +75,24 @@ public class XmlRecordFactory implements Serializable { final Set contexts = Sets.newHashSet(); - final OafEntity entity = je.getEntity(); + final OafEntity entity = toOafEntity(je.getEntity()); TemplateFactory templateFactory = new TemplateFactory(); try { - final List metadata = metadata(je.getType(), entity, contexts); + final EntityType type = GraphMappingUtils.EntityType.valueOf(je.getEntity().getType()); + final List metadata = metadata(type, entity, contexts); // rels has to be processed before the contexts because they enrich the contextMap with the funding info. final List relations = listRelations(je, templateFactory, contexts); - metadata.addAll(buildContexts(getMainType(je.getType()), contexts)); + final String mainType = getMainType(type); + metadata.addAll(buildContexts(mainType, contexts)); metadata.add(XmlSerializationUtils.parseDataInfo(entity.getDataInfo())); final String body = templateFactory.buildBody( - getMainType(je.getType()), + mainType, metadata, relations, - listChildren(je, templateFactory), listExtraInfo(je)); + listChildren(entity, je.getEntity().getType(), templateFactory), listExtraInfo(entity)); return printXML(templateFactory.buildRecord(entity, schemaLocation, body), indent); } catch (final Throwable e) { @@ -95,6 +100,35 @@ public class XmlRecordFactory implements Serializable { } } + private static OafEntity toOafEntity(TypedRow typedRow) { + return parseOaf(typedRow.getOaf(), typedRow.getType()); + } + + private static OafEntity parseOaf(final String json, final String type) { + try { + switch (GraphMappingUtils.EntityType.valueOf(type)) { + case publication: + return OBJECT_MAPPER.readValue(json, Publication.class); + case dataset: + return OBJECT_MAPPER.readValue(json, Dataset.class); + case otherresearchproduct: + return OBJECT_MAPPER.readValue(json, OtherResearchProduct.class); + case software: + return OBJECT_MAPPER.readValue(json, Software.class); + case datasource: + return OBJECT_MAPPER.readValue(json, Datasource.class); + case organization: + return OBJECT_MAPPER.readValue(json, Organization.class); + case project: + return OBJECT_MAPPER.readValue(json, Project.class); + default: + throw new IllegalArgumentException("invalid type: " + type); + } + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + } + private String printXML(String xml, boolean indent) { try { final Document doc = new SAXReader().read(new StringReader(xml)); @@ -110,7 +144,7 @@ public class XmlRecordFactory implements Serializable { } } - private List metadata(final String type, final OafEntity entity, final Set contexts) { + private List metadata(final EntityType type, final OafEntity entity, final Set contexts) { final List metadata = Lists.newArrayList(); @@ -262,7 +296,7 @@ public class XmlRecordFactory implements Serializable { metadata.add(XmlSerializationUtils.mapQualifier("bestaccessright", getBestAccessright(r))); } - switch (EntityType.valueOf(type)) { + switch (type) { case publication: final 
Publication pub = (Publication) entity; @@ -746,14 +780,14 @@ public class XmlRecordFactory implements Serializable { return rels; } - private List listChildren(final JoinedEntity je, TemplateFactory templateFactory) { + private List listChildren(final OafEntity entity, String type, TemplateFactory templateFactory) { final List children = Lists.newArrayList(); - - if (MainEntityType.result.toString().equals(getMainType(je.getType()))) { - final List instances = ((Result) je.getEntity()).getInstance(); + EntityType entityType = EntityType.valueOf(type); + if (MainEntityType.result.toString().equals(getMainType(entityType))) { + final List instances = ((Result) entity).getInstance(); if (instances != null) { - for (final Instance instance : ((Result) je.getEntity()).getInstance()) { + for (final Instance instance : ((Result) entity).getInstance()) { final List fields = Lists.newArrayList(); @@ -788,9 +822,9 @@ public class XmlRecordFactory implements Serializable { children.add(templateFactory.getInstance(instance.getHostedby().getKey(), fields, instance.getUrl())); } } - final List ext = ((Result) je.getEntity()).getExternalReference(); + final List ext = ((Result) entity).getExternalReference(); if (ext != null) { - for (final ExternalReference er : ((Result) je.getEntity()).getExternalReference()) { + for (final ExternalReference er : ((Result) entity).getExternalReference()) { final List fields = Lists.newArrayList(); @@ -824,8 +858,8 @@ public class XmlRecordFactory implements Serializable { return children; } - private List listExtraInfo(JoinedEntity je) { - final List extraInfo = je.getEntity().getExtraInfo(); + private List listExtraInfo(OafEntity entity) { + final List extraInfo = entity.getExtraInfo(); return extraInfo != null ? extraInfo .stream() .map(e -> XmlSerializationUtils.mapExtraInfo(e)) diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json index 043129c9f..bfb248d01 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json @@ -16,5 +16,11 @@ "paramLongName": "outputPath", "paramDescription": "root output location for prepared relations", "paramRequired": true + }, + { + "paramName": "rp", + "paramLongName": "relPartitions", + "paramDescription": "number or partitions for the relations Dataset", + "paramRequired": true } ] diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json index cb7949d49..2727f153b 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json @@ -13,7 +13,7 @@ }, { "paramName": "iep", - "paramLongName": "inputGraphPath", + "paramLongName": "inputGraphRootPath", "paramDescription": "root graph path", "paramRequired": true }, @@ -22,5 +22,11 @@ "paramLongName": "outputPath", "paramDescription": "root output location for prepared relations", "paramRequired": true + 
}, + { + "paramName": "np", + "paramLongName": "numPartitions", + "paramDescription": "number of partitions to use for the output", + "paramRequired": true } ] diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml index 33b9291c4..4a78df5b0 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml @@ -5,6 +5,10 @@ inputGraphRootPath root location of input materialized graph + + isLookupUrl + URL for the isLookup service + sparkDriverMemoryForJoining @@ -97,6 +101,7 @@ --inputRelationsPath${inputGraphRootPath}/relation --outputPath${workingDir}/relation + --relPartitions3000 @@ -128,13 +133,14 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 + --conf spark.network.timeout=${sparkNetworkTimeout} - --inputRelationsPath${workingDir}/relations + --inputRelationsPath${workingDir}/relation --inputEntityPath${inputGraphRootPath}/publication --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication --outputPath${workingDir}/join_partial - + @@ -154,13 +160,14 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 + --conf spark.network.timeout=${sparkNetworkTimeout} - --inputRelationsPath${workingDir}/relations + --inputRelationsPath${workingDir}/relation --inputEntityPath${inputGraphRootPath}/dataset --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset --outputPath${workingDir}/join_partial - + @@ -180,13 +187,14 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 + --conf spark.network.timeout=${sparkNetworkTimeout} - --inputRelationsPath${workingDir}/relations + --inputRelationsPath${workingDir}/relation --inputEntityPath${inputGraphRootPath}/otherresearchproduct --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct --outputPath${workingDir}/join_partial - + @@ -206,13 +214,14 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 + --conf spark.network.timeout=${sparkNetworkTimeout} - --inputRelationsPath${workingDir}/relations + --inputRelationsPath${workingDir}/relation --inputEntityPath${inputGraphRootPath}/software --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software --outputPath${workingDir}/join_partial - + @@ -232,13 +241,14 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 + --conf spark.network.timeout=${sparkNetworkTimeout} - --inputRelationsPath${workingDir}/relations + --inputRelationsPath${workingDir}/relation --inputEntityPath${inputGraphRootPath}/datasource --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource --outputPath${workingDir}/join_partial - + @@ -258,13 +268,14 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf 
spark.sql.shuffle.partitions=3840 + --conf spark.network.timeout=${sparkNetworkTimeout} - --inputRelationsPath${workingDir}/relations + --inputRelationsPath${workingDir}/relation --inputEntityPath${inputGraphRootPath}/organization --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization --outputPath${workingDir}/join_partial - + @@ -284,17 +295,19 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 + --conf spark.network.timeout=${sparkNetworkTimeout} - --inputRelationsPath${workingDir}/relations + --inputRelationsPath${workingDir}/relation --inputEntityPath${inputGraphRootPath}/project --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project --outputPath${workingDir}/join_partial - + - + + @@ -312,10 +325,12 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 + --conf spark.network.timeout=${sparkNetworkTimeout} - --inputEntityPath${inputGraphRootPath} + --inputGraphRootPath${inputGraphRootPath} --inputRelatedEntitiesPath${workingDir}/join_partial --outputPath${workingDir}/join_entities + --numPartitions12000 @@ -337,6 +352,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 + --conf spark.network.timeout=${sparkNetworkTimeout} --inputPath ${workingDir}/join_entities --outputPath${workingDir}/joined @@ -361,6 +377,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 + --conf spark.network.timeout=${sparkNetworkTimeout} --inputPath${workingDir}/joined --outputPath${workingDir}/xml
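
Note: the patched PrepareRelationsJob replaces the old reduceByKey/sort/subList chain with groupByKey plus flatMapGroups, capping each group with Iterators.limit(values, MAX_RELS). The following is a minimal, self-contained sketch of that pattern outside the DHP codebase; the Rel bean, the local SparkSession and the output path are hypothetical stand-ins for SortableRelation and the job's configuration, not the project's own classes.

package example;

import com.google.common.collect.Iterators;
import org.apache.spark.api.java.function.FlatMapGroupsFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;
import java.util.Arrays;

/** Minimal sketch of the "group by source and cap the group size" pattern. */
public class GroupAndCapExample {

    public static final int MAX_RELS = 100;

    /** Hypothetical stand-in for SortableRelation: a plain source/target bean. */
    public static class Rel implements Serializable {
        private String source;
        private String target;

        public Rel() {
        }

        public Rel(String source, String target) {
            this.source = source;
            this.target = target;
        }

        public String getSource() { return source; }
        public void setSource(String source) { this.source = source; }
        public String getTarget() { return target; }
        public void setTarget(String target) { this.target = target; }
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("group-and-cap")
                .master("local[*]")
                .getOrCreate();

        Dataset<Rel> rels = spark.createDataset(
                Arrays.asList(new Rel("s1", "t1"), new Rel("s1", "t2"), new Rel("s2", "t3")),
                Encoders.bean(Rel.class));

        // Group by the source id and keep at most MAX_RELS relations per group,
        // streaming over each group instead of materializing and sorting full lists.
        rels.groupByKey((MapFunction<Rel, String>) Rel::getSource, Encoders.STRING())
                .flatMapGroups(
                        (FlatMapGroupsFunction<String, Rel, Rel>) (key, values) -> Iterators.limit(values, MAX_RELS),
                        Encoders.bean(Rel.class))
                .write()
                .mode(SaveMode.Overwrite)
                .parquet("/tmp/relations_capped"); // hypothetical output location

        spark.stop();
    }
}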
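
Note: AdjacencyListBuilderJob now folds each group of EntityRelEntity rows into a single JoinedEntity with mapGroups, stopping once MAX_LINKS links have been collected. Below is a sketch of the same shape with simplified, hypothetical beans (Edge and Adjacency stand in for EntityRelEntity and JoinedEntity); it is an illustration of the technique, not the project's implementation.

package example;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch: fold each group of edges into one adjacency record, capped at MAX_LINKS. */
public class AdjacencyListExample {

    public static final int MAX_LINKS = 100;

    /** Hypothetical stand-in for EntityRelEntity: an (entityId, targetId) pair. */
    public static class Edge implements Serializable {
        private String entityId;
        private String targetId;

        public Edge() {
        }

        public Edge(String entityId, String targetId) {
            this.entityId = entityId;
            this.targetId = targetId;
        }

        public String getEntityId() { return entityId; }
        public void setEntityId(String entityId) { this.entityId = entityId; }
        public String getTargetId() { return targetId; }
        public void setTargetId(String targetId) { this.targetId = targetId; }
    }

    /** Hypothetical stand-in for JoinedEntity: the entity id plus its collected links. */
    public static class Adjacency implements Serializable {
        private String id;
        private List<String> links = new ArrayList<>();

        public Adjacency() {
        }

        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public List<String> getLinks() { return links; }
        public void setLinks(List<String> links) { this.links = links; }
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("adjacency-list")
                .master("local[*]")
                .getOrCreate();

        Dataset<Edge> edges = spark.createDataset(
                Arrays.asList(new Edge("e1", "t1"), new Edge("e1", "t2"), new Edge("e2", "t3")),
                Encoders.bean(Edge.class));

        // One output record per entity id; stop collecting links once MAX_LINKS is reached.
        Dataset<Adjacency> adjacency = edges
                .groupByKey((MapFunction<Edge, String>) Edge::getEntityId, Encoders.STRING())
                .mapGroups((MapGroupsFunction<String, Edge, Adjacency>) (key, values) -> {
                    Adjacency a = new Adjacency();
                    a.setId(key);
                    while (values.hasNext() && a.getLinks().size() < MAX_LINKS) {
                        a.getLinks().add(values.next().getTargetId());
                    }
                    return a;
                }, Encoders.bean(Adjacency.class));

        adjacency.show(false);
        spark.stop();
    }
}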
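
Note: phase 1 now writes one parquet directory per entity type under the join_partial path (SaveMode.Overwrite instead of Append), and phase 2 lists those subdirectories and loads them together, converting the java.util.List of paths to a Scala Seq for DataFrameReader.load. A short sketch of the same multi-path load using the String... overload instead; the directory names and root path are assumptions.

package example;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.List;

/** Sketch: load several per-entity-type parquet directories as one Dataset. */
public class LoadJoinPartialExample {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("load-join-partial")
                .master("local[*]")
                .getOrCreate();

        // Hypothetical layout written by phase 1: one subdirectory per entity type.
        String root = "/tmp/join_partial";
        List<String> paths = Arrays.asList(
                root + "/publication",
                root + "/dataset",
                root + "/software");

        // DataFrameReader.load accepts multiple paths and unions them; the job in the
        // patch achieves the same effect by converting the list to a Scala Seq.
        Dataset<Row> related = spark.read().load(paths.toArray(new String[0]));

        related.printSchema();
        spark.stop();
    }
}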