[graph provision] fixed XML serialization of the usage counts measures, renamed workflow actions to better reflect their role

This commit is contained in:
Claudio Atzori 2024-05-09 13:54:42 +02:00
parent 18aa323ee9
commit 39a2afe8b5
9 changed files with 120 additions and 74 deletions

View File

@ -3,24 +3,16 @@ package eu.dnetlib.dhp.oa.provision;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.utils.DHPUtils.toSeq; import static eu.dnetlib.dhp.utils.DHPUtils.toSeq;
import static org.apache.spark.sql.functions.*;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext; import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.*; import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.util.LongAccumulator; import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -45,9 +37,9 @@ import scala.Tuple2;
/** /**
* XmlConverterJob converts the JoinedEntities as XML records * XmlConverterJob converts the JoinedEntities as XML records
*/ */
public class XmlConverterJob { public class PayloadConverterJob {
private static final Logger log = LoggerFactory.getLogger(XmlConverterJob.class); private static final Logger log = LoggerFactory.getLogger(PayloadConverterJob.class);
public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd"; public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd";
@ -56,8 +48,8 @@ public class XmlConverterJob {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString( .toString(
XmlConverterJob.class PayloadConverterJob.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json"))); .getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_payload_converter.json")));
parser.parseArgument(args); parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional final Boolean isSparkSessionManaged = Optional

View File

@ -19,8 +19,10 @@ import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.common.vocabulary.VocabularyTerm; import eu.dnetlib.dhp.common.vocabulary.VocabularyTerm;
import eu.dnetlib.dhp.oa.provision.utils.ContextDef; import eu.dnetlib.dhp.oa.provision.utils.ContextDef;
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.dhp.schema.solr.*; import eu.dnetlib.dhp.schema.solr.*;
import eu.dnetlib.dhp.schema.solr.AccessRight; import eu.dnetlib.dhp.schema.solr.AccessRight;
import eu.dnetlib.dhp.schema.solr.Author; import eu.dnetlib.dhp.schema.solr.Author;
@ -66,7 +68,11 @@ public class ProvisionModelSupport {
.setHeader( .setHeader(
SolrRecordHeader SolrRecordHeader
.newInstance( .newInstance(
e.getId(), e.getOriginalId(), type, deletedbyinference)); StringUtils
.substringAfter(
e.getId(),
IdentifierFactory.ID_PREFIX_SEPARATOR),
e.getOriginalId(), type, deletedbyinference));
r.setCollectedfrom(asProvenance(e.getCollectedfrom())); r.setCollectedfrom(asProvenance(e.getCollectedfrom()));
r.setContext(asContext(e.getContext(), contextMapper)); r.setContext(asContext(e.getContext(), contextMapper));
r.setPid(asPid(e.getPid())); r.setPid(asPid(e.getPid()));
@ -106,7 +112,8 @@ public class ProvisionModelSupport {
.newInstance( .newInstance(
relation.getRelType(), relation.getRelType(),
relation.getRelClass(), relation.getRelClass(),
relation.getTarget(), relatedRecordType)); StringUtils.substringAfter(relation.getTarget(), IdentifierFactory.ID_PREFIX_SEPARATOR),
relatedRecordType));
rr.setAcronym(re.getAcronym()); rr.setAcronym(re.getAcronym());
rr.setCode(re.getCode()); rr.setCode(re.getCode());

View File

@ -1,25 +1,23 @@
package eu.dnetlib.dhp.oa.provision.utils; package eu.dnetlib.dhp.oa.provision.utils;
import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.authorPidTypes; import com.fasterxml.jackson.databind.ObjectMapper;
import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.getRelDescriptor; import com.google.common.base.Joiner;
import static org.apache.commons.lang3.StringUtils.isNotBlank; import com.google.common.base.Splitter;
import static org.apache.commons.lang3.StringUtils.substringBefore; import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException; import com.google.common.collect.Sets;
import java.io.Serializable; import com.mycila.xmltool.XMLDoc;
import java.io.StringReader; import com.mycila.xmltool.XMLTag;
import java.io.StringWriter; import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
import java.net.MalformedURLException; import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
import java.net.URL; import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper;
import java.util.*; import eu.dnetlib.dhp.oa.provision.model.XmlInstance;
import java.util.stream.Collectors; import eu.dnetlib.dhp.schema.common.*;
import java.util.stream.Stream; import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.*;
import javax.xml.transform.*; import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import javax.xml.transform.dom.DOMSource; import eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits;
import javax.xml.transform.stream.StreamResult;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
@ -31,27 +29,26 @@ import org.dom4j.Node;
import org.dom4j.io.OutputFormat; import org.dom4j.io.OutputFormat;
import org.dom4j.io.SAXReader; import org.dom4j.io.SAXReader;
import org.dom4j.io.XMLWriter; import org.dom4j.io.XMLWriter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.mycila.xmltool.XMLDoc;
import com.mycila.xmltool.XMLTag;
import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper;
import eu.dnetlib.dhp.oa.provision.model.XmlInstance;
import eu.dnetlib.dhp.schema.common.*;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits;
import scala.Tuple2; import scala.Tuple2;
import javax.xml.transform.*;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import java.io.IOException;
import java.io.Serializable;
import java.io.StringReader;
import java.io.StringWriter;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.authorPidTypes;
import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.getRelDescriptor;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.commons.lang3.StringUtils.substringBefore;
public class XmlRecordFactory implements Serializable { public class XmlRecordFactory implements Serializable {
/** /**
@ -93,10 +90,13 @@ public class XmlRecordFactory implements Serializable {
} }
public String build(final JoinedEntity je) { public String build(final JoinedEntity je) {
return build(je, false);
}
public String build(final JoinedEntity je, final Boolean validate) {
final Set<String> contexts = Sets.newHashSet(); final Set<String> contexts = Sets.newHashSet();
// final OafEntity entity = toOafEntity(je.getEntity());
final OafEntity entity = je.getEntity(); final OafEntity entity = je.getEntity();
final TemplateFactory templateFactory = new TemplateFactory(); final TemplateFactory templateFactory = new TemplateFactory();
try { try {
@ -122,8 +122,14 @@ public class XmlRecordFactory implements Serializable {
.buildBody( .buildBody(
mainType, metadata, relations, listChildren(entity, je, templateFactory), listExtraInfo(entity)); mainType, metadata, relations, listChildren(entity, je, templateFactory), listExtraInfo(entity));
return templateFactory.buildRecord(entity, schemaLocation, body); String xmlRecord = templateFactory.buildRecord(entity, schemaLocation, body);
// return printXML(templateFactory.buildRecord(entity, schemaLocation, body), indent);
if (Boolean.TRUE.equals(validate)) {
// rise an exception when an invalid record was built
new SAXReader().read(new StringReader(xmlRecord));
}
return xmlRecord;
// return printXML(templateFactory.buildRecord(entity, schemaLocation, body), indent);
} catch (final Throwable e) { } catch (final Throwable e) {
throw new RuntimeException(String.format("error building record '%s'", entity.getId()), e); throw new RuntimeException(String.format("error building record '%s'", entity.getId()), e);
} }
@ -1038,13 +1044,21 @@ public class XmlRecordFactory implements Serializable {
} }
private List<String> measuresAsXml(List<Measure> measures) { private List<String> measuresAsXml(List<Measure> measures) {
return measures return Stream
.stream() .concat(
.map(m -> { measures
List<Tuple2<String, String>> l = Lists.newArrayList(new Tuple2<>("id", m.getId())); .stream()
m.getUnit().forEach(kv -> l.add(new Tuple2<>(kv.getKey(), kv.getValue()))); .filter(m -> !"downloads".equals(m.getId()) && !"views".equals(m.getId()))
return XmlSerializationUtils.asXmlElement("measure", l); .map(m -> {
}) List<Tuple2<String, String>> l = Lists.newArrayList(new Tuple2<>("id", m.getId()));
m.getUnit().forEach(kv -> l.add(new Tuple2<>(kv.getKey(), kv.getValue())));
return XmlSerializationUtils.asXmlElement("measure", l);
}),
measures
.stream()
.filter(m -> "downloads".equals(m.getId()) || "views".equals(m.getId()))
.filter(m -> m.getUnit().stream().anyMatch(u -> Integer.parseInt(u.getValue()) > 0))
.map(m -> XmlSerializationUtils.usageMeasureAsXmlElement("measure", m)))
.collect(Collectors.toList()); .collect(Collectors.toList());
} }

View File

@ -5,7 +5,11 @@ import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.removePrefix;
import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -166,6 +170,35 @@ public class XmlSerializationUtils {
return sb.toString(); return sb.toString();
} }
// <measure downloads="0" views="0">infrastruct_::f66f1bd369679b5b077dcdf006089556||OpenAIRE</measure>
public static String usageMeasureAsXmlElement(String name, Measure measure) {
HashSet<String> dsIds = Optional
.ofNullable(measure.getUnit())
.map(
m -> m
.stream()
.map(KeyValue::getKey)
.collect(Collectors.toCollection(HashSet::new)))
.orElse(new HashSet<>());
StringBuilder sb = new StringBuilder();
dsIds.forEach(dsId -> {
sb
.append("<")
.append(name);
for (KeyValue kv : measure.getUnit()) {
sb.append(" ").append(attr(measure.getId(), kv.getValue()));
}
sb
.append(">")
.append(dsId)
.append("</")
.append(name)
.append(">");
});
return sb.toString();
}
public static String mapEoscIf(EoscIfGuidelines e) { public static String mapEoscIf(EoscIfGuidelines e) {
return asXmlElement( return asXmlElement(
"eoscifguidelines", Lists "eoscifguidelines", Lists

View File

@ -594,7 +594,7 @@
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>create_payloads</name> <name>create_payloads</name>
<class>eu.dnetlib.dhp.oa.provision.XmlConverterJob</class> <class>eu.dnetlib.dhp.oa.provision.PayloadConverterJob</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar> <jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}

View File

@ -50,7 +50,7 @@ public class EOSCFuture_Test {
final ContextMapper contextMapper = new ContextMapper(); final ContextMapper contextMapper = new ContextMapper();
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false, final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
XmlConverterJob.schemaLocation); PayloadConverterJob.schemaLocation);
final OtherResearchProduct p = OBJECT_MAPPER final OtherResearchProduct p = OBJECT_MAPPER
.readValue( .readValue(

View File

@ -57,7 +57,7 @@ public class IndexRecordTransformerTest {
public void testPublicationRecordTransformation() throws IOException, TransformerException { public void testPublicationRecordTransformation() throws IOException, TransformerException {
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false, final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
XmlConverterJob.schemaLocation); PayloadConverterJob.schemaLocation);
final Publication p = load("publication.json", Publication.class); final Publication p = load("publication.json", Publication.class);
final Project pj = load("project.json", Project.class); final Project pj = load("project.json", Project.class);
@ -82,7 +82,7 @@ public class IndexRecordTransformerTest {
void testPeerReviewed() throws IOException, TransformerException { void testPeerReviewed() throws IOException, TransformerException {
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false, final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
XmlConverterJob.schemaLocation); PayloadConverterJob.schemaLocation);
final Publication p = load("publication.json", Publication.class); final Publication p = load("publication.json", Publication.class);
@ -98,7 +98,7 @@ public class IndexRecordTransformerTest {
public void testRiunet() throws IOException, TransformerException { public void testRiunet() throws IOException, TransformerException {
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false, final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
XmlConverterJob.schemaLocation); PayloadConverterJob.schemaLocation);
final Publication p = load("riunet.json", Publication.class); final Publication p = load("riunet.json", Publication.class);

View File

@ -37,7 +37,7 @@ public class XmlRecordFactoryTest {
final ContextMapper contextMapper = new ContextMapper(); final ContextMapper contextMapper = new ContextMapper();
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false, final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
XmlConverterJob.schemaLocation); PayloadConverterJob.schemaLocation);
final Publication p = OBJECT_MAPPER final Publication p = OBJECT_MAPPER
.readValue(IOUtils.toString(getClass().getResourceAsStream("publication.json")), Publication.class); .readValue(IOUtils.toString(getClass().getResourceAsStream("publication.json")), Publication.class);
@ -105,7 +105,7 @@ public class XmlRecordFactoryTest {
final ContextMapper contextMapper = new ContextMapper(); final ContextMapper contextMapper = new ContextMapper();
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false, final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
XmlConverterJob.schemaLocation); PayloadConverterJob.schemaLocation);
final Publication p = OBJECT_MAPPER final Publication p = OBJECT_MAPPER
.readValue(IOUtils.toString(getClass().getResourceAsStream("publication.json")), Publication.class); .readValue(IOUtils.toString(getClass().getResourceAsStream("publication.json")), Publication.class);
@ -136,7 +136,7 @@ public class XmlRecordFactoryTest {
final ContextMapper contextMapper = new ContextMapper(); final ContextMapper contextMapper = new ContextMapper();
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false, final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
XmlConverterJob.schemaLocation); PayloadConverterJob.schemaLocation);
final Publication p = OBJECT_MAPPER final Publication p = OBJECT_MAPPER
.readValue(IOUtils.toString(getClass().getResourceAsStream("publication.json")), Publication.class); .readValue(IOUtils.toString(getClass().getResourceAsStream("publication.json")), Publication.class);
@ -166,7 +166,7 @@ public class XmlRecordFactoryTest {
final ContextMapper contextMapper = new ContextMapper(); final ContextMapper contextMapper = new ContextMapper();
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false, final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
XmlConverterJob.schemaLocation); PayloadConverterJob.schemaLocation);
final Datasource d = OBJECT_MAPPER final Datasource d = OBJECT_MAPPER
.readValue(IOUtils.toString(getClass().getResourceAsStream("datasource.json")), Datasource.class); .readValue(IOUtils.toString(getClass().getResourceAsStream("datasource.json")), Datasource.class);
@ -203,7 +203,7 @@ public class XmlRecordFactoryTest {
final ContextMapper contextMapper = new ContextMapper(); final ContextMapper contextMapper = new ContextMapper();
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false, final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
XmlConverterJob.schemaLocation); PayloadConverterJob.schemaLocation);
final OtherResearchProduct p = OBJECT_MAPPER final OtherResearchProduct p = OBJECT_MAPPER
.readValue( .readValue(
@ -226,7 +226,7 @@ public class XmlRecordFactoryTest {
final ContextMapper contextMapper = new ContextMapper(); final ContextMapper contextMapper = new ContextMapper();
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false, final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
XmlConverterJob.schemaLocation); PayloadConverterJob.schemaLocation);
final OtherResearchProduct p = OBJECT_MAPPER final OtherResearchProduct p = OBJECT_MAPPER
.readValue( .readValue(
@ -249,7 +249,7 @@ public class XmlRecordFactoryTest {
final ContextMapper contextMapper = new ContextMapper(); final ContextMapper contextMapper = new ContextMapper();
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false, final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
XmlConverterJob.schemaLocation); PayloadConverterJob.schemaLocation);
final Publication p = OBJECT_MAPPER final Publication p = OBJECT_MAPPER
.readValue( .readValue(