From 4c3836f62e3358d173163aef8de11ead52fcc707 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 10 Jul 2020 19:00:44 +0200 Subject: [PATCH] materialize the related entities before joining them --- .../CreateRelatedEntitiesJob_phase1.java | 25 ++++++++++++++++--- .../CreateRelatedEntitiesJob_phase2.java | 17 +++++-------- .../dhp/oa/provision/ProvisionConstants.java | 14 +++++++++++ 3 files changed, 42 insertions(+), 14 deletions(-) create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/ProvisionConstants.java diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java index 80b800017..57dca7bb1 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java @@ -9,6 +9,7 @@ import java.util.Optional; import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; @@ -115,11 +116,21 @@ public class CreateRelatedEntitiesJob_phase1 { Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class))) .cache(); - Dataset> entities = readPathEntity(spark, inputEntityPath, clazz) + final String relatedEntityPath = outputPath + "_relatedEntity"; + readPathEntity(spark, inputEntityPath, clazz) .filter("dataInfo.invisible == false") .map( (MapFunction) value -> asRelatedEntity(value, clazz), Encoders.kryo(RelatedEntity.class)) + .repartition(5000) + .write() + .mode(SaveMode.Overwrite) + .parquet(relatedEntityPath); + + Dataset> entities = spark + .read() + .load(relatedEntityPath) + .as(Encoders.kryo(RelatedEntity.class)) .map( (MapFunction>) e -> new Tuple2<>(e.getId(), e), Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class))) @@ -165,13 +176,21 @@ public class CreateRelatedEntitiesJob_phase1 { Result result = (Result) entity; if (result.getTitle() != null && !result.getTitle().isEmpty()) { - re.setTitle(result.getTitle().stream().findFirst().get()); + final StructuredProperty title = result.getTitle().stream().findFirst().get(); + title.setValue(StringUtils.left(title.getValue(), ProvisionConstants.MAX_TITLE_LENGTH)); + re.setTitle(title); } re.setDateofacceptance(getValue(result.getDateofacceptance())); re.setPublisher(getValue(result.getPublisher())); re.setResulttype(result.getResulttype()); - re.setInstances(result.getInstance()); + re + .setInstances( + result + .getInstance() + .stream() + .limit(ProvisionConstants.MAX_INSTANCES) + .collect(Collectors.toList())); // TODO still to be mapped // re.setCodeRepositoryUrl(j.read("$.coderepositoryurl")); diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java index bfcc648a3..7e175121e 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java @@ -61,12 +61,6 @@ public class CreateRelatedEntitiesJob_phase2 { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final int MAX_EXTERNAL_ENTITIES = 50; - private static final int MAX_AUTHORS = 200; - private static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000; - private static final int MAX_TITLE_LENGTH = 5000; - private static final int MAX_ABSTRACT_LENGTH = 100000; - public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils @@ -246,15 +240,15 @@ public class CreateRelatedEntitiesJob_phase2 { List refs = r .getExternalReference() .stream() - .limit(MAX_EXTERNAL_ENTITIES) + .limit(ProvisionConstants.MAX_EXTERNAL_ENTITIES) .collect(Collectors.toList()); r.setExternalReference(refs); } if (r.getAuthor() != null) { List authors = Lists.newArrayList(); for (Author a : r.getAuthor()) { - a.setFullname(StringUtils.left(a.getFullname(), MAX_AUTHOR_FULLNAME_LENGTH)); - if (authors.size() < MAX_AUTHORS || hasORCID(a)) { + a.setFullname(StringUtils.left(a.getFullname(), ProvisionConstants.MAX_AUTHOR_FULLNAME_LENGTH)); + if (authors.size() < ProvisionConstants.MAX_AUTHORS || hasORCID(a)) { authors.add(a); } } @@ -266,7 +260,7 @@ public class CreateRelatedEntitiesJob_phase2 { .stream() .filter(Objects::nonNull) .map(d -> { - d.setValue(StringUtils.left(d.getValue(), MAX_ABSTRACT_LENGTH)); + d.setValue(StringUtils.left(d.getValue(), ProvisionConstants.MAX_ABSTRACT_LENGTH)); return d; }) .collect(Collectors.toList()); @@ -278,9 +272,10 @@ public class CreateRelatedEntitiesJob_phase2 { .stream() .filter(Objects::nonNull) .map(t -> { - t.setValue(StringUtils.left(t.getValue(), MAX_TITLE_LENGTH)); + t.setValue(StringUtils.left(t.getValue(), ProvisionConstants.MAX_TITLE_LENGTH)); return t; }) + .limit(ProvisionConstants.MAX_TITLES) .collect(Collectors.toList()); r.setTitle(titles); } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/ProvisionConstants.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/ProvisionConstants.java new file mode 100644 index 000000000..9bc3706cd --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/ProvisionConstants.java @@ -0,0 +1,14 @@ + +package eu.dnetlib.dhp.oa.provision; + +public class ProvisionConstants { + + public static final int MAX_EXTERNAL_ENTITIES = 50; + public static final int MAX_AUTHORS = 200; + public static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000; + public static final int MAX_TITLE_LENGTH = 5000; + public static final int MAX_TITLES = 10; + public static final int MAX_ABSTRACT_LENGTH = 100000; + public static final int MAX_INSTANCES = 10; + +}