[graph provision] person serialisation, limit the number of authorships and coauthorships before expanding the payloads

This commit is contained in:
Claudio Atzori 2024-09-23 10:29:46 +02:00
parent 5f86c93be6
commit e0ff84baf0
3 changed files with 52 additions and 1 deletions

View File

@ -1,6 +1,12 @@
package eu.dnetlib.dhp.schema.oaf.utils; package eu.dnetlib.dhp.schema.oaf.utils;
import java.util.Map;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.schema.common.ModelConstants;
public class ModelHardLimits { public class ModelHardLimits {
private ModelHardLimits() { private ModelHardLimits() {
@ -19,6 +25,12 @@ public class ModelHardLimits {
public static final int MAX_ABSTRACT_LENGTH = 150000; public static final int MAX_ABSTRACT_LENGTH = 150000;
public static final int MAX_RELATED_ABSTRACT_LENGTH = 500; public static final int MAX_RELATED_ABSTRACT_LENGTH = 500;
public static final int MAX_INSTANCES = 10; public static final int MAX_INSTANCES = 10;
public static final Map<String, Integer> MAX_RELATIONS_BY_RELCLASS = Maps.newHashMap();
static {
MAX_RELATIONS_BY_RELCLASS.put(ModelConstants.PERSON_PERSON_HASCOAUTHORED, 500);
MAX_RELATIONS_BY_RELCLASS.put(ModelConstants.RESULT_PERSON_HASAUTHORED, 500);
}
public static String getCollectionName(String format) { public static String getCollectionName(String format) {
return format + SEPARATOR + LAYOUT + SEPARATOR + INTERPRETATION; return format + SEPARATOR + LAYOUT + SEPARATOR + INTERPRETATION;

View File

@ -2,10 +2,12 @@
package eu.dnetlib.dhp.oa.provision; package eu.dnetlib.dhp.oa.provision;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits.MAX_RELATIONS_BY_RELCLASS;
import static eu.dnetlib.dhp.utils.DHPUtils.toSeq; import static eu.dnetlib.dhp.utils.DHPUtils.toSeq;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
@ -15,11 +17,13 @@ import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*; import org.apache.spark.sql.*;
import org.apache.spark.util.LongAccumulator; import org.apache.spark.util.LongAccumulator;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
@ -27,11 +31,13 @@ import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.oa.provision.model.JoinedEntity; import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper;
import eu.dnetlib.dhp.oa.provision.model.TupleWrapper; import eu.dnetlib.dhp.oa.provision.model.TupleWrapper;
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory; import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory;
import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits;
import eu.dnetlib.dhp.schema.solr.SolrRecord; import eu.dnetlib.dhp.schema.solr.SolrRecord;
import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ -124,6 +130,9 @@ public class PayloadConverterJob {
.map(Oaf::getDataInfo) .map(Oaf::getDataInfo)
.map(DataInfo::getDeletedbyinference) .map(DataInfo::getDeletedbyinference)
.orElse(false)) .orElse(false))
.map(
(MapFunction<JoinedEntity, JoinedEntity>) PayloadConverterJob::pruneRelatedEntities,
Encoders.kryo(JoinedEntity.class))
.map( .map(
(MapFunction<JoinedEntity, Tuple2<String, SolrRecord>>) je -> new Tuple2<>( (MapFunction<JoinedEntity, Tuple2<String, SolrRecord>>) je -> new Tuple2<>(
recordFactory.build(je, validateXML), recordFactory.build(je, validateXML),
@ -139,6 +148,30 @@ public class PayloadConverterJob {
.json(outputPath); .json(outputPath);
} }
/**
This function iterates through the RelatedEntityWrapper(s) associated to the JoinedEntity and rules out
those exceeding the maximum allowed frequency defined in eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits#MAX_RELATIONS_BY_RELCLASS
*/
private static JoinedEntity pruneRelatedEntities(JoinedEntity je) {
Map<String, Long> freqs = Maps.newHashMap();
List<RelatedEntityWrapper> rew = Lists.newArrayList();
if (je.getLinks() != null) {
je.getLinks().forEach(link -> {
final String relClass = link.getRelation().getRelClass();
Long count = freqs.putIfAbsent(relClass, 0L);
if (Objects.isNull(count) || (MAX_RELATIONS_BY_RELCLASS.containsKey(relClass)
&& count <= MAX_RELATIONS_BY_RELCLASS.get(relClass))) {
rew.add(link);
freqs.put(relClass, freqs.get(relClass) + 1);
}
});
je.setLinks(rew);
}
return je;
}
private static void removeOutputDir(final SparkSession spark, final String path) { private static void removeOutputDir(final SparkSession spark, final String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
} }

View File

@ -1046,7 +1046,13 @@ public class XmlRecordFactory implements Serializable {
metadata.add(XmlSerializationUtils.asXmlElement("familyname", person.getFamilyName())); metadata.add(XmlSerializationUtils.asXmlElement("familyname", person.getFamilyName()));
} }
if (person.getAlternativeNames() != null) { if (person.getAlternativeNames() != null) {
metadata.addAll(person.getAlternativeNames()); metadata
.addAll(
person
.getAlternativeNames()
.stream()
.map(altName -> XmlSerializationUtils.asXmlElement("alternativename", altName))
.collect(Collectors.toList()));
} }
if (person.getBiography() != null) { if (person.getBiography() != null) {
metadata.add(XmlSerializationUtils.asXmlElement("biography", person.getBiography())); metadata.add(XmlSerializationUtils.asXmlElement("biography", person.getBiography()));