diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/ModelHardLimits.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/ModelHardLimits.java index 36d138ba1..e4b184fa1 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/ModelHardLimits.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/ModelHardLimits.java @@ -1,6 +1,12 @@ package eu.dnetlib.dhp.schema.oaf.utils; +import java.util.Map; + +import com.google.common.collect.Maps; + +import eu.dnetlib.dhp.schema.common.ModelConstants; + public class ModelHardLimits { private ModelHardLimits() { @@ -19,6 +25,12 @@ public class ModelHardLimits { public static final int MAX_ABSTRACT_LENGTH = 150000; public static final int MAX_RELATED_ABSTRACT_LENGTH = 500; public static final int MAX_INSTANCES = 10; + public static final Map MAX_RELATIONS_BY_RELCLASS = Maps.newHashMap(); + + static { + MAX_RELATIONS_BY_RELCLASS.put(ModelConstants.PERSON_PERSON_HASCOAUTHORED, 500); + MAX_RELATIONS_BY_RELCLASS.put(ModelConstants.RESULT_PERSON_HASAUTHORED, 500); + } public static String getCollectionName(String format) { return format + SEPARATOR + LAYOUT + SEPARATOR + INTERPRETATION; diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PayloadConverterJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PayloadConverterJob.java index 351526336..cb2d2e799 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PayloadConverterJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PayloadConverterJob.java @@ -2,10 +2,12 @@ package eu.dnetlib.dhp.oa.provision; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import static eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits.MAX_RELATIONS_BY_RELCLASS; import static eu.dnetlib.dhp.utils.DHPUtils.toSeq; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import org.apache.commons.io.IOUtils; @@ -15,11 +17,13 @@ import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; import org.apache.spark.util.LongAccumulator; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import eu.dnetlib.dhp.application.ArgumentApplicationParser; @@ -27,11 +31,13 @@ import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.oa.provision.model.JoinedEntity; import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; +import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper; import eu.dnetlib.dhp.oa.provision.model.TupleWrapper; import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory; import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits; import eu.dnetlib.dhp.schema.solr.SolrRecord; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; @@ -124,6 +130,9 @@ public class PayloadConverterJob { .map(Oaf::getDataInfo) .map(DataInfo::getDeletedbyinference) .orElse(false)) + .map( + (MapFunction) PayloadConverterJob::pruneRelatedEntities, + Encoders.kryo(JoinedEntity.class)) .map( (MapFunction>) je -> new Tuple2<>( recordFactory.build(je, validateXML), @@ -139,6 +148,30 @@ public class PayloadConverterJob { .json(outputPath); } + /** + This function iterates through the RelatedEntityWrapper(s) associated to the JoinedEntity and rules out + those exceeding the maximum allowed frequency defined in eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits#MAX_RELATIONS_BY_RELCLASS + */ + private static JoinedEntity pruneRelatedEntities(JoinedEntity je) { + Map freqs = Maps.newHashMap(); + List rew = Lists.newArrayList(); + + if (je.getLinks() != null) { + je.getLinks().forEach(link -> { + final String relClass = link.getRelation().getRelClass(); + Long count = freqs.putIfAbsent(relClass, 0L); + if (Objects.isNull(count) || (MAX_RELATIONS_BY_RELCLASS.containsKey(relClass) + && count <= MAX_RELATIONS_BY_RELCLASS.get(relClass))) { + rew.add(link); + freqs.put(relClass, freqs.get(relClass) + 1); + } + }); + je.setLinks(rew); + } + + return je; + } + private static void removeOutputDir(final SparkSession spark, final String path) { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java index b1f419a7e..97d2d3989 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java @@ -1046,7 +1046,13 @@ public class XmlRecordFactory implements Serializable { metadata.add(XmlSerializationUtils.asXmlElement("familyname", person.getFamilyName())); } if (person.getAlternativeNames() != null) { - metadata.addAll(person.getAlternativeNames()); + metadata + .addAll( + person + .getAlternativeNames() + .stream() + .map(altName -> XmlSerializationUtils.asXmlElement("alternativename", altName)) + .collect(Collectors.toList())); } if (person.getBiography() != null) { metadata.add(XmlSerializationUtils.asXmlElement("biography", person.getBiography()));