Merge pull request 'Fixes in Graph Provision' (#434) from beta_provision_relation into beta

Reviewed-on: #434
Claudio Atzori 2024-05-09 14:15:05 +02:00
commit c1237ab39e
14 changed files with 191 additions and 380 deletions

View File

@@ -153,10 +153,15 @@ public class CreateRelatedEntitiesJob_phase1 {
 			result
 				.getTitle()
 				.stream()
+				.filter(t -> StringUtils.isNotBlank(t.getValue()))
 				.findFirst()
-				.map(StructuredProperty::getValue)
 				.ifPresent(
-					title -> re.getTitle().setValue(StringUtils.left(title, ModelHardLimits.MAX_TITLE_LENGTH)));
+					title -> {
+						re.setTitle(title);
+						re
+							.getTitle()
+							.setValue(StringUtils.left(title.getValue(), ModelHardLimits.MAX_TITLE_LENGTH));
+					});
 		}
 		if (Objects.nonNull(result.getDescription()) && !result.getDescription().isEmpty()) {
 			result
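The fix above addresses a subtle detail: the old code mapped the first title to its bare string value, so the qualifier was lost and re.getTitle() could be unset when the truncated value was written back. A minimal sketch of the corrected selection-and-truncation logic, using hypothetical stand-ins for StructuredProperty and ModelHardLimits.MAX_TITLE_LENGTH:

import java.util.List;
import java.util.Optional;

import org.apache.commons.lang3.StringUtils;

public class TitleTruncationSketch {

	// stand-in for ModelHardLimits.MAX_TITLE_LENGTH; the real constant lives in dhp-schemas
	static final int MAX_TITLE_LENGTH = 5000;

	// minimal stand-in for eu.dnetlib.dhp.schema.oaf.StructuredProperty
	static class Title {
		final String value;
		final String qualifier; // e.g. "main title"

		Title(String value, String qualifier) {
			this.value = value;
			this.qualifier = qualifier;
		}
	}

	// pick the first non-blank title, keep the whole object, truncate only its value:
	// this preserves the qualifier that the old .map(StructuredProperty::getValue) dropped
	static Optional<Title> firstNonBlankTitle(List<Title> titles) {
		return titles
			.stream()
			.filter(t -> StringUtils.isNotBlank(t.value))
			.findFirst()
			.map(t -> new Title(StringUtils.left(t.value, MAX_TITLE_LENGTH), t.qualifier));
	}
}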

View File

@@ -3,24 +3,16 @@ package eu.dnetlib.dhp.oa.provision;
 
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 import static eu.dnetlib.dhp.utils.DHPUtils.toSeq;
-import static org.apache.spark.sql.functions.*;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.*;
-import org.apache.spark.sql.expressions.UserDefinedFunction;
-import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.util.LongAccumulator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,9 +37,9 @@ import scala.Tuple2;
 
 /**
  * XmlConverterJob converts the JoinedEntities as XML records
  */
-public class XmlConverterJob {
+public class PayloadConverterJob {
 
-	private static final Logger log = LoggerFactory.getLogger(XmlConverterJob.class);
+	private static final Logger log = LoggerFactory.getLogger(PayloadConverterJob.class);
 
 	public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd";
@@ -56,8 +48,8 @@ public class XmlConverterJob {
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
 			IOUtils
 				.toString(
-					XmlConverterJob.class
-						.getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json")));
+					PayloadConverterJob.class
+						.getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_payload_converter.json")));
 		parser.parseArgument(args);
 
 		final Boolean isSparkSessionManaged = Optional
@@ -72,6 +64,12 @@ public class XmlConverterJob {
 		final String outputPath = parser.get("outputPath");
 		log.info("outputPath: {}", outputPath);
 
+		final Boolean validateXML = Optional
+			.ofNullable(parser.get("validateXML"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.FALSE);
+		log.info("validateXML: {}", validateXML);
+
 		final String contextApiBaseUrl = parser.get("contextApiBaseUrl");
 		log.info("contextApiBaseUrl: {}", contextApiBaseUrl);
@@ -86,18 +84,19 @@ public class XmlConverterJob {
 		runWithSparkSession(conf, isSparkSessionManaged, spark -> {
 			removeOutputDir(spark, outputPath);
-			convertToXml(
+			createPayloads(
 				spark, inputPath, outputPath, ContextMapper.fromAPI(contextApiBaseUrl),
-				VocabularyGroup.loadVocsFromIS(isLookup));
+				VocabularyGroup.loadVocsFromIS(isLookup), validateXML);
 		});
 	}
 
-	private static void convertToXml(
+	private static void createPayloads(
 		final SparkSession spark,
 		final String inputPath,
 		final String outputPath,
 		final ContextMapper contextMapper,
-		final VocabularyGroup vocabularies) {
+		final VocabularyGroup vocabularies,
+		final Boolean validateXML) {
 
 		final XmlRecordFactory recordFactory = new XmlRecordFactory(
 			prepareAccumulators(spark.sparkContext()),
@@ -118,7 +117,7 @@ public class XmlConverterJob {
 			.as(Encoders.kryo(JoinedEntity.class))
 			.map(
 				(MapFunction<JoinedEntity, Tuple2<String, SolrRecord>>) je -> new Tuple2<>(
-					recordFactory.build(je),
+					recordFactory.build(je, validateXML),
 					ProvisionModelSupport.transform(je, contextMapper, vocabularies)),
 				Encoders.tuple(Encoders.STRING(), Encoders.bean(SolrRecord.class)))
 			.map(
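The new validateXML argument is optional. As the hunk above shows, it is parsed through an Optional chain so that a missing flag silently falls back to false; a tiny self-contained sketch of that pattern (parseValidateXML is a hypothetical name):

import java.util.Optional;

public class FlagParsingSketch {

	// mirrors the validateXML handling above; parser.get(...) returns null when the
	// argument is absent, so the chain defaults to Boolean.FALSE
	static Boolean parseValidateXML(String rawValue) {
		return Optional
			.ofNullable(rawValue)
			.map(Boolean::valueOf)
			.orElse(Boolean.FALSE);
	}

	public static void main(String[] args) {
		System.out.println(parseValidateXML(null));   // false: flag not supplied
		System.out.println(parseValidateXML("true")); // true
	}
}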

View File

@@ -2,42 +2,34 @@
 package eu.dnetlib.dhp.oa.provision;
 
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import static org.apache.spark.sql.functions.col;
 
 import java.util.HashSet;
 import java.util.Optional;
-import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.FilterFunction;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.sql.Encoder;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.expressions.Aggregator;
+import org.apache.spark.sql.expressions.Window;
+import org.apache.spark.sql.expressions.WindowSpec;
+import org.apache.spark.sql.functions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.HdfsSupport;
 import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
-import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey;
-import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner;
 import eu.dnetlib.dhp.schema.oaf.Relation;
-import scala.Tuple2;
 
 /**
  * PrepareRelationsJob prunes the relationships: only consider relationships that are not virtually deleted
@@ -130,132 +122,36 @@ public class PrepareRelationsJob {
 	private static void prepareRelationsRDD(SparkSession spark, String inputRelationsPath, String outputPath,
 		Set<String> relationFilter, int sourceMaxRelations, int targetMaxRelations, int relPartitions) {
 
-		JavaRDD<Relation> rels = readPathRelationRDD(spark, inputRelationsPath)
-			.filter(rel -> !(rel.getSource().startsWith("unresolved") || rel.getTarget().startsWith("unresolved")))
-			.filter(rel -> !rel.getDataInfo().getDeletedbyinference())
-			.filter(rel -> !relationFilter.contains(StringUtils.lowerCase(rel.getRelClass())));
-
-		JavaRDD<Relation> pruned = pruneRels(
-			pruneRels(
-				rels,
-				sourceMaxRelations, relPartitions, (Function<Relation, String>) Relation::getSource),
-			targetMaxRelations, relPartitions, (Function<Relation, String>) Relation::getTarget);
-		spark
-			.createDataset(pruned.rdd(), Encoders.bean(Relation.class))
-			.repartition(relPartitions)
-			.write()
-			.mode(SaveMode.Overwrite)
-			.parquet(outputPath);
-	}
-
-	private static JavaRDD<Relation> pruneRels(JavaRDD<Relation> rels, int maxRelations,
-		int relPartitions, Function<Relation, String> idFn) {
-		return rels
-			.mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, idFn.call(r)), r))
-			.repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions))
-			.groupBy(Tuple2::_1)
-			.map(Tuple2::_2)
-			.map(t -> Iterables.limit(t, maxRelations))
-			.flatMap(Iterable::iterator)
-			.map(Tuple2::_2);
-	}
-
-	// experimental
-	private static void prepareRelationsDataset(
-		SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter, int maxRelations,
-		int relPartitions) {
-		spark
-			.read()
-			.textFile(inputRelationsPath)
-			.repartition(relPartitions)
-			.map(
-				(MapFunction<String, Relation>) s -> OBJECT_MAPPER.readValue(s, Relation.class),
-				Encoders.kryo(Relation.class))
-			.filter((FilterFunction<Relation>) rel -> !rel.getDataInfo().getDeletedbyinference())
-			.filter((FilterFunction<Relation>) rel -> !relationFilter.contains(rel.getRelClass()))
-			.groupByKey(
-				(MapFunction<Relation, String>) Relation::getSource,
-				Encoders.STRING())
-			.agg(new RelationAggregator(maxRelations).toColumn())
-			.flatMap(
-				(FlatMapFunction<Tuple2<String, RelationList>, Relation>) t -> Iterables
-					.limit(t._2().getRelations(), maxRelations)
-					.iterator(),
-				Encoders.bean(Relation.class))
-			.repartition(relPartitions)
-			.write()
-			.mode(SaveMode.Overwrite)
-			.parquet(outputPath);
-	}
-
-	public static class RelationAggregator
-		extends Aggregator<Relation, RelationList, RelationList> {
-
-		private final int maxRelations;
-
-		public RelationAggregator(int maxRelations) {
-			this.maxRelations = maxRelations;
-		}
-
-		@Override
-		public RelationList zero() {
-			return new RelationList();
-		}
-
-		@Override
-		public RelationList reduce(RelationList b, Relation a) {
-			b.getRelations().add(a);
-			return getSortableRelationList(b);
-		}
-
-		@Override
-		public RelationList merge(RelationList b1, RelationList b2) {
-			b1.getRelations().addAll(b2.getRelations());
-			return getSortableRelationList(b1);
-		}
-
-		@Override
-		public RelationList finish(RelationList r) {
-			return getSortableRelationList(r);
-		}
-
-		private RelationList getSortableRelationList(RelationList b1) {
-			RelationList sr = new RelationList();
-			sr
-				.setRelations(
-					b1
-						.getRelations()
-						.stream()
-						.limit(maxRelations)
-						.collect(Collectors.toCollection(() -> new PriorityQueue<>(new RelationComparator()))));
-			return sr;
-		}
-
-		@Override
-		public Encoder<RelationList> bufferEncoder() {
-			return Encoders.kryo(RelationList.class);
-		}
-
-		@Override
-		public Encoder<RelationList> outputEncoder() {
-			return Encoders.kryo(RelationList.class);
-		}
-	}
-
-	/**
-	 * Reads a JavaRDD of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text
-	 * file,
-	 *
-	 * @param spark
-	 * @param inputPath
-	 * @return the JavaRDD<SortableRelation> containing all the relationships
-	 */
-	private static JavaRDD<Relation> readPathRelationRDD(
-		SparkSession spark, final String inputPath) {
-		JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
-		return sc.textFile(inputPath).map(s -> OBJECT_MAPPER.readValue(s, Relation.class));
-	}
+		WindowSpec source_w = Window
+			.partitionBy("source", "subRelType")
+			.orderBy(col("target").desc_nulls_last());
+
+		WindowSpec target_w = Window
+			.partitionBy("target", "subRelType")
+			.orderBy(col("source").desc_nulls_last());
+
+		spark
+			.read()
+			.schema(Encoders.bean(Relation.class).schema())
+			.json(inputRelationsPath)
+			.where("source NOT LIKE 'unresolved%' AND target NOT LIKE 'unresolved%'")
+			.where("datainfo.deletedbyinference != true")
+			.where(
+				relationFilter.isEmpty() ? ""
+					: "lower(relClass) NOT IN ("
+						+ relationFilter.stream().map(s -> "'" + s + "'").collect(Collectors.joining(",")) + ")")
+			.withColumn("source_w_pos", functions.row_number().over(source_w))
+			.where("source_w_pos < " + sourceMaxRelations)
+			.drop("source_w_pos")
+			.withColumn("target_w_pos", functions.row_number().over(target_w))
+			.where("target_w_pos < " + targetMaxRelations)
+			.drop("target_w_pos")
+			.coalesce(relPartitions)
+			.write()
+			.mode(SaveMode.Overwrite)
+			.parquet(outputPath);
+	}
 
 	private static void removeOutputDir(SparkSession spark, String path) {
 		HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
 	}
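The rewrite replaces the custom RDD partitioner, comparator, and aggregator machinery with plain Spark SQL window functions: relations are ranked within each (source, subRelType) and (target, subRelType) group, and only the top-ranked rows survive. A minimal sketch of one side of that pruning, assuming a DataFrame that follows the Relation bean schema:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.row_number;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

public class WindowPruningSketch {

	// caps the number of rows per (source, subRelType) group, as the rewritten
	// prepareRelationsRDD does; the column names assume the Relation schema above
	static Dataset<Row> pruneBySource(Dataset<Row> relations, int sourceMaxRelations) {
		WindowSpec w = Window
			.partitionBy("source", "subRelType")
			.orderBy(col("target").desc_nulls_last());

		return relations
			.withColumn("pos", row_number().over(w)) // 1-based rank within each group
			.where("pos < " + sourceMaxRelations)    // note the strict '<', as in the job above
			.drop("pos");
	}
}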

View File

@@ -1,44 +0,0 @@
package eu.dnetlib.dhp.oa.provision;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class RelationComparator implements Comparator<Relation> {
private static final Map<String, Integer> weights = Maps.newHashMap();
static {
weights.put(ModelConstants.OUTCOME, 0);
weights.put(ModelConstants.SUPPLEMENT, 1);
weights.put(ModelConstants.REVIEW, 2);
weights.put(ModelConstants.CITATION, 3);
weights.put(ModelConstants.AFFILIATION, 4);
weights.put(ModelConstants.RELATIONSHIP, 5);
weights.put(ModelConstants.PUBLICATION_DATASET, 6);
weights.put(ModelConstants.SIMILARITY, 7);
weights.put(ModelConstants.PROVISION, 8);
weights.put(ModelConstants.PARTICIPATION, 9);
weights.put(ModelConstants.DEDUP, 10);
}
private Integer getWeight(Relation o) {
return Optional.ofNullable(weights.get(o.getSubRelType())).orElse(Integer.MAX_VALUE);
}
@Override
public int compare(Relation o1, Relation o2) {
return ComparisonChain
.start()
.compare(getWeight(o1), getWeight(o2))
.result();
}
}

View File

@@ -1,25 +0,0 @@
package eu.dnetlib.dhp.oa.provision;
import java.io.Serializable;
import java.util.PriorityQueue;
import java.util.Queue;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class RelationList implements Serializable {
private Queue<Relation> relations;
public RelationList() {
this.relations = new PriorityQueue<>(new RelationComparator());
}
public Queue<Relation> getRelations() {
return relations;
}
public void setRelations(Queue<Relation> relations) {
this.relations = relations;
}
}

View File

@@ -1,81 +0,0 @@
package eu.dnetlib.dhp.oa.provision;
import java.io.Serializable;
import java.util.Map;
import java.util.Optional;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class SortableRelation extends Relation implements Comparable<SortableRelation>, Serializable {
private static final Map<String, Integer> weights = Maps.newHashMap();
static {
weights.put(ModelConstants.OUTCOME, 0);
weights.put(ModelConstants.SUPPLEMENT, 1);
weights.put(ModelConstants.REVIEW, 2);
weights.put(ModelConstants.CITATION, 3);
weights.put(ModelConstants.AFFILIATION, 4);
weights.put(ModelConstants.RELATIONSHIP, 5);
weights.put(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID, 6);
weights.put(ModelConstants.SIMILARITY, 7);
weights.put(ModelConstants.PROVISION, 8);
weights.put(ModelConstants.PARTICIPATION, 9);
weights.put(ModelConstants.DEDUP, 10);
}
private static final long serialVersionUID = 34753984579L;
private String groupingKey;
public static SortableRelation create(Relation r, String groupingKey) {
SortableRelation sr = new SortableRelation();
sr.setGroupingKey(groupingKey);
sr.setSource(r.getSource());
sr.setTarget(r.getTarget());
sr.setRelType(r.getRelType());
sr.setSubRelType(r.getSubRelType());
sr.setRelClass(r.getRelClass());
sr.setDataInfo(r.getDataInfo());
sr.setCollectedfrom(r.getCollectedfrom());
sr.setLastupdatetimestamp(r.getLastupdatetimestamp());
sr.setProperties(r.getProperties());
sr.setValidated(r.getValidated());
sr.setValidationDate(r.getValidationDate());
return sr;
}
@JsonIgnore
public Relation asRelation() {
return this;
}
@Override
public int compareTo(SortableRelation o) {
return ComparisonChain
.start()
.compare(getGroupingKey(), o.getGroupingKey())
.compare(getWeight(this), getWeight(o))
.result();
}
private Integer getWeight(SortableRelation o) {
return Optional.ofNullable(weights.get(o.getSubRelType())).orElse(Integer.MAX_VALUE);
}
public String getGroupingKey() {
return groupingKey;
}
public void setGroupingKey(String groupingKey) {
this.groupingKey = groupingKey;
}
}

View File

@@ -1,8 +1,6 @@
 package eu.dnetlib.dhp.oa.provision.model;
 
-import static org.apache.commons.lang3.StringUtils.substringBefore;
-
 import java.io.StringReader;
 import java.util.*;
 import java.util.stream.Collectors;
@@ -16,16 +14,15 @@ import org.jetbrains.annotations.Nullable;
 
 import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 
 import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.common.vocabulary.VocabularyTerm;
-import eu.dnetlib.dhp.oa.provision.RelationList;
-import eu.dnetlib.dhp.oa.provision.SortableRelation;
 import eu.dnetlib.dhp.oa.provision.utils.ContextDef;
 import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
-import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
 import eu.dnetlib.dhp.schema.solr.*;
 import eu.dnetlib.dhp.schema.solr.AccessRight;
 import eu.dnetlib.dhp.schema.solr.Author;
@@ -55,10 +52,7 @@ public class ProvisionModelSupport {
 			.newArrayList(
 				RelatedEntityWrapper.class,
 				JoinedEntity.class,
-				RelatedEntity.class,
-				SortableRelationKey.class,
-				SortableRelation.class,
-				RelationList.class));
+				RelatedEntity.class));
 		return modelClasses.toArray(new Class[] {});
 	}
@@ -74,7 +68,11 @@ public class ProvisionModelSupport {
 			.setHeader(
 				SolrRecordHeader
 					.newInstance(
-						e.getId(), e.getOriginalId(), type, deletedbyinference));
+						StringUtils
+							.substringAfter(
+								e.getId(),
+								IdentifierFactory.ID_PREFIX_SEPARATOR),
+						e.getOriginalId(), type, deletedbyinference));
 		r.setCollectedfrom(asProvenance(e.getCollectedfrom()));
 		r.setContext(asContext(e.getContext(), contextMapper));
 		r.setPid(asPid(e.getPid()));
@@ -114,7 +112,8 @@ public class ProvisionModelSupport {
 				.newInstance(
 					relation.getRelType(),
 					relation.getRelClass(),
-					relation.getTarget(), relatedRecordType));
+					StringUtils.substringAfter(relation.getTarget(), IdentifierFactory.ID_PREFIX_SEPARATOR),
+					relatedRecordType));
 		rr.setAcronym(re.getAcronym());
 		rr.setCode(re.getCode());
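Both header changes strip the entity-type prefix from identifiers before they reach the Solr payload. A quick illustration with commons-lang3, assuming IdentifierFactory.ID_PREFIX_SEPARATOR is the '|' separator used in OpenAIRE identifiers (the id below is made up):

import org.apache.commons.lang3.StringUtils;

public class IdPrefixSketch {

	// assumption: IdentifierFactory.ID_PREFIX_SEPARATOR is "|"
	static final String ID_PREFIX_SEPARATOR = "|";

	public static void main(String[] args) {
		String id = "50|doi_________::0123456789abcdef0123456789abcdef"; // hypothetical id
		// substringAfter drops everything up to and including the first separator
		System.out.println(StringUtils.substringAfter(id, ID_PREFIX_SEPARATOR));
		// -> doi_________::0123456789abcdef0123456789abcdef
	}
}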

View File

@@ -1,25 +1,23 @@
 package eu.dnetlib.dhp.oa.provision.utils;
 
-import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.authorPidTypes;
-import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.getRelDescriptor;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.commons.lang3.StringUtils.substringBefore;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.io.StringReader;
-import java.io.StringWriter;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.*;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import javax.xml.transform.*;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.mycila.xmltool.XMLDoc;
+import com.mycila.xmltool.XMLTag;
+import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
+import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
+import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper;
+import eu.dnetlib.dhp.oa.provision.model.XmlInstance;
+import eu.dnetlib.dhp.schema.common.*;
+import eu.dnetlib.dhp.schema.oaf.Result;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
+import eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
@@ -31,27 +29,26 @@ import org.dom4j.Node;
 import org.dom4j.io.OutputFormat;
 import org.dom4j.io.SAXReader;
 import org.dom4j.io.XMLWriter;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.mycila.xmltool.XMLDoc;
-import com.mycila.xmltool.XMLTag;
-
-import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
-import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
-import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper;
-import eu.dnetlib.dhp.oa.provision.model.XmlInstance;
-import eu.dnetlib.dhp.schema.common.*;
-import eu.dnetlib.dhp.schema.oaf.*;
-import eu.dnetlib.dhp.schema.oaf.Result;
-import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
-import eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits;
-
 import scala.Tuple2;
 
+import javax.xml.transform.*;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.authorPidTypes;
+import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.getRelDescriptor;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.commons.lang3.StringUtils.substringBefore;
+
 public class XmlRecordFactory implements Serializable {
 
 	/**
@@ -93,10 +90,13 @@ public class XmlRecordFactory implements Serializable {
 	}
 
 	public String build(final JoinedEntity je) {
+		return build(je, false);
+	}
+
+	public String build(final JoinedEntity je, final Boolean validate) {
 		final Set<String> contexts = Sets.newHashSet();
 
-		// final OafEntity entity = toOafEntity(je.getEntity());
 		final OafEntity entity = je.getEntity();
 		final TemplateFactory templateFactory = new TemplateFactory();
 		try {
@@ -122,8 +122,14 @@ public class XmlRecordFactory implements Serializable {
 				.buildBody(
 					mainType, metadata, relations, listChildren(entity, je, templateFactory), listExtraInfo(entity));
 
-			return templateFactory.buildRecord(entity, schemaLocation, body);
+			String xmlRecord = templateFactory.buildRecord(entity, schemaLocation, body);
+
+			if (Boolean.TRUE.equals(validate)) {
+				// raise an exception when an invalid record was built
+				new SAXReader().read(new StringReader(xmlRecord));
+			}
+			return xmlRecord;
 			// return printXML(templateFactory.buildRecord(entity, schemaLocation, body), indent);
 		} catch (final Throwable e) {
 			throw new RuntimeException(String.format("error building record '%s'", entity.getId()), e);
 		}
@@ -1038,13 +1044,21 @@ public class XmlRecordFactory implements Serializable {
 	}
 
 	private List<String> measuresAsXml(List<Measure> measures) {
-		return measures
-			.stream()
-			.map(m -> {
-				List<Tuple2<String, String>> l = Lists.newArrayList(new Tuple2<>("id", m.getId()));
-				m.getUnit().forEach(kv -> l.add(new Tuple2<>(kv.getKey(), kv.getValue())));
-				return XmlSerializationUtils.asXmlElement("measure", l);
-			})
+		return Stream
+			.concat(
+				measures
+					.stream()
+					.filter(m -> !"downloads".equals(m.getId()) && !"views".equals(m.getId()))
+					.map(m -> {
+						List<Tuple2<String, String>> l = Lists.newArrayList(new Tuple2<>("id", m.getId()));
+						m.getUnit().forEach(kv -> l.add(new Tuple2<>(kv.getKey(), kv.getValue())));
+						return XmlSerializationUtils.asXmlElement("measure", l);
+					}),
+				measures
+					.stream()
+					.filter(m -> "downloads".equals(m.getId()) || "views".equals(m.getId()))
+					.filter(m -> m.getUnit().stream().anyMatch(u -> Integer.parseInt(u.getValue()) > 0))
+					.map(m -> XmlSerializationUtils.usageMeasureAsXmlElement("measure", m)))
 			.collect(Collectors.toList());
 	}
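Worth noting: the validate flag introduced earlier in this file checks well-formedness rather than schema validity, since dom4j's SAXReader.read throws a DocumentException as soon as the input cannot be parsed. A minimal sketch of that check in isolation:

import java.io.StringReader;

import org.dom4j.DocumentException;
import org.dom4j.io.SAXReader;

public class XmlValidationSketch {

	// parsing with dom4j's SAXReader fails on malformed XML, which is exactly how
	// the validate flag above turns a bad record into a build failure
	static void assertWellFormed(String xmlRecord) throws DocumentException {
		new SAXReader().read(new StringReader(xmlRecord));
	}

	public static void main(String[] args) throws DocumentException {
		assertWellFormed("<record><title>ok</title></record>"); // passes silently
		try {
			assertWellFormed("<record><title>broken</record>"); // mismatched tags
		} catch (DocumentException expected) {
			System.out.println("invalid record rejected: " + expected.getMessage());
		}
	}
}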

View File

@@ -5,7 +5,11 @@ import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.removePrefix;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 
+import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
@@ -166,6 +170,35 @@ public class XmlSerializationUtils {
 		return sb.toString();
 	}
 
+	// <measure downloads="0" views="0">infrastruct_::f66f1bd369679b5b077dcdf006089556||OpenAIRE</measure>
+	public static String usageMeasureAsXmlElement(String name, Measure measure) {
+		HashSet<String> dsIds = Optional
+			.ofNullable(measure.getUnit())
+			.map(
+				m -> m
+					.stream()
+					.map(KeyValue::getKey)
+					.collect(Collectors.toCollection(HashSet::new)))
+			.orElse(new HashSet<>());
+
+		StringBuilder sb = new StringBuilder();
+		dsIds.forEach(dsId -> {
+			sb
+				.append("<")
+				.append(name);
+			for (KeyValue kv : measure.getUnit()) {
+				sb.append(" ").append(attr(measure.getId(), kv.getValue()));
+			}
+			sb
+				.append(">")
+				.append(dsId)
+				.append("</")
+				.append(name)
+				.append(">");
+		});
+		return sb.toString();
+	}
+
 	public static String mapEoscIf(EoscIfGuidelines e) {
 		return asXmlElement(
 			"eoscifguidelines", Lists

View File

@@ -22,5 +22,11 @@
     "paramLongName": "isLookupUrl",
     "paramDescription": "URL of the context ISLookup Service",
     "paramRequired": true
+  },
+  {
+    "paramName": "val",
+    "paramLongName": "validateXML",
+    "paramDescription": "should the process check the XML validity",
+    "paramRequired": false
   }
 ]

View File

@@ -13,6 +13,11 @@
         <name>contextApiBaseUrl</name>
         <description>context API URL</description>
     </property>
+    <property>
+        <name>validateXML</name>
+        <description>should the payload converter validate the XMLs</description>
+        <value>false</value>
+    </property>
     <property>
         <name>relPartitions</name>
         <description>number of partitions for the relations Dataset</description>
@@ -125,7 +130,7 @@
             <case to="prepare_relations">${wf:conf('resumeFrom') eq 'prepare_relations'}</case>
             <case to="fork_join_related_entities">${wf:conf('resumeFrom') eq 'fork_join_related_entities'}</case>
             <case to="fork_join_all_entities">${wf:conf('resumeFrom') eq 'fork_join_all_entities'}</case>
-            <case to="convert_to_xml">${wf:conf('resumeFrom') eq 'convert_to_xml'}</case>
+            <case to="create_payloads">${wf:conf('resumeFrom') eq 'create_payloads'}</case>
            <case to="drop_solr_collection">${wf:conf('resumeFrom') eq 'drop_solr_collection'}</case>
            <case to="to_solr_index">${wf:conf('resumeFrom') eq 'to_solr_index'}</case>
            <default to="prepare_relations"/>
@@ -144,21 +149,23 @@
             <class>eu.dnetlib.dhp.oa.provision.PrepareRelationsJob</class>
             <jar>dhp-graph-provision-${projectVersion}.jar</jar>
             <spark-opts>
-                --executor-cores=${sparkExecutorCoresForJoining}
-                --executor-memory=${sparkExecutorMemoryForJoining}
+                --executor-cores=4
+                --executor-memory=6G
                 --driver-memory=${sparkDriverMemoryForJoining}
+                --conf spark.executor.memoryOverhead=6G
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-                --conf spark.sql.shuffle.partitions=3840
+                --conf spark.sql.shuffle.partitions=15000
+                --conf spark.network.timeout=${sparkNetworkTimeout}
             </spark-opts>
             <arg>--inputRelationsPath</arg><arg>${inputGraphRootPath}/relation</arg>
             <arg>--outputPath</arg><arg>${workingDir}/relation</arg>
             <arg>--sourceMaxRelations</arg><arg>${sourceMaxRelations}</arg>
             <arg>--targetMaxRelations</arg><arg>${targetMaxRelations}</arg>
             <arg>--relationFilter</arg><arg>${relationFilter}</arg>
-            <arg>--relPartitions</arg><arg>5000</arg>
+            <arg>--relPartitions</arg><arg>15000</arg>
         </spark>
         <ok to="fork_join_related_entities"/>
         <error to="Kill"/>
@@ -585,19 +592,20 @@
         <error to="Kill"/>
     </action>
 
-    <join name="wait_join_phase2" to="convert_to_xml"/>
+    <join name="wait_join_phase2" to="create_payloads"/>
 
-    <action name="convert_to_xml">
+    <action name="create_payloads">
         <spark xmlns="uri:oozie:spark-action:0.2">
             <master>yarn</master>
             <mode>cluster</mode>
-            <name>convert_to_xml</name>
-            <class>eu.dnetlib.dhp.oa.provision.XmlConverterJob</class>
+            <name>create_payloads</name>
+            <class>eu.dnetlib.dhp.oa.provision.PayloadConverterJob</class>
             <jar>dhp-graph-provision-${projectVersion}.jar</jar>
             <spark-opts>
                 --executor-cores=${sparkExecutorCores}
                 --executor-memory=${sparkExecutorMemory}
                 --driver-memory=${sparkDriverMemory}
+                --conf spark.executor.memoryOverhead=${sparkExecutorMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@@ -605,8 +613,9 @@
                 --conf spark.sql.shuffle.partitions=3840
                 --conf spark.network.timeout=${sparkNetworkTimeout}
             </spark-opts>
-            <arg>--inputPath</arg><arg>${workingDir}/join_entities</arg>
+            <arg>--inputPath</arg><arg>/user/claudio.atzori/data/beta_provision/join_entities</arg>
             <arg>--outputPath</arg><arg>${workingDir}/xml_json</arg>
+            <arg>--validateXML</arg><arg>${validateXML}</arg>
             <arg>--contextApiBaseUrl</arg><arg>${contextApiBaseUrl}</arg>
             <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
         </spark>

View File

@@ -50,7 +50,7 @@ public class EOSCFuture_Test {
 		final ContextMapper contextMapper = new ContextMapper();
 
 		final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
-			XmlConverterJob.schemaLocation);
+			PayloadConverterJob.schemaLocation);
 
 		final OtherResearchProduct p = OBJECT_MAPPER
 			.readValue(

View File

@@ -57,7 +57,7 @@ public class IndexRecordTransformerTest {
 	public void testPublicationRecordTransformation() throws IOException, TransformerException {
 
 		final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
-			XmlConverterJob.schemaLocation);
+			PayloadConverterJob.schemaLocation);
 
 		final Publication p = load("publication.json", Publication.class);
 		final Project pj = load("project.json", Project.class);
@@ -82,7 +82,7 @@ public class IndexRecordTransformerTest {
 	void testPeerReviewed() throws IOException, TransformerException {
 
 		final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
-			XmlConverterJob.schemaLocation);
+			PayloadConverterJob.schemaLocation);
 
 		final Publication p = load("publication.json", Publication.class);
@@ -98,7 +98,7 @@ public class IndexRecordTransformerTest {
 	public void testRiunet() throws IOException, TransformerException {
 
 		final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
-			XmlConverterJob.schemaLocation);
+			PayloadConverterJob.schemaLocation);
 
 		final Publication p = load("riunet.json", Publication.class);

View File

@@ -37,7 +37,7 @@ public class XmlRecordFactoryTest {
 		final ContextMapper contextMapper = new ContextMapper();
 
 		final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
-			XmlConverterJob.schemaLocation);
+			PayloadConverterJob.schemaLocation);
 
 		final Publication p = OBJECT_MAPPER
 			.readValue(IOUtils.toString(getClass().getResourceAsStream("publication.json")), Publication.class);
@@ -105,7 +105,7 @@ public class XmlRecordFactoryTest {
 		final ContextMapper contextMapper = new ContextMapper();
 
 		final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
-			XmlConverterJob.schemaLocation);
+			PayloadConverterJob.schemaLocation);
 
 		final Publication p = OBJECT_MAPPER
 			.readValue(IOUtils.toString(getClass().getResourceAsStream("publication.json")), Publication.class);
@@ -136,7 +136,7 @@ public class XmlRecordFactoryTest {
 		final ContextMapper contextMapper = new ContextMapper();
 
 		final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
-			XmlConverterJob.schemaLocation);
+			PayloadConverterJob.schemaLocation);
 
 		final Publication p = OBJECT_MAPPER
 			.readValue(IOUtils.toString(getClass().getResourceAsStream("publication.json")), Publication.class);
@@ -166,7 +166,7 @@ public class XmlRecordFactoryTest {
 		final ContextMapper contextMapper = new ContextMapper();
 
 		final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
-			XmlConverterJob.schemaLocation);
+			PayloadConverterJob.schemaLocation);
 
 		final Datasource d = OBJECT_MAPPER
 			.readValue(IOUtils.toString(getClass().getResourceAsStream("datasource.json")), Datasource.class);
@@ -203,7 +203,7 @@ public class XmlRecordFactoryTest {
 		final ContextMapper contextMapper = new ContextMapper();
 
 		final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
-			XmlConverterJob.schemaLocation);
+			PayloadConverterJob.schemaLocation);
 
 		final OtherResearchProduct p = OBJECT_MAPPER
 			.readValue(
@@ -226,7 +226,7 @@ public class XmlRecordFactoryTest {
 		final ContextMapper contextMapper = new ContextMapper();
 
 		final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
-			XmlConverterJob.schemaLocation);
+			PayloadConverterJob.schemaLocation);
 
 		final OtherResearchProduct p = OBJECT_MAPPER
 			.readValue(
@@ -249,7 +249,7 @@ public class XmlRecordFactoryTest {
 		final ContextMapper contextMapper = new ContextMapper();
 
 		final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
-			XmlConverterJob.schemaLocation);
+			PayloadConverterJob.schemaLocation);
 
 		final Publication p = OBJECT_MAPPER
 			.readValue(