From ca345aaad33139c85e105fab671cf59a0570e646 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Mon, 6 Apr 2020 15:33:31 +0200
Subject: [PATCH] dataset based provision WIP

---
 .../oa/provision/AdjacencyListBuilderJob.java | 41 ++++++++---------
 .../CreateRelatedEntitiesJob_phase1.java      | 45 ++++++++++---------
 .../CreateRelatedEntitiesJob_phase2.java      | 34 ++++++--------
 .../dhp/oa/provision/PrepareRelationsJob.java | 32 ++++++-------
 .../dhp/oa/provision/XmlConverterJob.java     | 45 +++++++++++++++----
 .../dhp/oa/provision/model/JoinedEntity.java  |  7 +--
 .../dnetlib/dhp/oa/provision/model/Links.java | 10 -----
 .../oa/provision/utils/GraphMappingUtils.java | 16 +++----
 .../oa/provision/utils/XmlRecordFactory.java  |  2 +-
 .../dhp/oa/provision/oozie_app/workflow.xml   | 10 ++---
 10 files changed, 124 insertions(+), 118 deletions(-)
 delete mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Links.java

diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java
index 291a44858..2cc52fb62 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java
@@ -3,31 +3,25 @@ package eu.dnetlib.dhp.oa.provision;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.oa.provision.model.*;
-import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
-import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils;
-import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity;
+import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
+import eu.dnetlib.dhp.oa.provision.model.Tuple2;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.*;
-import org.apache.spark.rdd.RDD;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.expressions.Encode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.Iterator;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
-import java.util.Spliterator;
-import java.util.Spliterators;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
 
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
-import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*;
+import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.getKryoClasses;
 
 /**
  * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects.
@@ -43,14 +37,19 @@ import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*; * can be linked at most to 100 other objects * * 2) JoinRelationEntityByTargetJob: - * prepare tuples [source entity - relation - target entity] (S - R - T): + * (phase 1): prepare tuples [relation - target entity] (R - T): * for each entity type E_i - * join (R.target = E_i.id), - * map E_i as RelatedEntity T_i, extracting only the necessary information beforehand to produce [R - T_i] - * join (E_i.id = [R - T_i].source), where E_i becomes the source entity S + * map E_i as RelatedEntity T_i to simplify the model and extracting only the necessary information + * join (R.target = T_i.id) + * save the tuples (R_i, T_i) + * (phase 2): + * create the union of all the entity types E, hash by id + * read the tuples (R, T), hash by R.source + * join E.id = (R, T).source, where E becomes the Source Entity S + * save the tuples (S, R, T) * * 3) AdjacencyListBuilderJob: - * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mappnig the result as JoinedEntity + * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity * * 4) XmlConverterJob: * convert the JoinedEntities as XML records @@ -59,7 +58,6 @@ public class AdjacencyListBuilderJob { private static final Logger log = LoggerFactory.getLogger(AdjacencyListBuilderJob.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static final int MAX_LINKS = 100; public static void main(String[] args) throws Exception { @@ -91,7 +89,6 @@ public class AdjacencyListBuilderJob { removeOutputDir(spark, outputPath); createAdjacencyLists(spark, inputPath, outputPath); }); - } private static void createAdjacencyLists(SparkSession spark, String inputPath, String outputPath) { @@ -103,7 +100,7 @@ public class AdjacencyListBuilderJob { .groupByKey((MapFunction) value -> value.getEntity().getId(), Encoders.STRING()) .mapGroups((MapGroupsFunction) (key, values) -> { JoinedEntity j = new JoinedEntity(); - Links links = new Links(); + List links = new ArrayList<>(); while (values.hasNext() && links.size() < MAX_LINKS) { EntityRelEntity curr = values.next(); if (j.getEntity() == null) { diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java index 0e3a5e472..7d3555b6c 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java @@ -4,9 +4,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity; +import eu.dnetlib.dhp.oa.provision.model.RelatedEntity; import eu.dnetlib.dhp.oa.provision.model.SortableRelation; -import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils; -import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.OafEntity; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; @@ -37,22 +37,22 @@ import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*; * only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference == 
false), each entity * can be linked at most to 100 other objects * - * 2) CreateRelatedEntitiesJob_phase1: - * prepare tuples [relation - target entity] (R - T): + * 2) JoinRelationEntityByTargetJob: + * (phase 1): prepare tuples [relation - target entity] (R - T): * for each entity type E_i - * join (R.target = E_i.id), - * map E_i as RelatedEntity T_i, extracting only the necessary information beforehand to produce [R - T_i] + * map E_i as RelatedEntity T_i to simplify the model and extracting only the necessary information + * join (R.target = T_i.id) + * save the tuples (R_i, T_i) + * (phase 2): + * create the union of all the entity types E, hash by id + * read the tuples (R, T), hash by R.source + * join E.id = (R, T).source, where E becomes the Source Entity S + * save the tuples (S, R, T) * - * 3) CreateRelatedEntitiesJob_phase2: - * prepare tuples [source entity - relation - target entity] (S - R - T): - * create the union of the each entity type, hash by id (S) - * for each [R - T_i] produced in phase1 - * join S.id = [R - T_i].source to produce (S_i - R - T_i) - * - * 4) AdjacencyListBuilderJob: + * 3) AdjacencyListBuilderJob: * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity * - * 5) XmlConverterJob: + * 4) XmlConverterJob: * convert the JoinedEntities as XML records */ public class CreateRelatedEntitiesJob_phase1 { @@ -103,20 +103,21 @@ public class CreateRelatedEntitiesJob_phase1 { private static void joinRelationEntity(SparkSession spark, String inputRelationsPath, String inputEntityPath, Class entityClazz, String outputPath) { Dataset> relsByTarget = readPathRelation(spark, inputRelationsPath) + .filter((FilterFunction) value -> value.getDataInfo().getDeletedbyinference() == false) .map((MapFunction>) r -> new Tuple2<>(r.getTarget(), r), - Encoders.tuple(Encoders.STRING(), Encoders.kryo(SortableRelation.class))); + Encoders.tuple(Encoders.STRING(), Encoders.kryo(SortableRelation.class))) + .cache(); - Dataset> entities = readPathEntity(spark, inputEntityPath, entityClazz) - .map((MapFunction>) e -> new Tuple2<>(e.getId(), e), - Encoders.tuple(Encoders.STRING(), Encoders.kryo(entityClazz))) + Dataset> entities = readPathEntity(spark, inputEntityPath, entityClazz) + .map((MapFunction) value -> asRelatedEntity(value, entityClazz), Encoders.bean(RelatedEntity.class)) + .map((MapFunction>) e -> new Tuple2<>(e.getId(), e), + Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class))) .cache(); relsByTarget .joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner") - .filter((FilterFunction, Tuple2>>) - value -> value._2()._2().getDataInfo().getDeletedbyinference() == false) - .map((MapFunction, Tuple2>, EntityRelEntity>) - t -> new EntityRelEntity(t._1()._2(), GraphMappingUtils.asRelatedEntity(t._2()._2(), entityClazz)), + .map((MapFunction, Tuple2>, EntityRelEntity>) + t -> new EntityRelEntity(t._1()._2(), t._2()._2()), Encoders.bean(EntityRelEntity.class)) .write() .mode(SaveMode.Overwrite) diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java index 645883f6f..2b5c627b6 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java +++ 
b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java @@ -6,11 +6,9 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity; import eu.dnetlib.dhp.oa.provision.model.TypedRow; -import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils; import eu.dnetlib.dhp.schema.oaf.*; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; @@ -25,12 +23,10 @@ import scala.collection.JavaConverters; import scala.collection.Seq; import java.util.List; -import java.util.Map; import java.util.Optional; -import java.util.function.Function; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*; +import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.getKryoClasses; /** * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. @@ -45,24 +41,22 @@ import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*; * only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference == false), each entity * can be linked at most to 100 other objects * - * 2) CreateRelatedEntitiesJob_phase1: - * prepare tuples [relation - target entity] (R - T): + * 2) JoinRelationEntityByTargetJob: + * (phase 1): prepare tuples [relation - target entity] (R - T): * for each entity type E_i - * join (R.target = E_i.id), - * map E_i as RelatedEntity T_i, extracting only the necessary information beforehand to produce [R - T_i] - * save the tuples [R - T_i] in append mode + * map E_i as RelatedEntity T_i to simplify the model and extracting only the necessary information + * join (R.target = T_i.id) + * save the tuples (R_i, T_i) + * (phase 2): + * create the union of all the entity types E, hash by id + * read the tuples (R, T), hash by R.source + * join E.id = (R, T).source, where E becomes the Source Entity S + * save the tuples (S, R, T) * - * 3) CreateRelatedEntitiesJob_phase2: - * prepare tuples [source entity - relation - target entity] (S - R - T): - * create the union of the each entity type, hash by id (S) - * for each [R - T_i] produced in phase1 - * join S.id = [R - T_i].source to produce (S_i - R - T_i) - * save in append mode + * 3) AdjacencyListBuilderJob: + * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity * - * 4) AdjacencyListBuilderJob: - * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mappnig the result as JoinedEntity - * - * 5) XmlConverterJob: + * 4) XmlConverterJob: * convert the JoinedEntities as XML records */ public class CreateRelatedEntitiesJob_phase2 { diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java index 4c324a4c4..5a70e258f 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java @@ -1,31 +1,22 @@ package 
eu.dnetlib.dhp.oa.provision; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.oa.provision.model.SortableRelation; import org.apache.commons.io.IOUtils; -import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.FlatMapGroupsFunction; import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.rdd.RDD; -import org.apache.spark.sql.*; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Tuple2; -import scala.math.Ordering; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; import java.util.Optional; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; @@ -44,14 +35,19 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; * can be linked at most to 100 other objects * * 2) JoinRelationEntityByTargetJob: - * prepare tuples [source entity - relation - target entity] (S - R - T): + * (phase 1): prepare tuples [relation - target entity] (R - T): * for each entity type E_i - * join (R.target = E_i.id), - * map E_i as RelatedEntity T_i, extracting only the necessary information beforehand to produce [R - T_i] - * join (E_i.id = [R - T_i].source), where E_i becomes the source entity S + * map E_i as RelatedEntity T_i to simplify the model and extracting only the necessary information + * join (R.target = T_i.id) + * save the tuples (R_i, T_i) + * (phase 2): + * create the union of all the entity types E, hash by id + * read the tuples (R, T), hash by R.source + * join E.id = (R, T).source, where E becomes the Source Entity S + * save the tuples (S, R, T) * * 3) AdjacencyListBuilderJob: - * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mappnig the result as JoinedEntity + * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity * * 4) XmlConverterJob: * convert the JoinedEntities as XML records diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java index 74a36c580..910cd8543 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java @@ -1,5 +1,6 @@ package eu.dnetlib.dhp.oa.provision; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Maps; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; @@ -27,8 +28,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Tuple2; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import static 
eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; @@ -37,23 +41,25 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; * The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, * and all the possible relationships (similarity links produced by the Dedup process are excluded). * - * The operation is implemented by sequentially joining one entity type at time (E) with the relationships (R), and again - * by E, finally grouped by E.id; - * * The workflow is organized in different parts aimed to to reduce the complexity of the operation * 1) PrepareRelationsJob: * only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference == false), each entity * can be linked at most to 100 other objects * * 2) JoinRelationEntityByTargetJob: - * prepare tuples [source entity - relation - target entity] (S - R - T): + * (phase 1): prepare tuples [relation - target entity] (R - T): * for each entity type E_i - * join (R.target = E_i.id), - * map E_i as RelatedEntity T_i, extracting only the necessary information beforehand to produce [R - T_i] - * join (E_i.id = [R - T_i].source), where E_i becomes the source entity S + * map E_i as RelatedEntity T_i to simplify the model and extracting only the necessary information + * join (R.target = T_i.id) + * save the tuples (R_i, T_i) + * (phase 2): + * create the union of all the entity types E, hash by id + * read the tuples (R, T), hash by R.source + * join E.id = (R, T).source, where E becomes the Source Entity S + * save the tuples (S, R, T) * * 3) AdjacencyListBuilderJob: - * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mappnig the result as JoinedEntity + * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity * * 4) XmlConverterJob: * convert the JoinedEntities as XML records @@ -62,6 +68,8 @@ public class XmlConverterJob { private static final Logger log = LoggerFactory.getLogger(XmlConverterJob.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd"; public static void main(String[] args) throws Exception { @@ -107,12 +115,31 @@ public class XmlConverterJob { spark.read() .load(inputPath) .as(Encoders.bean(JoinedEntity.class)) + /* .map((MapFunction) value -> OBJECT_MAPPER.writeValueAsString(value), Encoders.STRING()) + .write() + .option("codec", "org.apache.hadoop.io.compress.GzipCodec") + .text("/tmp/json"); + + spark.read() + .textFile("/tmp/json") + .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, JoinedEntity.class), Encoders.bean(JoinedEntity.class)) + .map((MapFunction) j -> { + if (j.getLinks() != null) { + j.setLinks(j.getLinks() + .stream() + .filter(t -> t.getRelation() != null & t.getRelatedEntity() != null) + .collect(Collectors.toCollection(ArrayList::new))); + } + return j; + }, Encoders.bean(JoinedEntity.class)) + + */ .map((MapFunction>) je -> new Tuple2<>( je.getEntity().getId(), recordFactory.build(je) ), Encoders.tuple(Encoders.STRING(), Encoders.STRING())) .javaRDD() - .mapToPair((PairFunction, String, String>) t -> t) + .mapToPair((PairFunction, Text, Text>) t -> new Tuple2<>(new Text(t._1()), new Text(t._2()))) .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); } diff --git 
a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java index 8d1c79798..b6e97a503 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java @@ -1,12 +1,13 @@ package eu.dnetlib.dhp.oa.provision.model; import java.io.Serializable; +import java.util.List; public class JoinedEntity implements Serializable { private TypedRow entity; - private Links links; + private List links; public JoinedEntity() { } @@ -19,11 +20,11 @@ public class JoinedEntity implements Serializable { this.entity = entity; } - public Links getLinks() { + public List getLinks() { return links; } - public void setLinks(Links links) { + public void setLinks(List links) { this.links = links; } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Links.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Links.java deleted file mode 100644 index f23d96190..000000000 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Links.java +++ /dev/null @@ -1,10 +0,0 @@ -package eu.dnetlib.dhp.oa.provision.model; - -import java.io.Serializable; -import java.util.HashSet; - -public class Links extends HashSet implements Serializable { - - public Links() { - } -} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/GraphMappingUtils.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/GraphMappingUtils.java index b65c88201..398a272e2 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/GraphMappingUtils.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/GraphMappingUtils.java @@ -114,7 +114,7 @@ public class GraphMappingUtils { final RelatedEntity re = new RelatedEntity(); re.setId(entity.getId()); - re.setType(clazz.getName()); + re.setType(EntityType.fromClass(clazz).name()); re.setPid(entity.getPid()); re.setCollectedfrom(entity.getCollectedfrom()); @@ -125,16 +125,16 @@ public class GraphMappingUtils { case otherresearchproduct: case software: - Result r = (Result) entity; + Result result = (Result) entity; - if (r.getTitle() == null && !r.getTitle().isEmpty()) { - re.setTitle(r.getTitle().stream().findFirst().get()); + if (result.getTitle() == null && !result.getTitle().isEmpty()) { + re.setTitle(result.getTitle().stream().findFirst().get()); } - re.setDateofacceptance(getValue(r.getDateofacceptance())); - re.setPublisher(getValue(r.getPublisher())); - re.setResulttype(re.getResulttype()); - re.setInstances(re.getInstances()); + re.setDateofacceptance(getValue(result.getDateofacceptance())); + re.setPublisher(getValue(result.getPublisher())); + re.setResulttype(result.getResulttype()); + re.setInstances(result.getInstance()); //TODO still to be mapped //re.setCodeRepositoryUrl(j.read("$.coderepositoryurl")); diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java index 9c339d41c..2ea78fe84 100644 --- 
a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java @@ -694,7 +694,7 @@ public class XmlRecordFactory implements Serializable { if (isNotBlank(re.getCodeRepositoryUrl())) { metadata.add(XmlSerializationUtils.asXmlElement("coderepositoryurl", re.getCodeRepositoryUrl())); } - if (re.getResulttype() != null & !re.getResulttype().isBlank()) { + if (re.getResulttype() != null & re.getResulttype().isBlank()) { metadata.add(XmlSerializationUtils.mapQualifier("resulttype", re.getResulttype())); } if (re.getCollectedfrom() != null) { diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml index 4a78df5b0..5bc89396b 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml @@ -75,7 +75,7 @@ ${wf:conf('reuseRecords') eq false} ${wf:conf('reuseRecords') eq true} - + @@ -132,7 +132,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 + --conf spark.sql.shuffle.partitions=7680 --conf spark.network.timeout=${sparkNetworkTimeout} --inputRelationsPath${workingDir}/relation @@ -324,7 +324,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 + --conf spark.sql.shuffle.partitions=7680 --conf spark.network.timeout=${sparkNetworkTimeout} --inputGraphRootPath${inputGraphRootPath} @@ -351,7 +351,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 + --conf spark.sql.shuffle.partitions=7680 --conf spark.network.timeout=${sparkNetworkTimeout} --inputPath ${workingDir}/join_entities @@ -365,7 +365,7 @@ yarn cluster - build_adjacency_lists + convert_to_xml eu.dnetlib.dhp.oa.provision.XmlConverterJob dhp-graph-provision-${projectVersion}.jar
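For readers following the reworked workflow, the phase 1 join introduced in CreateRelatedEntitiesJob_phase1.joinRelationEntity amounts to the sketch below. It keeps the statements shown in the hunk above but spells out the generic type parameters as inferred from the encoders; it assumes the method is generic over E extends OafEntity (Class<E> entityClazz), that Tuple2 is scala.Tuple2, and that the result is persisted as parquet for the downstream jobs. It is an illustrative reading of the patch, not a verbatim copy of the committed code.

    // Phase 1: join relations to their target entities (R.target = T.id) and save (R, T) tuples.
    Dataset<Tuple2<String, SortableRelation>> relsByTarget = readPathRelation(spark, inputRelationsPath)
            .filter((FilterFunction<SortableRelation>) value -> value.getDataInfo().getDeletedbyinference() == false)
            .map((MapFunction<SortableRelation, Tuple2<String, SortableRelation>>) r -> new Tuple2<>(r.getTarget(), r),
                    Encoders.tuple(Encoders.STRING(), Encoders.kryo(SortableRelation.class)))
            .cache();

    Dataset<Tuple2<String, RelatedEntity>> entities = readPathEntity(spark, inputEntityPath, entityClazz)
            // project each entity to the lighter RelatedEntity model before the join
            .map((MapFunction<E, RelatedEntity>) value -> asRelatedEntity(value, entityClazz), Encoders.bean(RelatedEntity.class))
            .map((MapFunction<RelatedEntity, Tuple2<String, RelatedEntity>>) e -> new Tuple2<>(e.getId(), e),
                    Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class)))
            .cache();

    relsByTarget
            .joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner")
            .map((MapFunction<Tuple2<Tuple2<String, SortableRelation>, Tuple2<String, RelatedEntity>>, EntityRelEntity>)
                    t -> new EntityRelEntity(t._1()._2(), t._2()._2()), Encoders.bean(EntityRelEntity.class))
            .write()
            .mode(SaveMode.Overwrite)
            .parquet(outputPath); // assumption: (R, T) tuples saved as parquet, one output per entity type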
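The adjacency-list step (AdjacencyListBuilderJob) then groups the (S, R, T) tuples by S.id and keeps at most MAX_LINKS = 100 links per source, now collected into a plain List instead of the removed Links class. Only the groupByKey/mapGroups skeleton and the MAX_LINKS cap are taken from the hunk above; the input encoder, the eu.dnetlib.dhp.oa.provision.model.Tuple2 constructor, the getRelation()/getTarget() accessors and the closing write are assumptions for illustration.

    // Build one JoinedEntity per source id, capping the collected links at MAX_LINKS (100).
    spark.read()
            .load(inputPath)
            .as(Encoders.kryo(EntityRelEntity.class)) // assumption: kryo encoder for the intermediate model
            .groupByKey((MapFunction<EntityRelEntity, String>) value -> value.getEntity().getId(), Encoders.STRING())
            .mapGroups((MapGroupsFunction<String, EntityRelEntity, JoinedEntity>) (key, values) -> {
                JoinedEntity j = new JoinedEntity();
                List<Tuple2> links = new ArrayList<>();
                while (values.hasNext() && links.size() < MAX_LINKS) {
                    EntityRelEntity curr = values.next();
                    if (j.getEntity() == null) {
                        j.setEntity(curr.getEntity()); // the first row fixes the source entity S
                    }
                    links.add(new Tuple2(curr.getRelation(), curr.getTarget())); // collect (R, T); accessors assumed
                }
                j.setLinks(links);
                return j;
            }, Encoders.bean(JoinedEntity.class))
            .write()
            .mode(SaveMode.Overwrite)
            .parquet(outputPath); // assumption: parquet output later read back by XmlConverterJob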
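Finally, the new output path in XmlConverterJob: each JoinedEntity is rendered by the XmlRecordFactory and written as a (Text, Text) pair of entity id and XML record into a gzip-compressed SequenceFile, replacing the previous String-keyed pairs. The sketch below restates the patched statements with the type parameters spelled out; imports are the same Hadoop/Spark classes already referenced by the patched file, and the surrounding job setup is omitted.

    // Serialize each JoinedEntity to XML and store (id, xml) pairs as a gzipped SequenceFile.
    spark.read()
            .load(inputPath)
            .as(Encoders.bean(JoinedEntity.class))
            .map((MapFunction<JoinedEntity, Tuple2<String, String>>) je -> new Tuple2<>(
                    je.getEntity().getId(),
                    recordFactory.build(je)), Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
            .javaRDD()
            .mapToPair((PairFunction<Tuple2<String, String>, Text, Text>) t -> new Tuple2<>(new Text(t._1()), new Text(t._2())))
            .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);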