From f057dcdf659d36781af376c9aa77a52712607be7 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 27 May 2020 12:37:33 +0200 Subject: [PATCH 01/21] limit the max number of externalreferences to MAX_EXTERNAL_ENTITIES --- .../CreateRelatedEntitiesJob_phase2.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java index 403817019..7655d0da6 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java @@ -3,8 +3,10 @@ package eu.dnetlib.dhp.oa.provision; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; @@ -59,6 +61,8 @@ public class CreateRelatedEntitiesJob_phase2 { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final int MAX_EXTERNAL_ENTITIES = 50; + public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils @@ -190,6 +194,20 @@ public class CreateRelatedEntitiesJob_phase2 { (MapFunction) value -> OBJECT_MAPPER.readValue(value, entityClazz), Encoders.bean(entityClazz)) .filter("dataInfo.invisible == false") + .map((MapFunction) e -> { + if (ModelSupport.isSubClass(entityClazz, Result.class)) { + Result r = (Result) e; + if (r.getExternalReference() != null) { + List refs = r + .getExternalReference() + .stream() + .limit(MAX_EXTERNAL_ENTITIES) + .collect(Collectors.toList()); + r.setExternalReference(refs); + } + } + return e; + }, Encoders.bean(entityClazz)) .map( (MapFunction) value -> getTypedRow( StringUtils.substringAfterLast(inputEntityPath, "/"), value), From 8047d16dd9f56787ad469746839f86d70facdadc Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 27 May 2020 12:38:12 +0200 Subject: [PATCH 02/21] added RDD based adjacency list creation procedure --- .../oa/provision/AdjacencyListBuilderJob.java | 40 ++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java index 99247b756..9f221ae45 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java @@ -9,14 +9,20 @@ import java.util.Optional; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapGroupsFunction; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.rdd.RDD; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import 
com.google.common.collect.Lists; + import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity; @@ -83,7 +89,7 @@ public class AdjacencyListBuilderJob { isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - createAdjacencyLists(spark, inputPath, outputPath); + createAdjacencyListsRDD(spark, inputPath, outputPath); }); } @@ -118,6 +124,38 @@ public class AdjacencyListBuilderJob { .parquet(outputPath); } + private static void createAdjacencyListsRDD( + SparkSession spark, String inputPath, String outputPath) { + + log.info("Reading joined entities from: {}", inputPath); + RDD joinedEntities = spark + .read() + .load(inputPath) + .as(Encoders.bean(EntityRelEntity.class)) + .javaRDD() + .mapToPair(re -> { + JoinedEntity je = new JoinedEntity(); + je.setEntity(re.getEntity()); + je.setLinks(Lists.newArrayList()); + if (re.getRelation() != null && re.getTarget() != null) { + je.getLinks().add(new Tuple2(re.getRelation(), re.getTarget())); + } + return new scala.Tuple2<>(re.getEntity().getId(), je); + }) + .reduceByKey((je1, je2) -> { + je1.getLinks().addAll(je2.getLinks()); + return je1; + }) + .map(t -> t._2()) + .rdd(); + + spark + .createDataset(joinedEntities, Encoders.bean(JoinedEntity.class)) + .write() + .mode(SaveMode.Overwrite) + .parquet(outputPath); + } + private static void removeOutputDir(SparkSession spark, String path) { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } From 9e4ec1543b860edd2706d77bd199ff897337fd63 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 27 May 2020 12:38:42 +0200 Subject: [PATCH 03/21] updated test --- .../oa/provision/XmlRecordFactoryTest.java | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java new file mode 100644 index 000000000..c7156d8bd --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java @@ -0,0 +1,50 @@ + +package eu.dnetlib.dhp.oa.provision; + +import static org.junit.jupiter.api.Assertions.*; + +import java.io.IOException; +import java.io.StringReader; + +import org.apache.commons.io.IOUtils; +import org.dom4j.Document; +import org.dom4j.DocumentException; +import org.dom4j.io.SAXReader; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.oa.provision.model.JoinedEntity; +import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; +import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory; + +public class XmlRecordFactoryTest { + + private static final String otherDsTypeId = "scholarcomminfra,infospace,pubsrepository::mock,entityregistry,entityregistry::projects,entityregistry::repositories,websource"; + + @Test + public void testXMLRecordFactory() throws IOException, DocumentException { + + String json = IOUtils.toString(getClass().getResourceAsStream("joined_entity.json")); + + assertNotNull(json); + JoinedEntity je = new ObjectMapper().readValue(json, JoinedEntity.class); + assertNotNull(je); + + ContextMapper contextMapper = new ContextMapper(); + + XmlRecordFactory xmlRecordFactory = new 
XmlRecordFactory(contextMapper, false, XmlConverterJob.schemaLocation, + otherDsTypeId); + + String xml = xmlRecordFactory.build(je); + + assertNotNull(xml); + + Document doc = new SAXReader().read(new StringReader(xml)); + + assertNotNull(doc); + + //TODO add assertions based of values extracted from the XML record + } +} From cfd753217cac987f85841baad42064b4047c71e5 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 27 May 2020 12:44:01 +0200 Subject: [PATCH 04/21] repartition the join_entities in 24k files --- .../eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml index 6983ecf53..5d1889439 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml @@ -362,7 +362,7 @@ --inputGraphRootPath${inputGraphRootPath} --inputRelatedEntitiesPath${workingDir}/join_partial --outputPath${workingDir}/join_entities - --numPartitions12000 + --numPartitions24000 @@ -386,7 +386,7 @@ --conf spark.sql.shuffle.partitions=7680 --conf spark.network.timeout=${sparkNetworkTimeout} - --inputPath ${workingDir}/join_entities + --inputPath${workingDir}/join_entities --outputPath${workingDir}/joined From fdd54bad1cc304b663a3b96ba3021c6aefd19cc3 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 27 May 2020 19:31:54 +0200 Subject: [PATCH 05/21] code formatting --- .../java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java index c7156d8bd..9a115bfa6 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java @@ -45,6 +45,6 @@ public class XmlRecordFactoryTest { assertNotNull(doc); - //TODO add assertions based of values extracted from the XML record + // TODO add assertions based of values extracted from the XML record } } From 5dea155a87ab2eee9cbc3eaa59beccec7e9a7128 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 28 May 2020 13:49:59 +0200 Subject: [PATCH 06/21] increased number of partitions produced by the join_all_entities phase as well as spark.sql.shuffle.partitions in adjancency_lists phase --- .../eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml index 5d1889439..02148ed57 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml @@ -362,7 +362,7 @@ --inputGraphRootPath${inputGraphRootPath} --inputRelatedEntitiesPath${workingDir}/join_partial 
--outputPath${workingDir}/join_entities - --numPartitions24000 + --numPartitions35000 @@ -383,7 +383,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 + --conf spark.sql.shuffle.partitions=15000 --conf spark.network.timeout=${sparkNetworkTimeout} --inputPath${workingDir}/join_entities From ef115930682635ff4e478eedfdf4b555d656ab53 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 28 May 2020 13:50:44 +0200 Subject: [PATCH 07/21] JoinedEntity.links defined as empty list by default --- .../java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java index e29ec9d19..7681fa76f 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java @@ -2,13 +2,14 @@ package eu.dnetlib.dhp.oa.provision.model; import java.io.Serializable; +import java.util.ArrayList; import java.util.List; public class JoinedEntity implements Serializable { private TypedRow entity; - private List links; + private List links = new ArrayList<>(); public JoinedEntity() { } From 83504ecacebcab44212a0679350dfa35a839c484 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 28 May 2020 13:52:30 +0200 Subject: [PATCH 08/21] limiting the maximum number of authors allowed in XML records to MAX_AUTHORS = 200; authors with ORCID can exceed that limit --- .../CreateRelatedEntitiesJob_phase2.java | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java index 7655d0da6..cc9f17ee7 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java @@ -5,7 +5,9 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.Optional; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; @@ -22,6 +24,7 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; @@ -63,6 +66,8 @@ public class CreateRelatedEntitiesJob_phase2 { private static final int MAX_EXTERNAL_ENTITIES = 50; + private static final int MAX_AUTHORS = 200; + public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils @@ -205,6 +210,16 @@ public class CreateRelatedEntitiesJob_phase2 { .collect(Collectors.toList()); r.setExternalReference(refs); } + if (r.getAuthor() != null && r.getAuthor().size() > MAX_AUTHORS) 
{ + List authors = Lists.newArrayList(); + for (int i = 0; i < r.getAuthor().size(); i++) { + final Author a = r.getAuthor().get(i); + if (authors.size() < MAX_AUTHORS || hasORCID(a)) { + authors.add(a); + } + } + r.setAuthor(authors); + } } return e; }, Encoders.bean(entityClazz)) @@ -214,6 +229,18 @@ public class CreateRelatedEntitiesJob_phase2 { Encoders.bean(TypedRow.class)); } + private static boolean hasORCID(Author a) { + return a.getPid() != null && a + .getPid() + .stream() + .filter(Objects::nonNull) + .map(StructuredProperty::getQualifier) + .filter(Objects::nonNull) + .map(Qualifier::getClassid) + .filter(StringUtils::isNotBlank) + .anyMatch(c -> "orcid".equals(c.toLowerCase())); + } + private static TypedRow getTypedRow(String type, OafEntity entity) throws JsonProcessingException { TypedRow t = new TypedRow(); From 821be1f8b66ac13bc79105cb17bc86ca5fbf4f85 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 28 May 2020 13:53:13 +0200 Subject: [PATCH 09/21] experimental implementation of custom aggregation using kryo encoders --- .../oa/provision/AdjacencyListBuilderJob.java | 94 +++++++++++++++++-- 1 file changed, 85 insertions(+), 9 deletions(-) diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java index 9f221ae45..63b90be7c 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java @@ -4,20 +4,20 @@ package eu.dnetlib.dhp.oa.provision; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.rdd.RDD; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.*; +import org.apache.spark.sql.expressions.Aggregator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,10 +25,11 @@ import com.google.common.collect.Lists; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity; -import eu.dnetlib.dhp.oa.provision.model.JoinedEntity; -import eu.dnetlib.dhp.oa.provision.model.Tuple2; +import eu.dnetlib.dhp.oa.provision.model.*; import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import scala.Function1; +import scala.Function2; /** * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. 
The @@ -82,17 +83,92 @@ public class AdjacencyListBuilderJob { SparkConf conf = new SparkConf(); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + List> modelClasses = Arrays.asList(ModelSupport.getOafModelClasses()); + modelClasses + .addAll( + Lists + .newArrayList( + TypedRow.class, + EntityRelEntity.class, + JoinedEntity.class, + RelatedEntity.class, + Tuple2.class, + SortableRelation.class)); + conf.registerKryoClasses(modelClasses.toArray(new Class[] {})); runWithSparkSession( conf, isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - createAdjacencyListsRDD(spark, inputPath, outputPath); + createAdjacencyListsKryo(spark, inputPath, outputPath); }); } + private static void createAdjacencyListsKryo( + SparkSession spark, String inputPath, String outputPath) { + + TypedColumn aggregator = new AdjacencyListAggregator().toColumn(); + log.info("Reading joined entities from: {}", inputPath); + spark + .read() + .load(inputPath) + .as(Encoders.kryo(EntityRelEntity.class)) + .groupByKey( + (MapFunction) value -> value.getEntity().getId(), + Encoders.STRING()) + .agg(aggregator) + .write() + .mode(SaveMode.Overwrite) + .parquet(outputPath); + } + + public static class AdjacencyListAggregator extends Aggregator { + + @Override + public JoinedEntity zero() { + return new JoinedEntity(); + } + + @Override + public JoinedEntity reduce(JoinedEntity j, EntityRelEntity e) { + j.setEntity(e.getEntity()); + if (j.getLinks().size() <= MAX_LINKS) { + j.getLinks().add(new Tuple2(e.getRelation(), e.getTarget())); + } + return j; + } + + @Override + public JoinedEntity merge(JoinedEntity j1, JoinedEntity j2) { + j1.getLinks().addAll(j2.getLinks()); + return j1; + } + + @Override + public JoinedEntity finish(JoinedEntity j) { + if (j.getLinks().size() > MAX_LINKS) { + ArrayList links = j + .getLinks() + .stream() + .limit(MAX_LINKS) + .collect(Collectors.toCollection(ArrayList::new)); + j.setLinks(links); + } + return j; + } + + @Override + public Encoder bufferEncoder() { + return Encoders.kryo(JoinedEntity.class); + } + + @Override + public Encoder outputEncoder() { + return Encoders.kryo(JoinedEntity.class); + } + } + private static void createAdjacencyLists( SparkSession spark, String inputPath, String outputPath) { From a57965a3ea5fdc0536444166d357a35197978610 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 28 May 2020 17:36:37 +0200 Subject: [PATCH 10/21] limiting the dimensions of outliers --- .../CreateRelatedEntitiesJob_phase2.java | 80 ++++++++++++------- 1 file changed, 53 insertions(+), 27 deletions(-) diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java index cc9f17ee7..1de734ee4 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java @@ -3,11 +3,9 @@ package eu.dnetlib.dhp.oa.provision; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.Optional; -import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; @@ -65,8 +63,10 @@ public class 
CreateRelatedEntitiesJob_phase2 { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final int MAX_EXTERNAL_ENTITIES = 50; - private static final int MAX_AUTHORS = 200; + private static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000; + private static final int MAX_TITLE_LENGTH = 5000; + private static final int MAX_ABSTRACT_LENGTH = 100000; public static void main(String[] args) throws Exception { @@ -199,36 +199,62 @@ public class CreateRelatedEntitiesJob_phase2 { (MapFunction) value -> OBJECT_MAPPER.readValue(value, entityClazz), Encoders.bean(entityClazz)) .filter("dataInfo.invisible == false") - .map((MapFunction) e -> { - if (ModelSupport.isSubClass(entityClazz, Result.class)) { - Result r = (Result) e; - if (r.getExternalReference() != null) { - List refs = r - .getExternalReference() - .stream() - .limit(MAX_EXTERNAL_ENTITIES) - .collect(Collectors.toList()); - r.setExternalReference(refs); - } - if (r.getAuthor() != null && r.getAuthor().size() > MAX_AUTHORS) { - List authors = Lists.newArrayList(); - for (int i = 0; i < r.getAuthor().size(); i++) { - final Author a = r.getAuthor().get(i); - if (authors.size() < MAX_AUTHORS || hasORCID(a)) { - authors.add(a); - } - } - r.setAuthor(authors); - } - } - return e; - }, Encoders.bean(entityClazz)) + .map((MapFunction) e -> pruneOutliers(entityClazz, e), Encoders.bean(entityClazz)) .map( (MapFunction) value -> getTypedRow( StringUtils.substringAfterLast(inputEntityPath, "/"), value), Encoders.bean(TypedRow.class)); } + private static E pruneOutliers(Class entityClazz, E e) { + if (ModelSupport.isSubClass(entityClazz, Result.class)) { + Result r = (Result) e; + if (r.getExternalReference() != null) { + List refs = r + .getExternalReference() + .stream() + .limit(MAX_EXTERNAL_ENTITIES) + .collect(Collectors.toList()); + r.setExternalReference(refs); + } + if (r.getAuthor() != null) { + List authors = Lists.newArrayList(); + for (Author a : r.getAuthor()) { + a.setFullname(StringUtils.left(a.getFullname(), MAX_AUTHOR_FULLNAME_LENGTH)); + if (authors.size() < MAX_AUTHORS || hasORCID(a)) { + authors.add(a); + } + } + r.setAuthor(authors); + } + if (r.getDescription() != null) { + List> desc = r + .getDescription() + .stream() + .filter(Objects::nonNull) + .map(d -> { + d.setValue(StringUtils.left(d.getValue(), MAX_ABSTRACT_LENGTH)); + return d; + }) + .collect(Collectors.toList()); + r.setDescription(desc); + } + if (r.getTitle() != null) { + List titles = r + .getTitle() + .stream() + .filter(Objects::nonNull) + .map(t -> { + t.setValue(StringUtils.left(t.getValue(), MAX_TITLE_LENGTH)); + return t; + }) + .collect(Collectors.toList()); + r.setTitle(titles); + } + } + return e; + } + private static boolean hasORCID(Author a) { return a.getPid() != null && a .getPid() From b2f9564f13b90312a8f462a27fb56935f29e944d Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 29 May 2020 10:58:15 +0200 Subject: [PATCH 11/21] WIP: fixed PrepareRelationsJob; parallel implementation of CreateRelatedEntitiesJob_phase2, now works by OafType; introduced custom aggregator in AdjacencyListBuilderJob --- .../dhp/schema/common/ModelSupport.java | 16 ++ .../oa/provision/AdjacencyListBuilderJob.java | 29 +-- .../CreateRelatedEntitiesJob_phase1.java | 2 +- .../CreateRelatedEntitiesJob_phase2.java | 97 ++++---- .../dhp/oa/provision/PrepareRelationsJob.java | 78 ++----- .../model/ProvisionModelSupport.java | 26 +++ .../provision/utils/RelationPartitioner.java | 6 +- ...input_params_related_entities_pahase2.json | 10 +- 
.../dhp/oa/provision/oozie_app/workflow.xml | 215 ++++++++++++++++-- 9 files changed, 329 insertions(+), 150 deletions(-) create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java index 9ee7c2deb..7d8be81ac 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java @@ -58,6 +58,18 @@ public class ModelSupport { oafTypes.put("relation", Relation.class); } + public static final Map idPrefixMap = Maps.newHashMap(); + + static { + idPrefixMap.put(Datasource.class, "10"); + idPrefixMap.put(Organization.class, "20"); + idPrefixMap.put(Project.class, "40"); + idPrefixMap.put(Dataset.class, "50"); + idPrefixMap.put(OtherResearchProduct.class, "50"); + idPrefixMap.put(Software.class, "50"); + idPrefixMap.put(Publication.class, "50"); + } + public static final Map entityIdPrefix = Maps.newHashMap(); static { @@ -289,6 +301,10 @@ public class ModelSupport { private ModelSupport() { } + public static String getIdPrefix(Class clazz) { + return idPrefixMap.get(clazz); + } + /** * Checks subclass-superclass relationship. * diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java index 63b90be7c..910138988 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java @@ -30,6 +30,8 @@ import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.Oaf; import scala.Function1; import scala.Function2; +import scala.collection.JavaConverters; +import scala.collection.Seq; /** * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. 
The @@ -83,18 +85,7 @@ public class AdjacencyListBuilderJob { SparkConf conf = new SparkConf(); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - List> modelClasses = Arrays.asList(ModelSupport.getOafModelClasses()); - modelClasses - .addAll( - Lists - .newArrayList( - TypedRow.class, - EntityRelEntity.class, - JoinedEntity.class, - RelatedEntity.class, - Tuple2.class, - SortableRelation.class)); - conf.registerKryoClasses(modelClasses.toArray(new Class[] {})); + conf.registerKryoClasses(ProvisionModelSupport.getModelClasses()); runWithSparkSession( conf, @@ -108,11 +99,17 @@ public class AdjacencyListBuilderJob { private static void createAdjacencyListsKryo( SparkSession spark, String inputPath, String outputPath) { - TypedColumn aggregator = new AdjacencyListAggregator().toColumn(); log.info("Reading joined entities from: {}", inputPath); + + final List paths = HdfsSupport + .listFiles(inputPath, spark.sparkContext().hadoopConfiguration()); + + log.info("Found paths: {}", String.join(",", paths)); + + TypedColumn aggregator = new AdjacencyListAggregator().toColumn(); spark .read() - .load(inputPath) + .load(toSeq(paths)) .as(Encoders.kryo(EntityRelEntity.class)) .groupByKey( (MapFunction) value -> value.getEntity().getId(), @@ -232,6 +229,10 @@ public class AdjacencyListBuilderJob { .parquet(outputPath); } + private static Seq toSeq(List list) { + return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq(); + } + private static void removeOutputDir(SparkSession spark, String path) { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java index 606fa4cc0..ccb20a136 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java @@ -134,7 +134,7 @@ public class CreateRelatedEntitiesJob_phase1 { Encoders.bean(EntityRelEntity.class)) .write() .mode(SaveMode.Overwrite) - .parquet(outputPath + "/" + EntityType.fromClass(clazz)); + .parquet(outputPath); } private static Dataset readPathEntity( diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java index 1de734ee4..757ab47d3 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java @@ -27,6 +27,7 @@ import com.google.common.collect.Lists; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity; +import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; import eu.dnetlib.dhp.oa.provision.model.TypedRow; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; @@ -74,7 +75,7 @@ public class CreateRelatedEntitiesJob_phase2 { .toString( PrepareRelationsJob.class .getResourceAsStream( - "/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json")); + 
"/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json")); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); @@ -87,8 +88,8 @@ public class CreateRelatedEntitiesJob_phase2 { String inputRelatedEntitiesPath = parser.get("inputRelatedEntitiesPath"); log.info("inputRelatedEntitiesPath: {}", inputRelatedEntitiesPath); - String inputGraphRootPath = parser.get("inputGraphRootPath"); - log.info("inputGraphRootPath: {}", inputGraphRootPath); + String inputEntityPath = parser.get("inputEntityPath"); + log.info("inputEntityPath: {}", inputEntityPath); String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); @@ -96,44 +97,49 @@ public class CreateRelatedEntitiesJob_phase2 { int numPartitions = Integer.parseInt(parser.get("numPartitions")); log.info("numPartitions: {}", numPartitions); + String graphTableClassName = parser.get("graphTableClassName"); + log.info("graphTableClassName: {}", graphTableClassName); + + Class entityClazz = (Class) Class.forName(graphTableClassName); + SparkConf conf = new SparkConf(); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + conf.registerKryoClasses(ProvisionModelSupport.getModelClasses()); runWithSparkSession( conf, isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - joinAllEntities( - spark, inputRelatedEntitiesPath, inputGraphRootPath, outputPath, numPartitions); + joinEntityWithRelatedEntities( + spark, inputRelatedEntitiesPath, inputEntityPath, outputPath, numPartitions, entityClazz); }); } - private static void joinAllEntities( + private static void joinEntityWithRelatedEntities( SparkSession spark, - String inputRelatedEntitiesPath, - String inputGraphRootPath, + String relatedEntitiesPath, + String entityPath, String outputPath, - int numPartitions) { + int numPartitions, + Class entityClazz) { - Dataset> entities = readAllEntities(spark, inputGraphRootPath, numPartitions); - Dataset> relsBySource = readRelatedEntities(spark, inputRelatedEntitiesPath); + Dataset> entity = readPathEntity(spark, entityPath, entityClazz); + Dataset> relatedEntities = readRelatedEntities( + spark, relatedEntitiesPath, entityClazz); - entities - .joinWith(relsBySource, entities.col("_1").equalTo(relsBySource.col("_1")), "left_outer") - .map( - (MapFunction, Tuple2>, EntityRelEntity>) value -> { - EntityRelEntity re = new EntityRelEntity(); - re.setEntity(value._1()._2()); - Optional related = Optional.ofNullable(value._2()).map(Tuple2::_2); - if (related.isPresent()) { - re.setRelation(related.get().getRelation()); - re.setTarget(related.get().getTarget()); - } - return re; - }, - Encoders.bean(EntityRelEntity.class)) + entity + .joinWith(relatedEntities, entity.col("_1").equalTo(relatedEntities.col("_1")), "left_outer") + .map((MapFunction, Tuple2>, EntityRelEntity>) value -> { + EntityRelEntity re = new EntityRelEntity(); + re.setEntity(getTypedRow(entityClazz.getCanonicalName().toLowerCase(), value._1()._2())); + Optional related = Optional.ofNullable(value._2()).map(Tuple2::_2); + if (related.isPresent()) { + re.setRelation(related.get().getRelation()); + re.setTarget(related.get().getTarget()); + } + return re; + }, Encoders.bean(EntityRelEntity.class)) .repartition(numPartitions) .filter( (FilterFunction) value -> value.getEntity() != null @@ -143,33 +149,8 @@ public class CreateRelatedEntitiesJob_phase2 { .parquet(outputPath); } - private static 
Dataset> readAllEntities( - SparkSession spark, String inputGraphPath, int numPartitions) { - Dataset publication = readPathEntity(spark, inputGraphPath + "/publication", Publication.class); - Dataset dataset = readPathEntity( - spark, inputGraphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); - Dataset other = readPathEntity( - spark, inputGraphPath + "/otherresearchproduct", OtherResearchProduct.class); - Dataset software = readPathEntity(spark, inputGraphPath + "/software", Software.class); - Dataset datasource = readPathEntity(spark, inputGraphPath + "/datasource", Datasource.class); - Dataset organization = readPathEntity(spark, inputGraphPath + "/organization", Organization.class); - Dataset project = readPathEntity(spark, inputGraphPath + "/project", Project.class); - - return publication - .union(dataset) - .union(other) - .union(software) - .union(datasource) - .union(organization) - .union(project) - .map( - (MapFunction>) value -> new Tuple2<>(value.getId(), value), - Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class))) - .repartition(numPartitions); - } - - private static Dataset> readRelatedEntities( - SparkSession spark, String inputRelatedEntitiesPath) { + private static Dataset> readRelatedEntities( + SparkSession spark, String inputRelatedEntitiesPath, Class entityClazz) { log.info("Reading related entities from: {}", inputRelatedEntitiesPath); @@ -178,17 +159,20 @@ public class CreateRelatedEntitiesJob_phase2 { log.info("Found paths: {}", String.join(",", paths)); + final String idPrefix = ModelSupport.getIdPrefix(entityClazz); + return spark .read() .load(toSeq(paths)) .as(Encoders.bean(EntityRelEntity.class)) + .filter((FilterFunction) e -> e.getRelation().getSource().startsWith(idPrefix)) .map( (MapFunction>) value -> new Tuple2<>( value.getRelation().getSource(), value), Encoders.tuple(Encoders.STRING(), Encoders.kryo(EntityRelEntity.class))); } - private static Dataset readPathEntity( + private static Dataset> readPathEntity( SparkSession spark, String inputEntityPath, Class entityClazz) { log.info("Reading Graph table from: {}", inputEntityPath); @@ -201,9 +185,8 @@ public class CreateRelatedEntitiesJob_phase2 { .filter("dataInfo.invisible == false") .map((MapFunction) e -> pruneOutliers(entityClazz, e), Encoders.bean(entityClazz)) .map( - (MapFunction) value -> getTypedRow( - StringUtils.substringAfterLast(inputEntityPath, "/"), value), - Encoders.bean(TypedRow.class)); + (MapFunction>) e -> new Tuple2<>(e.getId(), e), + Encoders.tuple(Encoders.STRING(), Encoders.kryo(entityClazz))); } private static E pruneOutliers(Class entityClazz, E e) { diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java index 72d68a389..6b184071a 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java @@ -3,9 +3,8 @@ package eu.dnetlib.dhp.oa.provision; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import java.util.HashSet; -import java.util.Optional; -import java.util.Set; +import java.util.*; +import java.util.function.Function; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; @@ -20,6 +19,7 @@ import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; +import com.clearspring.analytics.util.Lists; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; @@ -27,9 +27,11 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Sets; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.FunctionalInterfaceSupport; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.oa.provision.model.SortableRelation; import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner; +import scala.Function1; import scala.Tuple2; /** @@ -111,37 +113,10 @@ public class PrepareRelationsJob { spark -> { removeOutputDir(spark, outputPath); prepareRelationsRDD( - spark, inputRelationsPath, outputPath, relationFilter, relPartitions, maxRelations); + spark, inputRelationsPath, outputPath, relationFilter, maxRelations, relPartitions); }); } - /** - * Dataset based implementation that prepares the graph relations by limiting the number of outgoing links and - * filtering the relation types according to the given criteria. - * - * @param spark the spark session - * @param inputRelationsPath source path for the graph relations - * @param outputPath output path for the processed relations - * @param relationFilter set of relation filters applied to the `relClass` field - * @param maxRelations maximum number of allowed outgoing edges - */ - private static void prepareRelations( - SparkSession spark, String inputRelationsPath, String outputPath, Set relationFilter, - int maxRelations) { - readPathRelation(spark, inputRelationsPath) - .filter("dataInfo.deletedbyinference == false") - .filter((FilterFunction) rel -> !relationFilter.contains(rel.getRelClass())) - .groupByKey( - (MapFunction) value -> value.getSource(), Encoders.STRING()) - .flatMapGroups( - (FlatMapGroupsFunction) (key, values) -> Iterators - .limit(values, maxRelations), - Encoders.bean(SortableRelation.class)) - .write() - .mode(SaveMode.Overwrite) - .parquet(outputPath); - } - /** * RDD based implementation that prepares the graph relations by limiting the number of outgoing links and filtering * the relation types according to the given criteria. 
Moreover, outgoing links kept within the given limit are @@ -152,50 +127,41 @@ public class PrepareRelationsJob { * @param outputPath output path for the processed relations * @param relationFilter set of relation filters applied to the `relClass` field * @param maxRelations maximum number of allowed outgoing edges + * @param relPartitions number of partitions for the output RDD */ - // TODO work in progress private static void prepareRelationsRDD( - SparkSession spark, String inputRelationsPath, String outputPath, Set relationFilter, int relPartitions, - int maxRelations) { - JavaRDD rels = readPathRelationRDD(spark, inputRelationsPath).repartition(relPartitions); - RelationPartitioner partitioner = new RelationPartitioner(rels.getNumPartitions()); + SparkSession spark, String inputRelationsPath, String outputPath, Set relationFilter, int maxRelations, + int relPartitions) { - // only consider those that are not virtually deleted - RDD d = rels + RDD cappedRels = readPathRelationRDD(spark, inputRelationsPath) + .repartition(relPartitions) .filter(rel -> !rel.getDataInfo().getDeletedbyinference()) .filter(rel -> !relationFilter.contains(rel.getRelClass())) - .mapToPair( - (PairFunction) rel -> new Tuple2<>(rel, rel)) - .groupByKey(partitioner) - .map(group -> Iterables.limit(group._2(), maxRelations)) - .flatMap(group -> group.iterator()) + // group by SOURCE and apply limit + .mapToPair(rel -> new Tuple2<>(rel.getSource(), rel)) + .groupByKey(new RelationPartitioner(relPartitions)) + .flatMap(group -> Iterables.limit(group._2(), maxRelations).iterator()) + // group by TARGET and apply limit + .mapToPair(rel -> new Tuple2<>(rel.getTarget(), rel)) + .groupByKey(new RelationPartitioner(relPartitions)) + .flatMap(group -> Iterables.limit(group._2(), maxRelations).iterator()) .rdd(); spark - .createDataset(d, Encoders.bean(SortableRelation.class)) + .createDataset(cappedRels, Encoders.bean(SortableRelation.class)) .write() .mode(SaveMode.Overwrite) .parquet(outputPath); } /** - * Reads a Dataset of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text + * Reads a JavaRDD of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text * file, * * @param spark * @param inputPath - * @return the Dataset containing all the relationships + * @return the JavaRDD containing all the relationships */ - private static Dataset readPathRelation( - SparkSession spark, final String inputPath) { - return spark - .read() - .textFile(inputPath) - .map( - (MapFunction) value -> OBJECT_MAPPER.readValue(value, SortableRelation.class), - Encoders.bean(SortableRelation.class)); - } - private static JavaRDD readPathRelationRDD( SparkSession spark, final String inputPath) { JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java new file mode 100644 index 000000000..3cccce7c4 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java @@ -0,0 +1,26 @@ + +package eu.dnetlib.dhp.oa.provision.model; + +import java.util.List; + +import com.google.common.collect.Lists; + +import eu.dnetlib.dhp.schema.common.ModelSupport; + +public class ProvisionModelSupport { + + public static Class[] getModelClasses() { + List> modelClasses = 
Lists.newArrayList(ModelSupport.getOafModelClasses()); + modelClasses + .addAll( + Lists + .newArrayList( + TypedRow.class, + EntityRelEntity.class, + JoinedEntity.class, + RelatedEntity.class, + Tuple2.class, + SortableRelation.class)); + return modelClasses.toArray(new Class[] {}); + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java index a09a27837..c7862b48a 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java @@ -4,8 +4,6 @@ package eu.dnetlib.dhp.oa.provision.utils; import org.apache.spark.Partitioner; import org.apache.spark.util.Utils; -import eu.dnetlib.dhp.oa.provision.model.SortableRelation; - /** * Used in combination with SortableRelationKey, allows to partition the records by source id, therefore allowing to * sort relations sharing the same source id by the ordering defined in SortableRelationKey. @@ -25,6 +23,8 @@ public class RelationPartitioner extends Partitioner { @Override public int getPartition(Object key) { - return Utils.nonNegativeMod(((SortableRelation) key).getSource().hashCode(), numPartitions()); + String partitionKey = (String) key; + return Utils.nonNegativeMod(partitionKey.hashCode(), numPartitions()); } + } diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json index 2727f153b..2c9f0e4f3 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json @@ -13,8 +13,14 @@ }, { "paramName": "iep", - "paramLongName": "inputGraphRootPath", - "paramDescription": "root graph path", + "paramLongName": "inputEntityPath", + "paramDescription": "input Entity Path", + "paramRequired": true + }, + { + "paramName": "clazz", + "paramLongName": "graphTableClassName", + "paramDescription": "class name associated to the input entity path", "paramRequired": true }, { diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml index 02148ed57..dcd434e9b 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml @@ -103,7 +103,7 @@ ${wf:conf('resumeFrom') eq 'prepare_relations'} ${wf:conf('resumeFrom') eq 'fork_join_related_entities'} - ${wf:conf('resumeFrom') eq 'join_all_entities'} + ${wf:conf('resumeFrom') eq 'fork_join_all_entities'} ${wf:conf('resumeFrom') eq 'adjancency_lists'} ${wf:conf('resumeFrom') eq 'convert_to_xml'} ${wf:conf('resumeFrom') eq 'to_solr_index'} @@ -134,7 +134,7 @@ --inputRelationsPath${inputGraphRootPath}/relation --outputPath${workingDir}/relation - --relPartitions3000 + --relPartitions5000 @@ -171,7 +171,7 @@ --inputRelationsPath${workingDir}/relation 
--inputEntityPath${inputGraphRootPath}/publication --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication - --outputPath${workingDir}/join_partial + --outputPath${workingDir}/join_partial/publication @@ -198,7 +198,7 @@ --inputRelationsPath${workingDir}/relation --inputEntityPath${inputGraphRootPath}/dataset --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset - --outputPath${workingDir}/join_partial + --outputPath${workingDir}/join_partial/dataset @@ -225,7 +225,7 @@ --inputRelationsPath${workingDir}/relation --inputEntityPath${inputGraphRootPath}/otherresearchproduct --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct - --outputPath${workingDir}/join_partial + --outputPath${workingDir}/join_partial/otherresearchproduct @@ -252,7 +252,7 @@ --inputRelationsPath${workingDir}/relation --inputEntityPath${inputGraphRootPath}/software --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software - --outputPath${workingDir}/join_partial + --outputPath${workingDir}/join_partial/software @@ -279,7 +279,7 @@ --inputRelationsPath${workingDir}/relation --inputEntityPath${inputGraphRootPath}/datasource --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource - --outputPath${workingDir}/join_partial + --outputPath${workingDir}/join_partial/datasource @@ -306,7 +306,7 @@ --inputRelationsPath${workingDir}/relation --inputEntityPath${inputGraphRootPath}/organization --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization - --outputPath${workingDir}/join_partial + --outputPath${workingDir}/join_partial/organization @@ -333,19 +333,29 @@ --inputRelationsPath${workingDir}/relation --inputEntityPath${inputGraphRootPath}/project --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project - --outputPath${workingDir}/join_partial + --outputPath${workingDir}/join_partial/project - + - + + + + + + + + + + + yarn cluster - Join[entities.id = relatedEntity.source] + Join[publication.id = relatedEntity.source] eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 dhp-graph-provision-${projectVersion}.jar @@ -356,18 +366,189 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 + --conf spark.sql.shuffle.partitions=15360 --conf spark.network.timeout=${sparkNetworkTimeout} - --inputGraphRootPath${inputGraphRootPath} + --inputEntityPath${inputGraphRootPath}/publication + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication --inputRelatedEntitiesPath${workingDir}/join_partial - --outputPath${workingDir}/join_entities + --outputPath${workingDir}/join_entities/publication --numPartitions35000 - + + + + yarn + cluster + Join[dataset.id = relatedEntity.source] + eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=15360 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputEntityPath${inputGraphRootPath}/dataset + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset + 
--inputRelatedEntitiesPath${workingDir}/join_partial + --outputPath${workingDir}/join_entities/dataset + --numPartitions20000 + + + + + + + + yarn + cluster + Join[otherresearchproduct.id = relatedEntity.source] + eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=15360 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputEntityPath${inputGraphRootPath}/otherresearchproduct + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct + --inputRelatedEntitiesPath${workingDir}/join_partial + --outputPath${workingDir}/join_entities/otherresearchproduct + --numPartitions10000 + + + + + + + + yarn + cluster + Join[software.id = relatedEntity.source] + eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=15360 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputEntityPath${inputGraphRootPath}/software + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software + --inputRelatedEntitiesPath${workingDir}/join_partial + --outputPath${workingDir}/join_entities/software + --numPartitions10000 + + + + + + + + yarn + cluster + Join[datasource.id = relatedEntity.source] + eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=15360 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputEntityPath${inputGraphRootPath}/datasource + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource + --inputRelatedEntitiesPath${workingDir}/join_partial + --outputPath${workingDir}/join_entities/datasource + --numPartitions1000 + + + + + + + + yarn + cluster + Join[organization.id = relatedEntity.source] + eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=2 + --executor-memory=12G + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + 
--conf spark.sql.shuffle.partitions=15360 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputEntityPath${inputGraphRootPath}/organization + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization + --inputRelatedEntitiesPath${workingDir}/join_partial + --outputPath${workingDir}/join_entities/organization + --numPartitions20000 + + + + + + + + yarn + cluster + Join[project.id = relatedEntity.source] + eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=15360 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputEntityPath${inputGraphRootPath}/project + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project + --inputRelatedEntitiesPath${workingDir}/join_partial + --outputPath${workingDir}/join_entities/project + --numPartitions10000 + + + + + + + yarn @@ -383,7 +564,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=15000 + --conf spark.sql.shuffle.partitions=15360 --conf spark.network.timeout=${sparkNetworkTimeout} --inputPath${workingDir}/join_entities From 6f5f498c7897f191b963ded2432993cb2f37b28e Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 29 May 2020 11:22:00 +0200 Subject: [PATCH 12/21] restored common properties driving executor-cores and executor-memory in join_organization_relations wf node --- .../eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml index dcd434e9b..15d352790 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml @@ -499,8 +499,8 @@ eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 dhp-graph-provision-${projectVersion}.jar - --executor-cores=2 - --executor-memory=12G + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} --driver-memory=${sparkDriverMemoryForJoining} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} From 05f269a1c085296471f42554c3d86472e0d78fb4 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 1 Jun 2020 00:32:42 +0200 Subject: [PATCH 13/21] kryo based parallel implementation of CreateRelatedEntitiesJob_phase2, now works by OafType; introduced custom aggregator in AdjacencyListBuilderJob --- .../oa/provision/AdjacencyListBuilderJob.java | 132 +----------------- .../CreateRelatedEntitiesJob_phase1.java | 12 +- .../CreateRelatedEntitiesJob_phase2.java | 117 ++++++++++++---- .../dhp/oa/provision/XmlConverterJob.java | 35 ++--- 
.../dhp/oa/provision/model/JoinedEntity.java | 29 ++-- .../model/ProvisionModelSupport.java | 3 +- ...lEntity.java => RelatedEntityWrapper.java} | 25 +--- .../dhp/oa/provision/model/Tuple2.java | 53 ------- .../oa/provision/utils/XmlRecordFactory.java | 33 +++-- .../dhp/oa/provision/oozie_app/workflow.xml | 46 ++---- 10 files changed, 169 insertions(+), 316 deletions(-) rename dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/{EntityRelEntity.java => RelatedEntityWrapper.java} (56%) delete mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java index 910138988..d9cc03cd5 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java @@ -4,32 +4,23 @@ package eu.dnetlib.dhp.oa.provision; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapGroupsFunction; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.rdd.RDD; import org.apache.spark.sql.*; import org.apache.spark.sql.expressions.Aggregator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; - import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.oa.provision.model.*; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.Oaf; -import scala.Function1; -import scala.Function2; +import scala.Tuple2; import scala.collection.JavaConverters; import scala.collection.Seq; @@ -106,127 +97,6 @@ public class AdjacencyListBuilderJob { log.info("Found paths: {}", String.join(",", paths)); - TypedColumn aggregator = new AdjacencyListAggregator().toColumn(); - spark - .read() - .load(toSeq(paths)) - .as(Encoders.kryo(EntityRelEntity.class)) - .groupByKey( - (MapFunction) value -> value.getEntity().getId(), - Encoders.STRING()) - .agg(aggregator) - .write() - .mode(SaveMode.Overwrite) - .parquet(outputPath); - } - - public static class AdjacencyListAggregator extends Aggregator { - - @Override - public JoinedEntity zero() { - return new JoinedEntity(); - } - - @Override - public JoinedEntity reduce(JoinedEntity j, EntityRelEntity e) { - j.setEntity(e.getEntity()); - if (j.getLinks().size() <= MAX_LINKS) { - j.getLinks().add(new Tuple2(e.getRelation(), e.getTarget())); - } - return j; - } - - @Override - public JoinedEntity merge(JoinedEntity j1, JoinedEntity j2) { - j1.getLinks().addAll(j2.getLinks()); - return j1; - } - - @Override - public JoinedEntity finish(JoinedEntity j) { - if (j.getLinks().size() > MAX_LINKS) { - ArrayList links = j - .getLinks() - .stream() - .limit(MAX_LINKS) - .collect(Collectors.toCollection(ArrayList::new)); - j.setLinks(links); - } - return j; - } - - @Override - public Encoder bufferEncoder() { - return 
Encoders.kryo(JoinedEntity.class); - } - - @Override - public Encoder outputEncoder() { - return Encoders.kryo(JoinedEntity.class); - } - } - - private static void createAdjacencyLists( - SparkSession spark, String inputPath, String outputPath) { - - log.info("Reading joined entities from: {}", inputPath); - spark - .read() - .load(inputPath) - .as(Encoders.bean(EntityRelEntity.class)) - .groupByKey( - (MapFunction) value -> value.getEntity().getId(), - Encoders.STRING()) - .mapGroups( - (MapGroupsFunction) (key, values) -> { - JoinedEntity j = new JoinedEntity(); - List links = new ArrayList<>(); - while (values.hasNext() && links.size() < MAX_LINKS) { - EntityRelEntity curr = values.next(); - if (j.getEntity() == null) { - j.setEntity(curr.getEntity()); - } - links.add(new Tuple2(curr.getRelation(), curr.getTarget())); - } - j.setLinks(links); - return j; - }, - Encoders.bean(JoinedEntity.class)) - .write() - .mode(SaveMode.Overwrite) - .parquet(outputPath); - } - - private static void createAdjacencyListsRDD( - SparkSession spark, String inputPath, String outputPath) { - - log.info("Reading joined entities from: {}", inputPath); - RDD joinedEntities = spark - .read() - .load(inputPath) - .as(Encoders.bean(EntityRelEntity.class)) - .javaRDD() - .mapToPair(re -> { - JoinedEntity je = new JoinedEntity(); - je.setEntity(re.getEntity()); - je.setLinks(Lists.newArrayList()); - if (re.getRelation() != null && re.getTarget() != null) { - je.getLinks().add(new Tuple2(re.getRelation(), re.getTarget())); - } - return new scala.Tuple2<>(re.getEntity().getId(), je); - }) - .reduceByKey((je1, je2) -> { - je1.getLinks().addAll(je2.getLinks()); - return je1; - }) - .map(t -> t._2()) - .rdd(); - - spark - .createDataset(joinedEntities, Encoders.bean(JoinedEntity.class)) - .write() - .mode(SaveMode.Overwrite) - .parquet(outputPath); } private static Seq toSeq(List list) { diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java index ccb20a136..4d2633bc5 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java @@ -2,7 +2,6 @@ package eu.dnetlib.dhp.oa.provision; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*; import java.util.List; import java.util.Objects; @@ -23,8 +22,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity; +import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; import eu.dnetlib.dhp.oa.provision.model.RelatedEntity; +import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper; import eu.dnetlib.dhp.oa.provision.model.SortableRelation; import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.ModelSupport; @@ -91,7 +91,7 @@ public class CreateRelatedEntitiesJob_phase1 { SparkConf conf = new SparkConf(); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + conf.registerKryoClasses(ProvisionModelSupport.getModelClasses()); runWithSparkSession( 
conf, @@ -120,7 +120,7 @@ public class CreateRelatedEntitiesJob_phase1 { .filter("dataInfo.invisible == false") .map( (MapFunction) value -> asRelatedEntity(value, clazz), - Encoders.bean(RelatedEntity.class)) + Encoders.kryo(RelatedEntity.class)) .map( (MapFunction>) e -> new Tuple2<>(e.getId(), e), Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class))) @@ -129,9 +129,9 @@ public class CreateRelatedEntitiesJob_phase1 { relsByTarget .joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner") .map( - (MapFunction, Tuple2>, EntityRelEntity>) t -> new EntityRelEntity( + (MapFunction, Tuple2>, RelatedEntityWrapper>) t -> new RelatedEntityWrapper( t._1()._2(), t._2()._2()), - Encoders.bean(EntityRelEntity.class)) + Encoders.kryo(RelatedEntityWrapper.class)) .write() .mode(SaveMode.Overwrite) .parquet(outputPath); diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java index 757ab47d3..5ef30d6e1 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java @@ -13,10 +13,9 @@ import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.*; import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.expressions.Aggregator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,9 +25,11 @@ import com.google.common.collect.Lists; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity; +import eu.dnetlib.dhp.oa.provision.model.JoinedEntity; import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; +import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper; import eu.dnetlib.dhp.oa.provision.model.TypedRow; +import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import scala.Tuple2; @@ -75,7 +76,7 @@ public class CreateRelatedEntitiesJob_phase2 { .toString( PrepareRelationsJob.class .getResourceAsStream( - "/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json")); + "/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json")); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); @@ -124,32 +125,84 @@ public class CreateRelatedEntitiesJob_phase2 { int numPartitions, Class entityClazz) { - Dataset> entity = readPathEntity(spark, entityPath, entityClazz); - Dataset> relatedEntities = readRelatedEntities( + Dataset> entities = readPathEntity(spark, entityPath, entityClazz); + Dataset> relatedEntities = readRelatedEntities( spark, relatedEntitiesPath, entityClazz); - entity - .joinWith(relatedEntities, entity.col("_1").equalTo(relatedEntities.col("_1")), "left_outer") - .map((MapFunction, Tuple2>, EntityRelEntity>) value -> { - EntityRelEntity re = new EntityRelEntity(); - 
re.setEntity(getTypedRow(entityClazz.getCanonicalName().toLowerCase(), value._1()._2())); - Optional related = Optional.ofNullable(value._2()).map(Tuple2::_2); - if (related.isPresent()) { - re.setRelation(related.get().getRelation()); - re.setTarget(related.get().getTarget()); - } - return re; - }, Encoders.bean(EntityRelEntity.class)) - .repartition(numPartitions) - .filter( - (FilterFunction) value -> value.getEntity() != null - && StringUtils.isNotBlank(value.getEntity().getId())) + TypedColumn aggregator = new AdjacencyListAggregator().toColumn(); + + entities + .joinWith(relatedEntities, entities.col("_1").equalTo(relatedEntities.col("_1")), "left_outer") + .map((MapFunction, Tuple2>, JoinedEntity>) value -> { + JoinedEntity je = new JoinedEntity(value._1()._2()); + Optional + .ofNullable(value._2()) + .map(Tuple2::_2) + .ifPresent(r -> je.getLinks().add(r)); + return je; + }, Encoders.kryo(JoinedEntity.class)) + .filter(filterEmptyEntityFn()) + .groupByKey( + (MapFunction) value -> value.getEntity().getId(), + Encoders.STRING()) + .agg(aggregator) + .map( + (MapFunction, JoinedEntity>) value -> value._2(), + Encoders.kryo(JoinedEntity.class)) + .filter(filterEmptyEntityFn()) .write() .mode(SaveMode.Overwrite) .parquet(outputPath); } - private static Dataset> readRelatedEntities( + public static class AdjacencyListAggregator extends Aggregator { + + @Override + public JoinedEntity zero() { + return new JoinedEntity(); + } + + @Override + public JoinedEntity reduce(JoinedEntity b, JoinedEntity a) { + return mergeAndGet(b, a); + } + + private JoinedEntity mergeAndGet(JoinedEntity b, JoinedEntity a) { + b + .setEntity( + Optional + .ofNullable(a.getEntity()) + .orElse( + Optional + .ofNullable(b.getEntity()) + .orElse(null))); + b.getLinks().addAll(a.getLinks()); + return b; + } + + @Override + public JoinedEntity merge(JoinedEntity b, JoinedEntity a) { + return mergeAndGet(b, a); + } + + @Override + public JoinedEntity finish(JoinedEntity j) { + return j; + } + + @Override + public Encoder bufferEncoder() { + return Encoders.kryo(JoinedEntity.class); + } + + @Override + public Encoder outputEncoder() { + return Encoders.kryo(JoinedEntity.class); + } + + } + + private static Dataset> readRelatedEntities( SparkSession spark, String inputRelatedEntitiesPath, Class entityClazz) { log.info("Reading related entities from: {}", inputRelatedEntitiesPath); @@ -164,12 +217,12 @@ public class CreateRelatedEntitiesJob_phase2 { return spark .read() .load(toSeq(paths)) - .as(Encoders.bean(EntityRelEntity.class)) - .filter((FilterFunction) e -> e.getRelation().getSource().startsWith(idPrefix)) + .as(Encoders.kryo(RelatedEntityWrapper.class)) + .filter((FilterFunction) e -> e.getRelation().getSource().startsWith(idPrefix)) .map( - (MapFunction>) value -> new Tuple2<>( + (MapFunction>) value -> new Tuple2<>( value.getRelation().getSource(), value), - Encoders.tuple(Encoders.STRING(), Encoders.kryo(EntityRelEntity.class))); + Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntityWrapper.class))); } private static Dataset> readPathEntity( @@ -250,6 +303,14 @@ public class CreateRelatedEntitiesJob_phase2 { .anyMatch(c -> "orcid".equals(c.toLowerCase())); } + private static FilterFunction filterEmptyEntityFn() { + return (FilterFunction) v -> Objects.nonNull(v.getEntity()); + /* + * return (FilterFunction) v -> Optional .ofNullable(v.getEntity()) .map(e -> + * StringUtils.isNotBlank(e.getId())) .orElse(false); + */ + } + private static TypedRow getTypedRow(String type, OafEntity entity) throws 
JsonProcessingException { TypedRow t = new TypedRow(); diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java index a88b28592..a1ed7fd2a 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java @@ -4,6 +4,7 @@ package eu.dnetlib.dhp.oa.provision; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -32,6 +33,8 @@ import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory; import eu.dnetlib.dhp.schema.oaf.*; import scala.Tuple2; +import scala.collection.JavaConverters; +import scala.collection.Seq; /** * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. The @@ -89,6 +92,8 @@ public class XmlConverterJob { log.info("otherDsTypeId: {}", otherDsTypeId); SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ProvisionModelSupport.getModelClasses()); runWithSparkSession( conf, @@ -114,26 +119,18 @@ public class XmlConverterJob { schemaLocation, otherDsTypeId); + final List paths = HdfsSupport + .listFiles(inputPath, spark.sparkContext().hadoopConfiguration()); + + log.info("Found paths: {}", String.join(",", paths)); + spark .read() - .load(inputPath) - .as(Encoders.bean(JoinedEntity.class)) + .load(toSeq(paths)) + .as(Encoders.kryo(JoinedEntity.class)) .map( - (MapFunction) j -> { - if (j.getLinks() != null) { - j - .setLinks( - j - .getLinks() - .stream() - .filter(t -> t.getRelation() != null & t.getRelatedEntity() != null) - .collect(Collectors.toCollection(ArrayList::new))); - } - return j; - }, - Encoders.bean(JoinedEntity.class)) - .map( - (MapFunction>) je -> new Tuple2<>(je.getEntity().getId(), + (MapFunction>) je -> new Tuple2<>( + je.getEntity().getId(), recordFactory.build(je)), Encoders.tuple(Encoders.STRING(), Encoders.STRING())) .javaRDD() @@ -148,6 +145,10 @@ public class XmlConverterJob { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } + private static Seq toSeq(List list) { + return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq(); + } + private static Map prepareAccumulators(SparkContext sc) { Map accumulators = Maps.newHashMap(); accumulators diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java index 7681fa76f..2eb9cf38b 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java @@ -3,30 +3,39 @@ package eu.dnetlib.dhp.oa.provision.model; import java.io.Serializable; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; -public class JoinedEntity implements Serializable { +import eu.dnetlib.dhp.schema.oaf.OafEntity; - private TypedRow entity; +public class JoinedEntity implements Serializable { - private List links = new 
ArrayList<>(); + private E entity; + + private List links; public JoinedEntity() { + links = new LinkedList<>(); } - public TypedRow getEntity() { - return entity; - } - - public void setEntity(TypedRow entity) { + public JoinedEntity(E entity) { + this(); this.entity = entity; } - public List getLinks() { + public E getEntity() { + return entity; + } + + public void setEntity(E entity) { + this.entity = entity; + } + + public List getLinks() { return links; } - public void setLinks(List links) { + public void setLinks(List links) { this.links = links; } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java index 3cccce7c4..f9fde14e5 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java @@ -16,10 +16,9 @@ public class ProvisionModelSupport { Lists .newArrayList( TypedRow.class, - EntityRelEntity.class, + RelatedEntityWrapper.class, JoinedEntity.class, RelatedEntity.class, - Tuple2.class, SortableRelation.class)); return modelClasses.toArray(new Class[] {}); } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java similarity index 56% rename from dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java rename to dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java index a6b3c5591..d708b6ed0 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java @@ -5,33 +5,23 @@ import java.io.Serializable; import com.google.common.base.Objects; -public class EntityRelEntity implements Serializable { +public class RelatedEntityWrapper implements Serializable { - private TypedRow entity; private SortableRelation relation; private RelatedEntity target; - public EntityRelEntity() { + public RelatedEntityWrapper() { } - public EntityRelEntity(SortableRelation relation, RelatedEntity target) { + public RelatedEntityWrapper(SortableRelation relation, RelatedEntity target) { this(null, relation, target); } - public EntityRelEntity(TypedRow entity, SortableRelation relation, RelatedEntity target) { - this.entity = entity; + public RelatedEntityWrapper(TypedRow entity, SortableRelation relation, RelatedEntity target) { this.relation = relation; this.target = target; } - public TypedRow getEntity() { - return entity; - } - - public void setEntity(TypedRow entity) { - this.entity = entity; - } - public SortableRelation getRelation() { return relation; } @@ -54,14 +44,13 @@ public class EntityRelEntity implements Serializable { return true; if (o == null || getClass() != o.getClass()) return false; - EntityRelEntity that = (EntityRelEntity) o; - return Objects.equal(entity, that.entity) - && Objects.equal(relation, that.relation) + RelatedEntityWrapper that = (RelatedEntityWrapper) o; + return Objects.equal(relation, that.relation) && Objects.equal(target, that.target); } @Override public int hashCode() { - return 
Objects.hashCode(entity, relation, target); + return Objects.hashCode(relation, target); } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java deleted file mode 100644 index 5ebe9c9eb..000000000 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java +++ /dev/null @@ -1,53 +0,0 @@ - -package eu.dnetlib.dhp.oa.provision.model; - -import java.io.Serializable; -import java.util.Objects; - -import eu.dnetlib.dhp.schema.oaf.Relation; - -public class Tuple2 implements Serializable { - - private Relation relation; - - private RelatedEntity relatedEntity; - - public Tuple2() { - } - - public Tuple2(Relation relation, RelatedEntity relatedEntity) { - this.relation = relation; - this.relatedEntity = relatedEntity; - } - - public Relation getRelation() { - return relation; - } - - public void setRelation(Relation relation) { - this.relation = relation; - } - - public RelatedEntity getRelatedEntity() { - return relatedEntity; - } - - public void setRelatedEntity(RelatedEntity relatedEntity) { - this.relatedEntity = relatedEntity; - } - - @Override - public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - Tuple2 t2 = (Tuple2) o; - return getRelation().equals(t2.getRelation()); - } - - @Override - public int hashCode() { - return Objects.hash(getRelation().hashCode()); - } -} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java index f99298130..d950a816d 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java @@ -85,17 +85,19 @@ public class XmlRecordFactory implements Serializable { final Set contexts = Sets.newHashSet(); - final OafEntity entity = toOafEntity(je.getEntity()); + // final OafEntity entity = toOafEntity(je.getEntity()); + OafEntity entity = je.getEntity(); TemplateFactory templateFactory = new TemplateFactory(); try { - final EntityType type = EntityType.valueOf(je.getEntity().getType()); + + final EntityType type = EntityType.fromClass(entity.getClass()); final List metadata = metadata(type, entity, contexts); // rels has to be processed before the contexts because they enrich the contextMap with // the // funding info. 
- final List relations = je - .getLinks() + final List links = je.getLinks(); + final List relations = links .stream() .filter(link -> !isDuplicate(link)) .map(link -> mapRelation(contexts, templateFactory, type, link)) @@ -975,10 +977,10 @@ public class XmlRecordFactory implements Serializable { metadata.add(XmlSerializationUtils.mapQualifier("datasourcetypeui", dsType)); } - private List mapFields(Tuple2 link, Set contexts) { + private List mapFields(RelatedEntityWrapper link, Set contexts) { final Relation rel = link.getRelation(); - final RelatedEntity re = link.getRelatedEntity(); - final String targetType = link.getRelatedEntity().getType(); + final RelatedEntity re = link.getTarget(); + final String targetType = link.getTarget().getType(); final List metadata = Lists.newArrayList(); switch (EntityType.valueOf(targetType)) { @@ -1089,9 +1091,10 @@ public class XmlRecordFactory implements Serializable { return metadata; } - private String mapRelation(Set contexts, TemplateFactory templateFactory, EntityType type, Tuple2 link) { + private String mapRelation(Set contexts, TemplateFactory templateFactory, EntityType type, + RelatedEntityWrapper link) { final Relation rel = link.getRelation(); - final String targetType = link.getRelatedEntity().getType(); + final String targetType = link.getTarget().getType(); final String scheme = ModelSupport.getScheme(type.toString(), targetType); if (StringUtils.isBlank(scheme)) { @@ -1107,18 +1110,18 @@ public class XmlRecordFactory implements Serializable { private List listChildren( final OafEntity entity, JoinedEntity je, TemplateFactory templateFactory) { - EntityType entityType = EntityType.valueOf(je.getEntity().getType()); + final EntityType entityType = EntityType.fromClass(je.getEntity().getClass()); - List children = je - .getLinks() + final List links = je.getLinks(); + List children = links .stream() .filter(link -> isDuplicate(link)) .map(link -> { - final String targetType = link.getRelatedEntity().getType(); + final String targetType = link.getTarget().getType(); final String name = ModelSupport.getMainType(EntityType.valueOf(targetType)); final HashSet fields = Sets.newHashSet(mapFields(link, null)); return templateFactory - .getChild(name, link.getRelatedEntity().getId(), Lists.newArrayList(fields)); + .getChild(name, link.getTarget().getId(), Lists.newArrayList(fields)); }) .collect(Collectors.toCollection(ArrayList::new)); @@ -1227,7 +1230,7 @@ public class XmlRecordFactory implements Serializable { return children; } - private boolean isDuplicate(Tuple2 link) { + private boolean isDuplicate(RelatedEntityWrapper link) { return REL_SUBTYPE_DEDUP.equalsIgnoreCase(link.getRelation().getSubRelType()); } diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml index 15d352790..0d5121cf1 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml @@ -104,7 +104,6 @@ ${wf:conf('resumeFrom') eq 'prepare_relations'} ${wf:conf('resumeFrom') eq 'fork_join_related_entities'} ${wf:conf('resumeFrom') eq 'fork_join_all_entities'} - ${wf:conf('resumeFrom') eq 'adjancency_lists'} ${wf:conf('resumeFrom') eq 'convert_to_xml'} ${wf:conf('resumeFrom') eq 'to_solr_index'} @@ -373,7 +372,7 @@ 
--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication --inputRelatedEntitiesPath${workingDir}/join_partial --outputPath${workingDir}/join_entities/publication - --numPartitions35000 + --numPartitions30000 @@ -394,7 +393,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=15360 + --conf spark.sql.shuffle.partitions=7680 --conf spark.network.timeout=${sparkNetworkTimeout} --inputEntityPath${inputGraphRootPath}/dataset @@ -422,7 +421,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=15360 + --conf spark.sql.shuffle.partitions=7680 --conf spark.network.timeout=${sparkNetworkTimeout} --inputEntityPath${inputGraphRootPath}/otherresearchproduct @@ -450,7 +449,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=15360 + --conf spark.sql.shuffle.partitions=3840 --conf spark.network.timeout=${sparkNetworkTimeout} --inputEntityPath${inputGraphRootPath}/software @@ -478,7 +477,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=15360 + --conf spark.sql.shuffle.partitions=7680 --conf spark.network.timeout=${sparkNetworkTimeout} --inputEntityPath${inputGraphRootPath}/datasource @@ -506,7 +505,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=15360 + --conf spark.sql.shuffle.partitions=7680 --conf spark.network.timeout=${sparkNetworkTimeout} --inputEntityPath${inputGraphRootPath}/organization @@ -534,7 +533,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=15360 + --conf spark.sql.shuffle.partitions=3840 --conf spark.network.timeout=${sparkNetworkTimeout} --inputEntityPath${inputGraphRootPath}/project @@ -547,32 +546,7 @@ - - - - - yarn - cluster - build_adjacency_lists - eu.dnetlib.dhp.oa.provision.AdjacencyListBuilderJob - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=15360 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputPath${workingDir}/join_entities - --outputPath${workingDir}/joined - - - - + @@ -592,7 +566,7 @@ --conf 
spark.sql.shuffle.partitions=3840
 --conf spark.network.timeout=${sparkNetworkTimeout}
- --inputPath${workingDir}/joined
+ --inputPath${workingDir}/join_entities
 --outputPath${workingDir}/xml
 --isLookupUrl${isLookupUrl}
 --otherDsTypeId${otherDsTypeId}
@@ -622,7 +596,7 @@
 --conf spark.hadoop.mapreduce.reduce.speculative=false
 --inputPath${workingDir}/xml
- --isLookupUrl ${isLookupUrl}
+ --isLookupUrl${isLookupUrl}
 --format${format}
 --batchSize${batchSize}

From 94533b71bcb583c8d96163e1438ea16532d7166b Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Mon, 8 Jun 2020 15:01:21 +0200
Subject: [PATCH 14/21] added comments for model fields removal

---
 .../src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java  | 1 +
 .../src/main/java/eu/dnetlib/dhp/schema/oaf/Software.java | 2 ++
 2 files changed, 3 insertions(+)

diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java
index 07ddbb00e..b5587c6b7 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java
@@ -10,6 +10,7 @@ public class Dataset extends Result implements Serializable {
 private Field storagedate;
+ // candidate for removal
 private Field device;
 private Field size;

diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Software.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Software.java
index 40332bf53..d25b5c9ce 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Software.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Software.java
@@ -10,8 +10,10 @@ public class Software extends Result implements Serializable {
 private List> documentationUrl;
+ // candidate for removal
 private List license;
+ // candidate for removal
 private Field codeRepositoryUrl;
 private Qualifier programmingLanguage;

From 81e85465d85e04efb92d27b70774abeb9580e755 Mon Sep 17 00:00:00 2001
From: "michele.artini"
Date: Mon, 8 Jun 2020 16:26:16 +0200
Subject: [PATCH 15/21] join simrels

---
 .../broker/oa/GenerateEventsApplication.java | 74 +++++++++----------
 .../dhp/broker/oa/util/EventGroup.java       | 32 ++++++++
 .../dhp/broker/oa/util/ResultAggregator.java | 50 +++++++++++++
 .../dhp/broker/oa/util/ResultGroup.java      | 35 +++++++++
 4 files changed, 153 insertions(+), 38 deletions(-)
 create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventGroup.java
 create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ResultAggregator.java
 create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ResultGroup.java

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
index fede6f8bf..05fab47f0 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
@@ -12,16 +12,13 @@ import java.util.stream.Collectors;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.MapFunction;
-import
org.apache.spark.sql.Column; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.TypedColumn; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +52,9 @@ import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreOpenAccess; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMorePid; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject; import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; +import eu.dnetlib.dhp.broker.oa.util.EventGroup; +import eu.dnetlib.dhp.broker.oa.util.ResultAggregator; +import eu.dnetlib.dhp.broker.oa.util.ResultGroup; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.schema.oaf.OafEntity; @@ -63,6 +63,7 @@ import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Software; +import scala.Tuple2; public class GenerateEventsApplication { @@ -130,20 +131,20 @@ public class GenerateEventsApplication { final SparkConf conf = new SparkConf(); runWithSparkSession(conf, isSparkSessionManaged, spark -> { - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + removeOutputDir(spark, eventsPath); - final JavaRDD eventsRdd = sc.emptyRDD(); + final Dataset all = spark.emptyDataset(Encoders.kryo(Event.class)); for (final Class r1 : BrokerConstants.RESULT_CLASSES) { - eventsRdd.union(generateSimpleEvents(spark, graphPath, r1)); + all.union(generateSimpleEvents(spark, graphPath, r1)); for (final Class r2 : BrokerConstants.RESULT_CLASSES) { - eventsRdd.union(generateRelationEvents(spark, graphPath, r1, r2)); + all.union(generateRelationEvents(spark, graphPath, r1, r2)); } } - eventsRdd.saveAsTextFile(eventsPath, GzipCodec.class); + all.write().mode(SaveMode.Overwrite).json(eventsPath); }); } @@ -152,51 +153,48 @@ public class GenerateEventsApplication { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } - private static JavaRDD generateSimpleEvents(final SparkSession spark, + private static Dataset generateSimpleEvents(final SparkSession spark, final String graphPath, final Class resultClazz) { - final Dataset results = readPath(spark, graphPath + "/" + resultClazz.getSimpleName().toLowerCase(), resultClazz) + final Dataset results = readPath(spark, graphPath + "/" + resultClazz.getSimpleName().toLowerCase(), Result.class) .filter(r -> r.getDataInfo().getDeletedbyinference()); final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class) .filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)); - final Column c = null; // TODO - - final Dataset aa = results - .joinWith(rels, results.col("id").equalTo(rels.col("source")), "inner") - .groupBy(rels.col("target")) - .agg(c) - .filter(x -> x.size() > 1) - // generateSimpleEvents(...) 
- // flatMap() - // toRdd() - ; - - return null; + final TypedColumn, ResultGroup> aggr = new ResultAggregator().toColumn(); + return results.joinWith(rels, results.col("id").equalTo(rels.col("source")), "inner") + .groupByKey((MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING()) + .agg(aggr) + .map((MapFunction, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class)) + .filter(ResultGroup::isValid) + .map((MapFunction) g -> GenerateEventsApplication.generateSimpleEvents(g), Encoders.kryo(EventGroup.class)) + .flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class)); } - private List generateSimpleEvents(final Collection children) { + private static EventGroup generateSimpleEvents(final ResultGroup results) { final List> list = new ArrayList<>(); - for (final Result target : children) { - list.addAll(enrichMissingAbstract.searchUpdatesForRecord(target, children)); - list.addAll(enrichMissingAuthorOrcid.searchUpdatesForRecord(target, children)); - list.addAll(enrichMissingOpenAccess.searchUpdatesForRecord(target, children)); - list.addAll(enrichMissingPid.searchUpdatesForRecord(target, children)); - list.addAll(enrichMissingPublicationDate.searchUpdatesForRecord(target, children)); - list.addAll(enrichMissingSubject.searchUpdatesForRecord(target, children)); - list.addAll(enrichMoreOpenAccess.searchUpdatesForRecord(target, children)); - list.addAll(enrichMorePid.searchUpdatesForRecord(target, children)); - list.addAll(enrichMoreSubject.searchUpdatesForRecord(target, children)); + for (final Result target : results.getData()) { + list.addAll(enrichMissingAbstract.searchUpdatesForRecord(target, results.getData())); + list.addAll(enrichMissingAuthorOrcid.searchUpdatesForRecord(target, results.getData())); + list.addAll(enrichMissingOpenAccess.searchUpdatesForRecord(target, results.getData())); + list.addAll(enrichMissingPid.searchUpdatesForRecord(target, results.getData())); + list.addAll(enrichMissingPublicationDate.searchUpdatesForRecord(target, results.getData())); + list.addAll(enrichMissingSubject.searchUpdatesForRecord(target, results.getData())); + list.addAll(enrichMoreOpenAccess.searchUpdatesForRecord(target, results.getData())); + list.addAll(enrichMorePid.searchUpdatesForRecord(target, results.getData())); + list.addAll(enrichMoreSubject.searchUpdatesForRecord(target, results.getData())); } - return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList()); + final EventGroup events = new EventGroup(); + list.stream().map(EventFactory::newBrokerEvent).forEach(events::addElement); + return events; } - private static JavaRDD generateRelationEvents(final SparkSession spark, + private static Dataset generateRelationEvents(final SparkSession spark, final String graphPath, final Class sourceClass, final Class targetClass) { diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventGroup.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventGroup.java new file mode 100644 index 000000000..9c7081c79 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventGroup.java @@ -0,0 +1,32 @@ +package eu.dnetlib.dhp.broker.oa.util; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import eu.dnetlib.dhp.broker.model.Event; + +public class EventGroup implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 765977943803533130L; + + private final List data = new 
ArrayList<>(); + + public List getData() { + return data; + } + + public EventGroup addElement(final Event elem) { + data.add(elem); + return this; + } + + public EventGroup addGroup(final EventGroup group) { + data.addAll(group.getData()); + return this; + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ResultAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ResultAggregator.java new file mode 100644 index 000000000..94685eeae --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ResultAggregator.java @@ -0,0 +1,50 @@ +package eu.dnetlib.dhp.broker.oa.util; + +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.expressions.Aggregator; + +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.Result; +import scala.Tuple2; + +public class ResultAggregator extends Aggregator, ResultGroup, ResultGroup> { + + /** + * + */ + private static final long serialVersionUID = -1492327874705585538L; + + @Override + public ResultGroup zero() { + return new ResultGroup(); + } + + @Override + public ResultGroup reduce(final ResultGroup group, final Tuple2 t) { + return group.addElement(t._1); + } + + @Override + public ResultGroup merge(final ResultGroup g1, final ResultGroup g2) { + return g1.addGroup(g2); + } + + @Override + public ResultGroup finish(final ResultGroup group) { + return group; + } + + @Override + public Encoder bufferEncoder() { + return Encoders.kryo(ResultGroup.class); + + } + + @Override + public Encoder outputEncoder() { + return Encoders.kryo(ResultGroup.class); + + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ResultGroup.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ResultGroup.java new file mode 100644 index 000000000..8fe7a5939 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ResultGroup.java @@ -0,0 +1,35 @@ +package eu.dnetlib.dhp.broker.oa.util; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import eu.dnetlib.dhp.schema.oaf.Result; + +public class ResultGroup implements Serializable { + + /** + * + */ + private static final long serialVersionUID = -3360828477088669296L; + + private final List data = new ArrayList<>(); + + public List getData() { + return data; + } + + public ResultGroup addElement(final Result elem) { + data.add(elem); + return this; + } + + public ResultGroup addGroup(final ResultGroup group) { + data.addAll(group.getData()); + return this; + } + + public boolean isValid() { + return data.size() > 1; + } +} From 16cb073b15e065b763abe37ac9d621b1f7d2355b Mon Sep 17 00:00:00 2001 From: Alessia Bardi Date: Mon, 8 Jun 2020 19:06:03 +0200 Subject: [PATCH 16/21] set the instance datepfacceptance with the Crossref createdDate in case the issuedDate is blank --- .../main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala index cc2c9d586..85997fa36 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala @@ -179,6 +179,9 @@ 
case object Crossref2Oaf { if (StringUtils.isNotBlank(issuedDate)) { instance.setDateofacceptance(asField(issuedDate)) } + else { + instance.setDateofacceptance(asField(createdDate.getValue)) + } val s: String = (json \ "URL").extract[String] val links: List[String] = ((for {JString(url) <- json \ "link" \ "URL"} yield url) ::: List(s)).filter(p => p != null).distinct if (links.nonEmpty) From 9fd25887f7dfd8033fb3e0c7e9f9b2d6c482a43b Mon Sep 17 00:00:00 2001 From: Alessia Bardi Date: Mon, 8 Jun 2020 19:32:24 +0200 Subject: [PATCH 17/21] Result identifiers all start with 50| --- .../java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala index 90bfacdc9..7b21ecda2 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala @@ -319,15 +319,7 @@ object DoiBoostMappingUtil { def generateIdentifier (oaf: Result, doi: String): String = { val id = DHPUtils.md5 (doi.toLowerCase) - if (oaf.isInstanceOf[Dataset] ) - return s"60|${ - doiBoostNSPREFIX - }${ - SEPARATOR - }${ - id - }" - s"50|${ + return s"50|${ doiBoostNSPREFIX }${ SEPARATOR From 181f52b9bccfdb1c13858e0eeebfa85d63911c5c Mon Sep 17 00:00:00 2001 From: Alessia Bardi Date: Mon, 8 Jun 2020 19:33:47 +0200 Subject: [PATCH 18/21] Added mapping table for Crossref --- .../crossref_mapping.csv | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu.dnetlib.dhp.doiboost.mappings/crossref_mapping.csv diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu.dnetlib.dhp.doiboost.mappings/crossref_mapping.csv b/dhp-workflows/dhp-doiboost/src/main/resources/eu.dnetlib.dhp.doiboost.mappings/crossref_mapping.csv new file mode 100644 index 000000000..d87ae5da6 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu.dnetlib.dhp.doiboost.mappings/crossref_mapping.csv @@ -0,0 +1,60 @@ +Crossref Field,Type,Required,Description (from Crossref),OAF field,Comments +publisher,String,Yes,Name of work's publisher,Result/Publisher, +title,Array of String,Yes,"Work titles, including translated titles","Result/Title with Qualifier(""main title"", ""dnet:dataCite_title"")", +original-title,Array of String,No,Work titles in the work's original publication language,"Result/Title with Qualifier(""alternative title"", ""dnet:dataCite_title"")", +short-title,Array of String,No,Short or abbreviated work titles,"Result/Title with Qualifier(""alternative title"", ""dnet:dataCite_title"")", +abstract,XML String,No,Abstract as a JSON string or a JATS XML snippet encoded into a JSON string,Result/description, +reference-count,Number,Yes,Deprecated Same as references-count,"- ", +references-count,Number,Yes,Count of outbound references deposited with Crossref,N/A, +is-referenced-by-count,Number,Yes,Count of inbound references deposited with Crossref,N/A, +source,String,Yes,Currently always Crossref,Result/source, +prefix,String,Yes,DOI prefix identifier of the form http://id.crossref.org/prefix/DOI_PREFIX,N/A, +DOI,String,Yes,DOI of the work,OafEntity/originalId, +,,,,OafEntity/PID, +,,,,"Oaf/id ",Use to generate the OpenAIRE id in the form 50|doiboost____::md5(DOI) +URL,URL,Yes,URL form of the work's DOI,Instance/url, 
+member,String,Yes,Member identifier of the form http://id.crossref.org/member/MEMBER_ID,N/A,
+type,String,Yes,"Enumeration, one of the type ids from https://api.crossref.org/v1/types",Instance/instancetype,Also use to map the record as OAF Publication or Dataset according to the mapping defined in eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala
+created,Date,Yes,Date on which the DOI was first registered,"Result/relevantDate with Qualifier(""created"", ""dnet:dataCite_date"")",
+,,,,"Result/dateofacceptance
+Instance/dateofacceptance",If crossref.issued is blank
+deposited,Date,Yes,Date on which the work metadata was most recently updated,N/A,
+indexed,Date,Yes,"Date on which the work metadata was most recently indexed. Re-indexing does not imply a metadata change, see deposited for the most recent metadata change date",Result/lastupdatetimestamp,
+issued,Partial Date,Yes,Earliest of published-print and published-online,Result/dateofacceptance,OAF dateofacceptance is used also for the publishing date. It's the date visualised in the OpenAIRE EXPLORE portal.
+,,,,Instance/dateofacceptance,
+posted,Partial Date,No,Date on which posted content was made available online,"Result/relevantDate with Qualifier(""available"", ""dnet:dataCite_date"")",
+accepted,Partial Date,No,"Date on which a work was accepted, after being submitted, during a submission process","Result/relevantDate with Qualifier(""accepted"", ""dnet:dataCite_date"")",
+subtitle,Array of String,No,"Work subtitles, including original language and translated","Result/Title with Qualifier(""subtitle"", ""dnet:dataCite_title"")",
+container-title,Array of String,No,Full titles of the containing work (usually a book or journal),Publication/Journal/name only in case of Journal title for book title see ISBN Mapping,
+short-container-title,Array of String,No,Abbreviated titles of the containing work,N/A,
+group-title,String,No,Group title for posted content,N/A,
+issue,String,No,Issue number of an article's journal,Publication/Journal/iss,
+volume,String,No,Volume number of an article's journal,Publication/Journal/vol,
+page,String,No,Pages numbers of an article within its journal,"Publication/Journal/sp
+Publication/Journal/ep",Obtain start and end page by splitting by '-'
+article-number,String,No,,N/A,
+published-print,Partial Date,No,Date on which the work was published in print,"Result/relevantDate with Qualifier(""published-print"", ""dnet:dataCite_date"")",
+published-online,Partial Date,No,Date on which the work was published online,"Result/relevantDate with Qualifier(""published-online"", ""dnet:dataCite_date"")",
+subject,Array of String,No,"Subject category names, a controlled vocabulary from Sci-Val. Available for most journal articles","Result/subject with Qualifier(""keywords"", ""dnet:subject_classification_typologies""). ","Future improvements: map the controlled vocabulary instead of using the generic ""keywords"" qualifier"
+ISSN,Array of String,No,,"Publication/Journal/issn
+Publication/Journal/lissn
+Publication/Journal/eissn",The mapping depends on the value of issn-type
+issn-type,Array of ISSN with Type,No,List of ISSNs with ISSN type information,N/A,Its value guides the setting of the properties in Journal (see row above)
+ISBN,Array of String,No,,Publication/source,"In case of Book We can map ISBN and container title on Publication/source using this syntax container-title + ""ISBN: "" + ISBN"
+archive,Array of String,No,,N/A,
+license,Array of License,No,,Result/Instance/License,
+funder,Array of Funder,No,,Relation,Whenever we are able to link to a funder or project integrated into OpenAIRE. Mapping to OpenAIRE funders and projects is in eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala.generateSimpleRelationFromAward
+assertion,Array of Assertion,No,,N/A,
+author,Array of Contributor,No,,Result/author (with orcid if available),
+editor,Array of Contributor,No,,N/A,
+chair,Array of Contributor,No,,N/A,
+translator,Array of Contributor,No,,N/A,
+update-to,Array of Update,No,,N/A,
+update-policy,URL,No,Link to an update policy covering Crossmark updates for this work,N/A,
+link,Array of Resource Link,No,URLs to full-text locations,Result/Instance/url,
+clinical-trial-number,Array of Clinical Trial Number,No,,OafEntity/originalId,
+alternative-id,String,No,Other identifiers for the work provided by the depositing member,OafEntity/originalId,
+reference,Array of Reference,No,List of references made by the work,,Future improvement: map to references
+content-domain,Content Domain,No,Information on domains that support Crossmark for this work,N/A,
+relation,Relations,No,Relations to other works,Result/Instance/refereed,"if(relation.has-review) instance.refereed = ""peerReviewed"". "
+review,Review,No,Peer review metadata,N/A,

From b7cb1163eadc208129c2fbe669d50426a28d0af9 Mon Sep 17 00:00:00 2001
From: Alessia Bardi
Date: Tue, 9 Jun 2020 10:39:11 +0200
Subject: [PATCH 19/21] identifiers always start with 50

---
 .../src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala
index 1c6e1b0e6..b97fb739c 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala
@@ -190,7 +190,7 @@ case object ConversionUtil {
 pub.setPid(List(createSP(paper.Doi.toLowerCase, "doi", PID_TYPES)).asJava)
 pub.setOriginalId(List(paper.PaperId.toString, paper.Doi.toLowerCase).asJava)
- //Set identifier as {50|60} | doiboost____::md5(DOI)
+ //Set identifier as 50|doiboost____::md5(DOI)
 pub.setId(generateIdentifier(pub, paper.Doi.toLowerCase))
 val mainTitles = createSP(paper.PaperTitle, "main title", "dnet:dataCite_title")
@@ -247,7 +247,7 @@ case object ConversionUtil {
 pub.setPid(List(createSP(paper.Doi.toLowerCase, "doi", PID_TYPES)).asJava)
 pub.setOriginalId(List(paper.PaperId.toString, paper.Doi.toLowerCase).asJava)
- //Set identifier as {50|60} | doiboost____::md5(DOI)
+ //Set identifier as 50 | doiboost____::md5(DOI)
 pub.setId(generateIdentifier(pub, paper.Doi.toLowerCase))
 val mainTitles = createSP(paper.PaperTitle, "main title", "dnet:dataCite_title")

From f072125152ff9dd4c6f98400653571bbe620119a Mon Sep 17 00:00:00 2001
From: Alessia Bardi
Date: Tue, 9 Jun 2020 14:32:10 +0200
Subject: [PATCH 20/21] map volume and issue in journal information from MAG

---
 .../src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala
index b97fb739c..58516202b 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala
@@ -229,6 +229,8 @@ case object ConversionUtil {
 pub.setPublisher(asField(journal.Publisher.get))
 if (journal.Issn.isDefined) j.setIssnPrinted(journal.Issn.get)
+ j.setVol(paper.Volume)
+ j.setIss(paper.Issue)
 pub.setJournal(j)
 }
 pub.setCollectedfrom(List(createMAGCollectedFrom()).asJava)

From d6de406e1140490a4ad6c4bd8e6dea42f0214514 Mon Sep 17 00:00:00 2001
From: Alessia Bardi
Date: Tue, 9 Jun 2020 14:43:34 +0200
Subject: [PATCH 21/21] fixed classid for subjects

---
 .../main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala
index 58516202b..2419f86a3 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala
@@ -129,16 +129,16 @@ case object ConversionUtil {
 val fieldOfStudy = item._2
 if (fieldOfStudy != null && fieldOfStudy.subjects != null && fieldOfStudy.subjects.nonEmpty) {
 val p: List[StructuredProperty] = fieldOfStudy.subjects.flatMap(s => {
- val s1 = createSP(s.DisplayName, "keywords", "dnet:subject_classification_typologies")
+ val s1 = createSP(s.DisplayName, "keyword", "dnet:subject_classification_typologies")
 val di = DoiBoostMappingUtil.generateDataInfo(s.Score.toString)
 var resList: List[StructuredProperty] = List(s1)
 if (s.MainType.isDefined) {
 val maintp = s.MainType.get
- val s2 = createSP(s.MainType.get, "keywords", "dnet:subject_classification_typologies")
+ val s2 = createSP(s.MainType.get, "keyword", "dnet:subject_classification_typologies")
 s2.setDataInfo(di)
 resList = resList ::: List(s2)
 if (maintp.contains(".")) {
- val s3 = createSP(maintp.split("\\.").head, "keywords", "dnet:subject_classification_typologies")
+ val s3 = createSP(maintp.split("\\.").head, "keyword", "dnet:subject_classification_typologies")
 s3.setDataInfo(di)
 resList = resList ::: List(s3)
 }