1
0
Fork 0

materialize the related entities before joining them

This commit is contained in:
Claudio Atzori 2020-07-10 19:00:44 +02:00
parent b21866a2da
commit 4c3836f62e
3 changed files with 42 additions and 14 deletions

View File

@ -9,6 +9,7 @@ import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
@ -115,11 +116,21 @@ public class CreateRelatedEntitiesJob_phase1 {
Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class))) Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class)))
.cache(); .cache();
Dataset<Tuple2<String, RelatedEntity>> entities = readPathEntity(spark, inputEntityPath, clazz) final String relatedEntityPath = outputPath + "_relatedEntity";
readPathEntity(spark, inputEntityPath, clazz)
.filter("dataInfo.invisible == false") .filter("dataInfo.invisible == false")
.map( .map(
(MapFunction<E, RelatedEntity>) value -> asRelatedEntity(value, clazz), (MapFunction<E, RelatedEntity>) value -> asRelatedEntity(value, clazz),
Encoders.kryo(RelatedEntity.class)) Encoders.kryo(RelatedEntity.class))
.repartition(5000)
.write()
.mode(SaveMode.Overwrite)
.parquet(relatedEntityPath);
Dataset<Tuple2<String, RelatedEntity>> entities = spark
.read()
.load(relatedEntityPath)
.as(Encoders.kryo(RelatedEntity.class))
.map( .map(
(MapFunction<RelatedEntity, Tuple2<String, RelatedEntity>>) e -> new Tuple2<>(e.getId(), e), (MapFunction<RelatedEntity, Tuple2<String, RelatedEntity>>) e -> new Tuple2<>(e.getId(), e),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class))) Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class)))
@ -165,13 +176,21 @@ public class CreateRelatedEntitiesJob_phase1 {
Result result = (Result) entity; Result result = (Result) entity;
if (result.getTitle() != null && !result.getTitle().isEmpty()) { if (result.getTitle() != null && !result.getTitle().isEmpty()) {
re.setTitle(result.getTitle().stream().findFirst().get()); final StructuredProperty title = result.getTitle().stream().findFirst().get();
title.setValue(StringUtils.left(title.getValue(), ProvisionConstants.MAX_TITLE_LENGTH));
re.setTitle(title);
} }
re.setDateofacceptance(getValue(result.getDateofacceptance())); re.setDateofacceptance(getValue(result.getDateofacceptance()));
re.setPublisher(getValue(result.getPublisher())); re.setPublisher(getValue(result.getPublisher()));
re.setResulttype(result.getResulttype()); re.setResulttype(result.getResulttype());
re.setInstances(result.getInstance()); re
.setInstances(
result
.getInstance()
.stream()
.limit(ProvisionConstants.MAX_INSTANCES)
.collect(Collectors.toList()));
// TODO still to be mapped // TODO still to be mapped
// re.setCodeRepositoryUrl(j.read("$.coderepositoryurl")); // re.setCodeRepositoryUrl(j.read("$.coderepositoryurl"));

View File

@ -61,12 +61,6 @@ public class CreateRelatedEntitiesJob_phase2 {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final int MAX_EXTERNAL_ENTITIES = 50;
private static final int MAX_AUTHORS = 200;
private static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000;
private static final int MAX_TITLE_LENGTH = 5000;
private static final int MAX_ABSTRACT_LENGTH = 100000;
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils String jsonConfiguration = IOUtils
@ -246,15 +240,15 @@ public class CreateRelatedEntitiesJob_phase2 {
List<ExternalReference> refs = r List<ExternalReference> refs = r
.getExternalReference() .getExternalReference()
.stream() .stream()
.limit(MAX_EXTERNAL_ENTITIES) .limit(ProvisionConstants.MAX_EXTERNAL_ENTITIES)
.collect(Collectors.toList()); .collect(Collectors.toList());
r.setExternalReference(refs); r.setExternalReference(refs);
} }
if (r.getAuthor() != null) { if (r.getAuthor() != null) {
List<Author> authors = Lists.newArrayList(); List<Author> authors = Lists.newArrayList();
for (Author a : r.getAuthor()) { for (Author a : r.getAuthor()) {
a.setFullname(StringUtils.left(a.getFullname(), MAX_AUTHOR_FULLNAME_LENGTH)); a.setFullname(StringUtils.left(a.getFullname(), ProvisionConstants.MAX_AUTHOR_FULLNAME_LENGTH));
if (authors.size() < MAX_AUTHORS || hasORCID(a)) { if (authors.size() < ProvisionConstants.MAX_AUTHORS || hasORCID(a)) {
authors.add(a); authors.add(a);
} }
} }
@ -266,7 +260,7 @@ public class CreateRelatedEntitiesJob_phase2 {
.stream() .stream()
.filter(Objects::nonNull) .filter(Objects::nonNull)
.map(d -> { .map(d -> {
d.setValue(StringUtils.left(d.getValue(), MAX_ABSTRACT_LENGTH)); d.setValue(StringUtils.left(d.getValue(), ProvisionConstants.MAX_ABSTRACT_LENGTH));
return d; return d;
}) })
.collect(Collectors.toList()); .collect(Collectors.toList());
@ -278,9 +272,10 @@ public class CreateRelatedEntitiesJob_phase2 {
.stream() .stream()
.filter(Objects::nonNull) .filter(Objects::nonNull)
.map(t -> { .map(t -> {
t.setValue(StringUtils.left(t.getValue(), MAX_TITLE_LENGTH)); t.setValue(StringUtils.left(t.getValue(), ProvisionConstants.MAX_TITLE_LENGTH));
return t; return t;
}) })
.limit(ProvisionConstants.MAX_TITLES)
.collect(Collectors.toList()); .collect(Collectors.toList());
r.setTitle(titles); r.setTitle(titles);
} }

View File

@ -0,0 +1,14 @@
package eu.dnetlib.dhp.oa.provision;
public class ProvisionConstants {
public static final int MAX_EXTERNAL_ENTITIES = 50;
public static final int MAX_AUTHORS = 200;
public static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000;
public static final int MAX_TITLE_LENGTH = 5000;
public static final int MAX_TITLES = 10;
public static final int MAX_ABSTRACT_LENGTH = 100000;
public static final int MAX_INSTANCES = 10;
}