diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java index f01f90fe4..9386db933 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java @@ -506,6 +506,8 @@ public class GraphCleaningFunctions extends CleaningFunctions { .filter(Objects::nonNull) .filter(sp -> StringUtils.isNotBlank(sp.getValue())) .map(GraphCleaningFunctions::cleanValue) + .sorted((s1, s2) -> s2.getValue().length() - s1.getValue().length()) + .limit(ModelHardLimits.MAX_ABSTRACTS) .collect(Collectors.toList())); } if (Objects.isNull(r.getResourcetype()) || StringUtils.isBlank(r.getResourcetype().getClassid())) { diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AbstractSolrRecordTransformJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AbstractSolrRecordTransformJob.java new file mode 100644 index 000000000..fd7bf4d71 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AbstractSolrRecordTransformJob.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2024. + * SPDX-FileCopyrightText: © 2023 Consiglio Nazionale delle Ricerche + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package eu.dnetlib.dhp.oa.provision; + +import java.io.StringReader; +import java.io.StringWriter; + +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerException; +import javax.xml.transform.stream.StreamResult; +import javax.xml.transform.stream.StreamSource; + +import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory; + +public abstract class AbstractSolrRecordTransformJob { + + protected static String toIndexRecord(Transformer tr, final String xmlRecord) { + final StreamResult res = new StreamResult(new StringWriter()); + try { + tr.transform(new StreamSource(new StringReader(xmlRecord)), res); + return res.getWriter().toString(); + } catch (TransformerException e) { + throw new IllegalArgumentException("XPathException on record: \n" + xmlRecord, e); + } + } + + /** + * Creates the XSLT responsible for building the index xml records. 
+ * + * @param format Metadata format name (DMF|TMF) + * @param fields the list of fields + * @param xslt xslt for building the index record transformer + * @return the serialized XSLT for building the index records + * @throws TransformerException when the layout transformation fails + */ + protected static String getLayoutTransformer(String format, String fields, String xslt) + throws TransformerException { + + final Transformer layoutTransformer = SaxonTransformerFactory.newInstance(xslt); + final StreamResult layoutToXsltXslt = new StreamResult(new StringWriter()); + + layoutTransformer.setParameter("format", format); + layoutTransformer.transform(new StreamSource(new StringReader(fields)), layoutToXsltXslt); + + return layoutToXsltXslt.getWriter().toString(); + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java index 48e5945c0..da80deee0 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java @@ -27,14 +27,7 @@ import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; import eu.dnetlib.dhp.oa.provision.model.RelatedEntity; import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper; import eu.dnetlib.dhp.schema.common.EntityType; -import eu.dnetlib.dhp.schema.oaf.Datasource; -import eu.dnetlib.dhp.schema.oaf.Field; -import eu.dnetlib.dhp.schema.oaf.OafEntity; -import eu.dnetlib.dhp.schema.oaf.Organization; -import eu.dnetlib.dhp.schema.oaf.Project; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.schema.oaf.Result; -import eu.dnetlib.dhp.schema.oaf.StructuredProperty; +import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits; import scala.Tuple2; @@ -156,10 +149,34 @@ public class CreateRelatedEntitiesJob_phase1 { case software: final Result result = (Result) entity; - if (result.getTitle() != null && !result.getTitle().isEmpty()) { - final StructuredProperty title = result.getTitle().stream().findFirst().get(); - title.setValue(StringUtils.left(title.getValue(), ModelHardLimits.MAX_TITLE_LENGTH)); - re.setTitle(title); + if (Objects.nonNull(result.getTitle()) && !result.getTitle().isEmpty()) { + result + .getTitle() + .stream() + .findFirst() + .ifPresent(title -> { + title.setValue(StringUtils.left(title.getValue(), ModelHardLimits.MAX_TITLE_LENGTH)); + re.setTitle(title); + }); + } + if (Objects.nonNull(result.getDescription()) && !result.getDescription().isEmpty()) { + result + .getDescription() + .stream() + .findFirst() + .map(Field::getValue) + .ifPresent( + d -> re.setDescription(StringUtils.left(d, ModelHardLimits.MAX_RELATED_ABSTRACT_LENGTH))); + } + if (Objects.nonNull(result.getAuthor()) && !result.getAuthor().isEmpty()) { + re + .setAuthor( + result + .getAuthor() + .stream() + .map(Author::getFullname) + .filter(StringUtils::isNotBlank) + .collect(Collectors.toList())); } re.setDateofacceptance(getValue(result.getDateofacceptance())); diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrRecordDumpJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrRecordDumpJob.java new file mode 100644 index 000000000..faa18851b --- /dev/null +++
b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrRecordDumpJob.java @@ -0,0 +1,144 @@ + +package eu.dnetlib.dhp.oa.provision; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.util.Optional; + +import javax.xml.transform.TransformerException; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.solr.common.SolrInputDocument; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument; +import eu.dnetlib.dhp.oa.provision.model.TupleWrapper; +import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient; +import eu.dnetlib.dhp.oa.provision.utils.StreamingInputDocumentFactory; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; + +public class SolrRecordDumpJob extends AbstractSolrRecordTransformJob { + + private static final Logger log = LoggerFactory.getLogger(SolrRecordDumpJob.class); + + private static final Integer DEFAULT_BATCH_SIZE = 1000; + + private final String inputPath; + + private final String format; + + private final String outputPath; + + private final SparkSession spark; + + public static void main(String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SolrRecordDumpJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/provision/input_params_solr_record_dump.json"))); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("inputPath"); + log.info("inputPath: {}", inputPath); + + final String format = parser.get("format"); + log.info("format: {}", format); + + final String outputPath = Optional + .ofNullable(parser.get("outputPath")) + .map(StringUtils::trim) + .orElse(null); + log.info("outputPath: {}", outputPath); + + final Integer batchSize = Optional + .ofNullable(parser.get("batchSize")) + .map(Integer::valueOf) + .orElse(DEFAULT_BATCH_SIZE); + log.info("batchSize: {}", batchSize); + + final boolean shouldIndex = Optional + .ofNullable(parser.get("shouldIndex")) + .map(Boolean::valueOf) + .orElse(false); + log.info("shouldIndex: {}", shouldIndex); + + final SparkConf conf = new SparkConf(); + + conf.registerKryoClasses(new Class[] { + SerializableSolrInputDocument.class + }); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + final String isLookupUrl = parser.get("isLookupUrl"); + log.info("isLookupUrl: {}", isLookupUrl); + final ISLookupClient isLookup = new ISLookupClient(ISLookupClientFactory.getLookUpService(isLookupUrl)); + new SolrRecordDumpJob(spark, inputPath, format, outputPath).run(isLookup); + }); + } + + public SolrRecordDumpJob(SparkSession spark, String inputPath, String format, String outputPath) { + this.spark = spark; + this.inputPath = inputPath; + this.format = format; + this.outputPath = outputPath; + } + + 
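/** Reads the TupleWrapper(xml, json) dataset from inputPath, transforms each XML payload with the layout XSLT obtained from the ISLookUp service, parses the result into a SolrInputDocument and writes the records as parquet under outputPath. */ +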
public void run(ISLookupClient isLookup) throws ISLookUpException, TransformerException { + final String fields = isLookup.getLayoutSource(format); + log.info("fields: {}", fields); + + final String xslt = isLookup.getLayoutTransformer(); + + final String dsId = isLookup.getDsId(format); + log.info("dsId: {}", dsId); + + final String indexRecordXslt = getLayoutTransformer(format, fields, xslt); + log.info("indexRecordTransformer {}", indexRecordXslt); + + final Encoder encoder = Encoders.bean(TupleWrapper.class); + spark + .read() + .schema(encoder.schema()) + .json(inputPath) + .as(encoder) + .map( + (MapFunction) t -> new TupleWrapper( + toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), t.getXml()), + t.getJson()), + Encoders.bean(TupleWrapper.class)) + .map( + (MapFunction) t -> { + SolrInputDocument s = new StreamingInputDocumentFactory() + .parseDocument(t.getXml(), t.getJson()); + return new SerializableSolrInputDocument(s); + }, + Encoders.kryo(SerializableSolrInputDocument.class)) + .write() + .mode(SaveMode.Overwrite) + .parquet(outputPath); + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java index 6f43ca3f7..4353e863f 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java @@ -3,6 +3,7 @@ package eu.dnetlib.dhp.oa.provision; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.utils.DHPUtils.toSeq; +import static org.apache.spark.sql.functions.*; import java.util.List; import java.util.Map; @@ -14,22 +15,31 @@ import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; +import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.*; +import org.apache.spark.sql.expressions.UserDefinedFunction; +import org.apache.spark.sql.types.DataTypes; import org.apache.spark.util.LongAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Maps; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.oa.provision.model.JoinedEntity; import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; +import eu.dnetlib.dhp.oa.provision.model.TupleWrapper; import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory; +import eu.dnetlib.dhp.schema.solr.SolrRecord; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import scala.Tuple2; /** @@ -65,13 +75,20 @@ public class XmlConverterJob { final String contextApiBaseUrl = parser.get("contextApiBaseUrl"); log.info("contextApiBaseUrl: {}", contextApiBaseUrl); + String isLookupUrl = parser.get("isLookupUrl"); + log.info("isLookupUrl: {}", isLookupUrl); + + 
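// the vocabularies loaded via the ISLookUp service drive the mapping of each JoinedEntity onto the SolrRecord JSON payload (see ProvisionModelSupport.transform) +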
ISLookUpService isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl); + final SparkConf conf = new SparkConf(); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.registerKryoClasses(ProvisionModelSupport.getModelClasses()); runWithSparkSession(conf, isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - convertToXml(spark, inputPath, outputPath, ContextMapper.fromAPI(contextApiBaseUrl)); + convertToXml( + spark, inputPath, outputPath, ContextMapper.fromAPI(contextApiBaseUrl), + VocabularyGroup.loadVocsFromIS(isLookup)); }); } @@ -79,7 +96,8 @@ public class XmlConverterJob { final SparkSession spark, final String inputPath, final String outputPath, - final ContextMapper contextMapper) { + final ContextMapper contextMapper, + final VocabularyGroup vocabularies) { final XmlRecordFactory recordFactory = new XmlRecordFactory( prepareAccumulators(spark.sparkContext()), @@ -92,20 +110,25 @@ public class XmlConverterJob { log.info("Found paths: {}", String.join(",", paths)); + final ObjectMapper mapper = new ObjectMapper(); + mapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY); spark .read() .load(toSeq(paths)) .as(Encoders.kryo(JoinedEntity.class)) .map( - (MapFunction>) je -> new Tuple2<>( - je.getEntity().getId(), - recordFactory.build(je)), - Encoders.tuple(Encoders.STRING(), Encoders.STRING())) - .javaRDD() - .mapToPair( - (PairFunction, Text, Text>) t -> new Tuple2<>(new Text(t._1()), - new Text(t._2()))) - .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); + (MapFunction>) je -> new Tuple2<>( + recordFactory.build(je), + ProvisionModelSupport.transform(je, contextMapper, vocabularies)), + Encoders.tuple(Encoders.STRING(), Encoders.bean(SolrRecord.class))) + .map( + (MapFunction, TupleWrapper>) t -> new TupleWrapper( + t._1(), mapper.writeValueAsString(t._2())), + Encoders.bean(TupleWrapper.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); } private static void removeOutputDir(final SparkSession spark, final String path) { diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java index cd401c6cb..d49a0596b 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java @@ -3,26 +3,17 @@ package eu.dnetlib.dhp.oa.provision; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import java.io.StringReader; -import java.io.StringWriter; -import java.text.SimpleDateFormat; -import java.util.Date; import java.util.Optional; -import javax.xml.transform.Transformer; import javax.xml.transform.TransformerException; -import javax.xml.transform.stream.StreamResult; -import javax.xml.transform.stream.StreamSource; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.io.Text; import org.apache.solr.common.SolrInputDocument; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; 
import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,34 +22,25 @@ import com.lucidworks.spark.util.SolrSupport; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument; +import eu.dnetlib.dhp.oa.provision.model.TupleWrapper; import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient; import eu.dnetlib.dhp.oa.provision.utils.StreamingInputDocumentFactory; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; -public class XmlIndexingJob { +public class XmlIndexingJob extends AbstractSolrRecordTransformJob { private static final Logger log = LoggerFactory.getLogger(XmlIndexingJob.class); - public enum OutputFormat { - SOLR, HDFS - } - private static final Integer DEFAULT_BATCH_SIZE = 1000; - protected static final String DATE_FORMAT = "yyyy-MM-dd'T'hh:mm:ss'Z'"; - private final String inputPath; private final String format; private final int batchSize; - private final OutputFormat outputFormat; - - private final String outputPath; - private final SparkSession spark; public static void main(String[] args) throws Exception { @@ -83,25 +65,14 @@ public class XmlIndexingJob { final String format = parser.get("format"); log.info("format: {}", format); - final String outputPath = Optional - .ofNullable(parser.get("outputPath")) - .map(StringUtils::trim) - .orElse(null); - log.info("outputPath: {}", outputPath); - final Integer batchSize = Optional .ofNullable(parser.get("batchSize")) .map(Integer::valueOf) .orElse(DEFAULT_BATCH_SIZE); log.info("batchSize: {}", batchSize); - final OutputFormat outputFormat = Optional - .ofNullable(parser.get("outputFormat")) - .map(OutputFormat::valueOf) - .orElse(OutputFormat.SOLR); - log.info("outputFormat: {}", outputFormat); - final SparkConf conf = new SparkConf(); + conf.registerKryoClasses(new Class[] { SerializableSolrInputDocument.class }); @@ -113,19 +84,16 @@ public class XmlIndexingJob { final String isLookupUrl = parser.get("isLookupUrl"); log.info("isLookupUrl: {}", isLookupUrl); final ISLookupClient isLookup = new ISLookupClient(ISLookupClientFactory.getLookUpService(isLookupUrl)); - new XmlIndexingJob(spark, inputPath, format, batchSize, outputFormat, outputPath).run(isLookup); + new XmlIndexingJob(spark, inputPath, format, batchSize) + .run(isLookup); }); } - public XmlIndexingJob(SparkSession spark, String inputPath, String format, Integer batchSize, - OutputFormat outputFormat, - String outputPath) { + public XmlIndexingJob(SparkSession spark, String inputPath, String format, Integer batchSize) { this.spark = spark; this.inputPath = inputPath; this.format = format; this.batchSize = batchSize; - this.outputFormat = outputFormat; - this.outputPath = outputPath; } public void run(ISLookupClient isLookup) throws ISLookUpException, TransformerException { @@ -137,84 +105,31 @@ public class XmlIndexingJob { final String dsId = isLookup.getDsId(format); log.info("dsId: {}", dsId); + final String collection = ProvisionConstants.getCollectionName(format); + log.info("collection: {}", collection); + final String zkHost = isLookup.getZkHost(); log.info("zkHost: {}", zkHost); - final String version = getRecordDatestamp(); - final String indexRecordXslt = getLayoutTransformer(format, fields, xslt); log.info("indexRecordTransformer {}", indexRecordXslt); - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + final Encoder encoder = 
Encoders.bean(TupleWrapper.class); - JavaRDD docs = sc - .sequenceFile(inputPath, Text.class, Text.class) - .map(t -> t._2().toString()) - .map(s -> toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), s)) - .map(s -> new StreamingInputDocumentFactory().parseDocument(s)); - - switch (outputFormat) { - case SOLR: - final String collection = ProvisionConstants.getCollectionName(format); - - // SparkSolr >= 4 - // com.lucidworks.spark.BatchSizeType bt = com.lucidworks.spark.BatchSizeType.NUM_DOCS; - // SolrSupport.indexDocs(zkHost, collection, batchSize, bt, docs.rdd()); - // SparkSolr < 4 - SolrSupport.indexDocs(zkHost, collection, batchSize, docs.rdd()); - break; - case HDFS: - spark - .createDataset( - docs.map(SerializableSolrInputDocument::new).rdd(), - Encoders.kryo(SerializableSolrInputDocument.class)) - .write() - .mode(SaveMode.Overwrite) - .parquet(outputPath); - break; - default: - throw new IllegalArgumentException("invalid outputFormat: " + outputFormat); - } - } - - protected static String toIndexRecord(Transformer tr, final String xmlRecord) { - final StreamResult res = new StreamResult(new StringWriter()); - try { - tr.transform(new StreamSource(new StringReader(xmlRecord)), res); - return res.getWriter().toString(); - } catch (TransformerException e) { - throw new IllegalArgumentException("XPathException on record: \n" + xmlRecord, e); - } - } - - /** - * Creates the XSLT responsible for building the index xml records. - * - * @param format Metadata format name (DMF|TMF) - * @param xslt xslt for building the index record transformer - * @param fields the list of fields - * @return the javax.xml.transform.Transformer - * @throws TransformerException could happen - */ - protected static String getLayoutTransformer(String format, String fields, String xslt) - throws TransformerException { - - final Transformer layoutTransformer = SaxonTransformerFactory.newInstance(xslt); - final StreamResult layoutToXsltXslt = new StreamResult(new StringWriter()); - - layoutTransformer.setParameter("format", format); - layoutTransformer.transform(new StreamSource(new StringReader(fields)), layoutToXsltXslt); - - return layoutToXsltXslt.getWriter().toString(); - } - - /** - * method return a solr-compatible string representation of a date, used to mark all records as indexed today - * - * @return the parsed date - */ - public static String getRecordDatestamp() { - return new SimpleDateFormat(DATE_FORMAT).format(new Date()); + JavaRDD docs = spark + .read() + .schema(encoder.schema()) + .json(inputPath) + .as(encoder) + .map( + (MapFunction) t -> new TupleWrapper( + toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), t.getXml()), + t.getJson()), + Encoders.bean(TupleWrapper.class)) + .javaRDD() + .map( + t -> new StreamingInputDocumentFactory().parseDocument(t.getXml(), t.getJson())); + SolrSupport.indexDocs(zkHost, collection, batchSize, docs.rdd()); } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java index 0fb109fbb..da3915aee 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java @@ -5,11 +5,15 @@ import java.io.Serializable; import java.util.LinkedList; import java.util.List; -import eu.dnetlib.dhp.schema.oaf.OafEntity; +import 
com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; -public class JoinedEntity implements Serializable { +import eu.dnetlib.dhp.schema.common.EntityType; +import eu.dnetlib.dhp.schema.oaf.*; - private E entity; +public class JoinedEntity implements Serializable { + + private OafEntity entity; private List links; @@ -17,16 +21,16 @@ public class JoinedEntity implements Serializable { links = new LinkedList<>(); } - public JoinedEntity(E entity) { + public JoinedEntity(OafEntity entity) { this(); this.entity = entity; } - public E getEntity() { + public OafEntity getEntity() { return entity; } - public void setEntity(E entity) { + public void setEntity(OafEntity entity) { this.entity = entity; } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java index d4ee24c14..0e6e95de5 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java @@ -1,13 +1,46 @@ package eu.dnetlib.dhp.oa.provision.model; -import java.util.List; +import static org.apache.commons.lang3.StringUtils.substringBefore; +import java.io.StringReader; +import java.util.*; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; +import org.dom4j.Document; +import org.dom4j.DocumentException; +import org.dom4j.io.SAXReader; +import org.jetbrains.annotations.Nullable; + +import com.google.common.base.Splitter; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; +import eu.dnetlib.dhp.common.vocabulary.VocabularyTerm; import eu.dnetlib.dhp.oa.provision.RelationList; import eu.dnetlib.dhp.oa.provision.SortableRelation; +import eu.dnetlib.dhp.oa.provision.utils.ContextDef; +import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.solr.*; +import eu.dnetlib.dhp.schema.solr.AccessRight; +import eu.dnetlib.dhp.schema.solr.Author; +import eu.dnetlib.dhp.schema.solr.Context; +import eu.dnetlib.dhp.schema.solr.Country; +import eu.dnetlib.dhp.schema.solr.Datasource; +import eu.dnetlib.dhp.schema.solr.EoscIfGuidelines; +import eu.dnetlib.dhp.schema.solr.Instance; +import eu.dnetlib.dhp.schema.solr.Journal; +import eu.dnetlib.dhp.schema.solr.OpenAccessColor; +import eu.dnetlib.dhp.schema.solr.OpenAccessRoute; +import eu.dnetlib.dhp.schema.solr.Organization; +import eu.dnetlib.dhp.schema.solr.Project; +import eu.dnetlib.dhp.schema.solr.Result; +import eu.dnetlib.dhp.schema.solr.Subject; public class ProvisionModelSupport { @@ -28,4 +61,636 @@ public class ProvisionModelSupport { RelationList.class)); return modelClasses.toArray(new Class[] {}); } + + public static SolrRecord transform(JoinedEntity je, ContextMapper contextMapper, VocabularyGroup vocs) { + SolrRecord r = new SolrRecord(); + final OafEntity e = je.getEntity(); + final RecordType type = RecordType.valueOf(e.getClass().getSimpleName().toLowerCase()); + final Boolean deletedbyinference = Optional + .ofNullable(e.getDataInfo()) + .map(DataInfo::getDeletedbyinference) + .orElse(null); + r + .setHeader( + SolrRecordHeader 
+ .newInstance( + e.getId(), e.getOriginalId(), type, deletedbyinference)); + r.setCollectedfrom(asProvenance(e.getCollectedfrom())); + r.setContext(asContext(e.getContext(), contextMapper)); + r.setPid(asPid(e.getPid())); + + if (e instanceof eu.dnetlib.dhp.schema.oaf.Result) { + r.setResult(mapResult((eu.dnetlib.dhp.schema.oaf.Result) e)); + } else if (e instanceof eu.dnetlib.dhp.schema.oaf.Datasource) { + r.setDatasource(mapDatasource((eu.dnetlib.dhp.schema.oaf.Datasource) e)); + } else if (e instanceof eu.dnetlib.dhp.schema.oaf.Organization) { + r.setOrganization(mapOrganization((eu.dnetlib.dhp.schema.oaf.Organization) e)); + } else if (e instanceof eu.dnetlib.dhp.schema.oaf.Project) { + r.setProject(mapProject((eu.dnetlib.dhp.schema.oaf.Project) e, vocs)); + } + r + .setLinks( + Optional + .ofNullable(je.getLinks()) + .map( + links -> links + .stream() + .map(rew -> mapRelatedRecord(rew, vocs)) + .collect(Collectors.toList())) + .orElse(null)); + + return r; + } + + private static RelatedRecord mapRelatedRecord(RelatedEntityWrapper rew, VocabularyGroup vocs) { + RelatedRecord rr = new RelatedRecord(); + + final RelatedEntity re = rew.getTarget(); + final RecordType relatedRecordType = RecordType.valueOf(re.getType()); + final Relation relation = rew.getRelation(); + rr + .setHeader( + RelatedRecordHeader + .newInstance( + relation.getRelType(), + relation.getRelClass(), + relation.getTarget(), relatedRecordType)); + + rr.setAcronym(re.getAcronym()); + rr.setCode(re.getCode()); + rr.setContracttype(mapCodeLabel(re.getContracttype())); + rr.setCollectedfrom(asProvenance(re.getCollectedfrom())); + rr.setCodeRepositoryUrl(re.getCodeRepositoryUrl()); + rr.setCountry(asCountry(re.getCountry())); + rr.setDatasourcetype(mapCodeLabel(re.getDatasourcetype())); + rr.setDatasourcetypeui(mapCodeLabel(re.getDatasourcetypeui())); + rr.setDateofacceptance(re.getDateofacceptance()); + rr.setFunding(mapFunding(re.getFundingtree(), vocs)); + rr.setInstances(mapInstances(re.getInstances())); + rr.setLegalname(re.getLegalname()); + rr.setLegalshortname(re.getLegalshortname()); + rr.setOfficialname(re.getOfficialname()); + rr.setOpenairecompatibility(mapCodeLabel(re.getOpenairecompatibility())); + rr.setPid(asPid(re.getPid())); + rr.setProjectTitle(re.getProjectTitle()); + rr.setPublisher(re.getPublisher()); + rr.setResulttype(mapQualifier(re.getResulttype())); + rr.setTitle(Optional.ofNullable(re.getTitle()).map(StructuredProperty::getValue).orElse(null)); + + return rr; + } + + private static Project mapProject(eu.dnetlib.dhp.schema.oaf.Project p, VocabularyGroup vocs) { + Project ps = new Project(); + ps.setAcronym(mapField(p.getAcronym())); + ps.setCode(mapField(p.getCode())); + ps.setContracttype(mapCodeLabel(p.getContracttype())); + ps.setCurrency(mapField(p.getCurrency())); + ps.setDuration(mapField(p.getDuration())); + ps.setCallidentifier(mapField(p.getCallidentifier())); + ps.setEcarticle29_3(mapField(p.getEcarticle29_3())); + ps.setEnddate(mapField(p.getEnddate())); + ps.setFundedamount(p.getFundedamount()); + ps.setKeywords(mapField(p.getKeywords())); + ps.setStartdate(mapField(p.getStartdate())); + ps.setSubjects(asSubjectSP(p.getSubjects())); + ps.setSummary(mapField(p.getSummary())); + ps.setTitle(mapField(p.getTitle())); + ps.setTotalcost(p.getTotalcost()); + ps.setWebsiteurl(mapField(p.getWebsiteurl())); + ps.setFunding(mapFundingField(p.getFundingtree(), vocs)); + return ps; + } + + private static Funding mapFunding(List<String> fundingtree, VocabularyGroup vocs) { + SAXReader reader = new
SAXReader(); + return Optional + .ofNullable(fundingtree) + .flatMap( + ftree -> ftree + .stream() + .map(ft -> { + try { + Document doc = reader.read(new StringReader(ft)); + String countryCode = doc.valueOf("/fundingtree/funder/jurisdiction/text()"); + Country country = vocs + .find("dnet:countries") + .map(voc -> voc.getTerm(countryCode)) + .map(VocabularyTerm::getName) + .map(label -> Country.newInstance(countryCode, label)) + .orElse(null); + + String level0_id = doc.valueOf("//funding_level_0/id/text()"); + String level1_id = doc.valueOf("//funding_level_1/id/text()"); + String level2_id = doc.valueOf("//funding_level_2/id/text()"); + + return Funding + .newInstance( + Funder + .newInstance( + doc.valueOf("/fundingtree/funder/id/text()"), + doc.valueOf("/fundingtree/funder/shortname/text()"), + doc.valueOf("/fundingtree/funder/name/text()"), + country, new ArrayList<>()), + Optional + .ofNullable(level0_id) + .map( + id -> FundingLevel + .newInstance( + id, + doc.valueOf("//funding_level_0/description/text()"), + doc.valueOf("//funding_level_0/name/text()"))) + .orElse(null), + Optional + .ofNullable(level1_id) + .map( + id -> FundingLevel + .newInstance( + id, + doc.valueOf("//funding_level_1/description/text()"), + doc.valueOf("//funding_level_1/name/text()"))) + .orElse(null), + Optional + .ofNullable(level2_id) + .map( + id -> FundingLevel + .newInstance( + id, + doc.valueOf("//funding_level_2/description/text()"), + doc.valueOf("//funding_level_2/name/text()"))) + .orElse(null)); + + } catch (DocumentException e) { + throw new IllegalArgumentException(e); + } + }) + .findFirst()) + .orElse(null); + } + + private static Funding mapFundingField(List<Field<String>> fundingtree, VocabularyGroup vocs) { + return mapFunding( + Optional + .ofNullable(fundingtree) + .map(fts -> fts.stream().map(Field::getValue).collect(Collectors.toList())) + .orElse(null), + vocs); + } + + private static Organization mapOrganization(eu.dnetlib.dhp.schema.oaf.Organization o) { + Organization org = new Organization(); + org.setCountry(mapCodeLabel(o.getCountry())); + org.setLegalname(mapField(o.getLegalname())); + org.setLegalshortname(mapField(o.getLegalshortname())); + org.setAlternativeNames(mapFieldList(o.getAlternativeNames())); + org.setWebsiteurl(mapField(o.getWebsiteurl())); + org.setLogourl(mapField(o.getLogourl())); + + org.setEcenterprise(mapField(o.getEcenterprise())); + org.setEchighereducation(mapField(o.getEchighereducation())); + org.setEclegalbody(mapField(o.getEclegalbody())); + org.setEcinternationalorganization(mapField(o.getEcinternationalorganization())); + org.setEcinternationalorganizationeurinterests(mapField(o.getEcinternationalorganizationeurinterests())); + org.setEclegalperson(mapField(o.getEclegalperson())); + org.setEcnonprofit(mapField(o.getEcnonprofit())); + org.setEcnutscode(mapField(o.getEcnutscode())); + org.setEcresearchorganization(mapField(o.getEcresearchorganization())); + org.setEcsmevalidated(mapField(o.getEcsmevalidated())); + + return org; + } + + private static Datasource mapDatasource(eu.dnetlib.dhp.schema.oaf.Datasource d) { + Datasource ds = new Datasource(); + ds.setEnglishname(mapField(d.getEnglishname())); + ds.setOfficialname(mapField(d.getOfficialname())); + ds.setDescription(mapField(d.getDescription())); + ds.setJournal(mapJournal(d.getJournal())); + ds.setLogourl(mapField(d.getLogourl())); + ds.setAccessinfopackage(mapFieldList(d.getAccessinfopackage())); + ds.setCertificates(mapField(d.getCertificates())); +
ds.setCitationguidelineurl(mapField(d.getCitationguidelineurl())); + ds.setConsenttermsofuse(d.getConsenttermsofuse()); + ds.setConsenttermsofusedate(d.getConsenttermsofusedate()); + ds.setContactemail(mapField(d.getContactemail())); + ds.setContentpolicies(mapCodeLabel(d.getContentpolicies())); + ds.setDatabaseaccessrestriction(mapField(d.getDatabaseaccessrestriction())); + ds.setDatabaseaccesstype(mapField(d.getDatabaseaccesstype())); + ds.setDataprovider(mapField(d.getDataprovider())); + ds.setDatasourcetype(mapCodeLabel(d.getDatasourcetype())); + ds.setDatasourcetypeui(mapCodeLabel(d.getDatasourcetypeui())); + ds.setDatauploadrestriction(mapField(d.getDatauploadrestriction())); + ds.setDatauploadtype(mapField(d.getDatauploadtype())); + ds.setDateofvalidation(mapField(d.getDateofvalidation())); + ds.setEoscdatasourcetype(mapCodeLabel(d.getEoscdatasourcetype())); + ds.setEosctype(mapCodeLabel(d.getEosctype())); + ds.setFulltextdownload(d.getFulltextdownload()); + ds.setJurisdiction(mapCodeLabel(d.getJurisdiction())); + ds.setLanguages(d.getLanguages()); + ds.setLatitude(mapField(d.getLatitude())); + ds.setLongitude(mapField(d.getLongitude())); + ds.setLastconsenttermsofusedate(d.getLastconsenttermsofusedate()); + ds.setMissionstatementurl(mapField(d.getMissionstatementurl())); + ds.setNamespaceprefix(mapField(d.getNamespaceprefix())); + ds.setOdcontenttypes(mapFieldList(d.getOdcontenttypes())); + ds.setOdlanguages(mapFieldList(d.getOdlanguages())); + ds.setOdnumberofitems(mapField(d.getOdnumberofitems())); + ds.setOdnumberofitemsdate(mapField(d.getOdnumberofitemsdate())); + ds.setOdpolicies(mapField(d.getOdpolicies())); + ds.setOpenairecompatibility(mapCodeLabel(d.getOpenairecompatibility())); + ds.setPidsystems(mapField(d.getPidsystems())); + ds.setPolicies(mapCodeLabelKV(d.getPolicies())); + ds.setPreservationpolicyurl(d.getPreservationpolicyurl()); + ds.setProvidedproducttypes(d.getProvidedproducttypes()); + ds.setReleaseenddate(mapField(d.getReleaseenddate())); + ds.setReleasestartdate(mapField(d.getReleasestartdate())); + ds.setResearchentitytypes(d.getResearchentitytypes()); + ds.setResearchproductaccesspolicies(d.getResearchproductaccesspolicies()); + ds.setResearchproductmetadataaccesspolicies(d.getResearchproductmetadataaccesspolicies()); + ds.setServiceprovider(mapField(d.getServiceprovider())); + ds.setSubjects(asSubjectSP(d.getSubjects())); + ds.setSubmissionpolicyurl(d.getSubmissionpolicyurl()); + ds.setThematic(d.getThematic()); + ds.setVersioncontrol(d.getVersioncontrol()); + ds.setVersioning(mapField(d.getVersioning())); + + return ds; + } + + private static Result mapResult(eu.dnetlib.dhp.schema.oaf.Result r) { + Result rs = new Result(); + + rs.setResulttype(mapQualifier(r.getResulttype())); + rs.setAuthor(asAuthor(r.getAuthor())); + rs.setMaintitle(getMaintitle(r.getTitle())); + rs.setOtherTitles(getOtherTitles(r.getTitle())); + rs.setDescription(mapFieldList(r.getDescription())); + rs.setSubject(asSubject(r.getSubject())); + rs.setPublicationdate(mapField(r.getDateofacceptance())); + rs.setPublisher(mapField(r.getPublisher())); + rs.setEmbargoenddate(mapField(r.getEmbargoenddate())); + rs.setSource(mapFieldList(r.getSource())); + rs.setFormat(mapFieldList(r.getFormat())); + rs.setContributor(mapFieldList(r.getContributor())); + rs.setCoverage(mapFieldList(r.getCoverage())); + rs + .setBestaccessright( + BestAccessRight + .newInstance(r.getBestaccessright().getClassid(), r.getBestaccessright().getClassname())); + rs.setFulltext(mapFieldList(r.getFulltext())); +
rs.setCountry(asCountry(r.getCountry())); + rs.setEoscifguidelines(asEOSCIF(r.getEoscifguidelines())); + + rs.setGreen(r.getIsGreen()); + rs + .setOpenAccessColor( + Optional + .ofNullable(r.getOpenAccessColor()) + .map(color -> OpenAccessColor.valueOf(color.toString())) + .orElse(null)); + rs.setInDiamondJournal(r.getIsInDiamondJournal()); + rs.setPubliclyFunded(r.getPubliclyFunded()); + rs.setTransformativeAgreement(r.getTransformativeAgreement()); + + rs.setInstance(mapInstances(r.getInstance())); + + if (r instanceof Publication) { + Publication pub = (Publication) r; + rs.setJournal(mapJournal(pub.getJournal())); + } else if (r instanceof Dataset) { + Dataset d = (Dataset) r; + rs.setSize(mapField(d.getSize())); + rs.setVersion(mapField(d.getVersion())); + } else if (r instanceof Software) { + Software sw = (Software) r; + rs.setCodeRepositoryUrl(mapField(sw.getCodeRepositoryUrl())); + rs.setProgrammingLanguage(mapQualifier(sw.getProgrammingLanguage())); + rs.setDocumentationUrl(mapFieldList(sw.getDocumentationUrl())); + } else if (r instanceof OtherResearchProduct) { + OtherResearchProduct orp = (OtherResearchProduct) r; + rs.setContactperson(mapFieldList(orp.getContactperson())); + rs.setContactgroup(mapFieldList(orp.getContactgroup())); + rs.setTool(mapFieldList(orp.getTool())); + } + return rs; + } + + @Nullable + private static List<String> getOtherTitles(List<StructuredProperty> titleList) { + return Optional + .ofNullable(titleList) + .map( + titles -> titles + .stream() + .filter( + t -> !"main title" + .equals( + Optional + .ofNullable(t.getQualifier()) + .map(Qualifier::getClassid) + .orElse(null))) + .map(StructuredProperty::getValue) + .collect(Collectors.toList())) + .orElse(null); + } + + private static String getMaintitle(List<StructuredProperty> titleList) { + return Optional + .ofNullable(titleList) + .flatMap( + titles -> titles + .stream() + .filter( + t -> "main title" + .equals( + Optional + .ofNullable(t.getQualifier()) + .map(Qualifier::getClassid) + .orElse(null))) + .map(StructuredProperty::getValue) + .findFirst()) + .orElse(null); + } + + private static List<Instance> mapInstances(List<eu.dnetlib.dhp.schema.oaf.Instance> instanceList) { + return Optional + .ofNullable(instanceList) + .map( + instances -> instances + .stream() + .map(instance -> { + Instance i = new Instance(); + i.setCollectedfrom(asProvenance(instance.getCollectedfrom())); + i.setHostedby(asProvenance(instance.getHostedby())); + i.setFulltext(instance.getFulltext()); + i.setPid(asPid(instance.getPid())); + i.setAlternateIdentifier(asPid(instance.getAlternateIdentifier())); + i.setAccessright(mapAccessRight(instance.getAccessright())); + i.setInstancetype(mapQualifier(instance.getInstancetype())); + i.setLicense(mapField(instance.getLicense())); + i.setUrl(instance.getUrl()); + i.setRefereed(mapQualifier(instance.getRefereed())); + i.setDateofacceptance(mapField(instance.getDateofacceptance())); + i.setDistributionlocation(instance.getDistributionlocation()); + i.setProcessingcharges(getProcessingcharges(instance)); + return i; + }) + .collect(Collectors.toList())) + .orElse(null); + } + + private static APC getProcessingcharges(eu.dnetlib.dhp.schema.oaf.Instance instance) { + return Optional + .of( + APC + .newInstance( + mapField(instance.getProcessingchargecurrency()), + mapField(instance.getProcessingchargeamount()))) + .filter(apc -> Objects.nonNull(apc.getAmount()) && Objects.nonNull(apc.getCurrency())) + .orElse(null); + } + + private static AccessRight mapAccessRight(eu.dnetlib.dhp.schema.oaf.AccessRight accessright) { + return AccessRight + .newInstance( +
mapQualifier(accessright), + Optional + .ofNullable(accessright.getOpenAccessRoute()) + .map(route -> OpenAccessRoute.valueOf(route.toString())) + .orElse(null)); + } + + private static <T> T mapField(eu.dnetlib.dhp.schema.oaf.Field<T> f) { + return Optional.ofNullable(f).map(Field::getValue).orElse(null); + } + + private static List<String> mapFieldList(List<Field<String>> fl) { + return Optional + .ofNullable(fl) + .map(v -> v.stream().map(Field::getValue).collect(Collectors.toList())) + .orElse(null); + } + + private static String mapQualifier(eu.dnetlib.dhp.schema.oaf.Qualifier q) { + return Optional.ofNullable(q).map(Qualifier::getClassid).orElse(null); + } + + private static Journal mapJournal(eu.dnetlib.dhp.schema.oaf.Journal joaf) { + return Optional + .ofNullable(joaf) + .map(jo -> { + Journal j = new Journal(); + j.setConferencedate(jo.getConferencedate()); + j.setConferenceplace(jo.getConferenceplace()); + j.setEdition(jo.getEdition()); + j.setSp(jo.getSp()); + j.setEp(jo.getEp()); + j.setVol(jo.getVol()); + j.setIss(jo.getIss()); + j.setName(jo.getName()); + j.setIssnPrinted(jo.getIssnPrinted()); + j.setIssnOnline(jo.getIssnOnline()); + j.setIssnLinking(jo.getIssnLinking()); + return j; + }) + .orElse(null); + } + + private static List<Provenance> asProvenance(List<KeyValue> keyValueList) { + return Optional + .ofNullable(keyValueList) + .map( + kvs -> kvs + .stream() + .map(ProvisionModelSupport::asProvenance) + .collect(Collectors.toList())) + .orElse(null); + } + + private static Provenance asProvenance(KeyValue keyValue) { + return Optional.ofNullable(keyValue).map(cf -> Provenance.newInstance(cf.getKey(), cf.getValue())).orElse(null); + } + + private static List<Context> asContext(List<eu.dnetlib.dhp.schema.oaf.Context> ctxList, + ContextMapper contextMapper) { + + final Set<String> contexts = Optional + .ofNullable(ctxList) + .map( + ctx -> ctx + .stream() + .map(eu.dnetlib.dhp.schema.oaf.Context::getId) + .collect(Collectors.toCollection(HashSet::new))) + .orElse(new HashSet<>()); + + /* FIXME: Workaround for CLARIN mining issue: #3670#note-29 */ + if (contexts.contains("dh-ch::subcommunity::2")) { + contexts.add("clarin"); + } + + return Optional + .ofNullable(contexts) + .map( + ctx -> ctx + .stream() + .map(contextPath -> { + Context context = new Context(); + String id = ""; + Map<String, Category> categoryMap = Maps.newHashMap(); + for (final String token : Splitter.on("::").split(contextPath)) { + id += token; + + final ContextDef def = contextMapper.get(id); + + if (def == null) { + continue; + } + if (def.getName().equals("context")) { + context.setId(def.getId()); + context.setLabel(def.getLabel()); + context.setType(def.getType()); + } + if (def.getName().equals("category")) { + Category category = Category.newInstance(def.getId(), def.getLabel()); + if (Objects.isNull(context.getCategory())) { + context.setCategory(Lists.newArrayList()); + } + context.getCategory().add(category); + categoryMap.put(def.getId(), category); + } + if (def.getName().equals("concept")) { + String parentId = StringUtils.substringBeforeLast(def.getId(), "::"); + if (categoryMap.containsKey(parentId)) { + categoryMap + .get(parentId) + .getConcept() + .add(Concept.newInstance(def.getId(), def.getLabel())); + } + } + id += "::"; + } + return context; + }) + .collect(Collectors.toList())) + .orElse(null); + } + + private static List<Pid> asPid(List<StructuredProperty> pidList) { + return Optional + .ofNullable(pidList) + .map( + pids -> pids + .stream() + .map(p -> Pid.newInstance(p.getQualifier().getClassid(), p.getValue())) + .collect(Collectors.toList())) + .orElse(null); + } + + private static List<Author> asAuthor(List<eu.dnetlib.dhp.schema.oaf.Author>
authorList) { + return Optional + .ofNullable(authorList) + .map( + authors -> authors + .stream() + .map( + a -> Author + .newInstance(a.getFullname(), a.getName(), a.getSurname(), a.getRank(), asPid(a.getPid()))) + .collect(Collectors.toList())) + .orElse(null); + } + + private static List asSubject(List subjectList) { + return Optional + .ofNullable(subjectList) + .map( + subjects -> subjects + .stream() + .filter(s -> Objects.nonNull(s.getQualifier())) + .filter(s -> Objects.nonNull(s.getQualifier().getClassid())) + .map(s -> Subject.newInstance(s.getValue(), s.getQualifier().getClassid())) + .collect(Collectors.toList())) + .orElse(null); + } + + private static List asSubjectSP(List subjectList) { + return Optional + .ofNullable(subjectList) + .map( + subjects -> subjects + .stream() + .filter(s -> Objects.nonNull(s.getQualifier())) + .filter(s -> Objects.nonNull(s.getQualifier().getClassid())) + .map(s -> Subject.newInstance(s.getValue(), s.getQualifier().getClassid())) + .collect(Collectors.toList())) + .orElse(null); + } + + private static Country asCountry(eu.dnetlib.dhp.schema.oaf.Qualifier country) { + return Optional + .ofNullable(country) + .filter(c -> Objects.nonNull(c.getClassid()) && Objects.nonNull(c.getClassname())) + .map(c -> Country.newInstance(c.getClassid(), c.getClassname())) + .orElse(null); + } + + private static List asCountry(List countryList) { + return Optional + .ofNullable(countryList) + .map( + countries -> countries + .stream() + .map(c -> Country.newInstance(c.getClassid(), c.getClassname())) + .collect(Collectors.toList())) + .orElse(null); + } + + private static List asEOSCIF(List eoscIfGuidelines) { + return Optional + .ofNullable(eoscIfGuidelines) + .map( + eoscif -> eoscif + .stream() + .map( + e -> EoscIfGuidelines + .newInstance(e.getCode(), e.getLabel(), e.getUrl(), e.getSemanticRelation())) + .collect(Collectors.toList())) + .orElse(null); + } + + private static List mapCodeLabelKV(List kvList) { + return Optional + .ofNullable(kvList) + .map( + kvs -> kvs + .stream() + .map(ProvisionModelSupport::mapCodeLabel) + .collect(Collectors.toList())) + .orElse(null); + } + + private static List mapCodeLabel(List qualifiers) { + return Optional + .ofNullable(qualifiers) + .map( + list -> list + .stream() + .map(ProvisionModelSupport::mapCodeLabel) + .collect(Collectors.toList())) + .orElse(null); + } + + private static CodeLabel mapCodeLabel(Qualifier qualifier) { + return Optional + .ofNullable(qualifier) + .map(q -> CodeLabel.newInstance(q.getClassid(), q.getClassname())) + .orElse(null); + } + + private static CodeLabel mapCodeLabel(KeyValue kv) { + return Optional + .ofNullable(kv) + .map(q -> CodeLabel.newInstance(kv.getKey(), kv.getValue())) + .orElse(null); + } + } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntity.java index 5c78d1826..ee010910c 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntity.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntity.java @@ -13,6 +13,8 @@ import eu.dnetlib.dhp.schema.oaf.StructuredProperty; public class RelatedEntity implements Serializable { + private static final long serialVersionUID = -4982643490443810597L; + private String id; private String type; @@ -21,6 +23,8 @@ public class RelatedEntity implements Serializable { private String 
websiteurl; // datasource, organizations, projects // results + private String description; + private List author; private String dateofacceptance; private String publisher; private List pid; @@ -75,6 +79,22 @@ public class RelatedEntity implements Serializable { return websiteurl; } + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + public List getAuthor() { + return author; + } + + public void setAuthor(List author) { + this.author = author; + } + public void setWebsiteurl(String websiteurl) { this.websiteurl = websiteurl; } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java index 4a4a4a5be..4939ec8e9 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java @@ -9,6 +9,8 @@ import eu.dnetlib.dhp.schema.oaf.Relation; public class RelatedEntityWrapper implements Serializable { + private static final long serialVersionUID = -2624854064081757234L; + private Relation relation; private RelatedEntity target; diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/TupleWrapper.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/TupleWrapper.java new file mode 100644 index 000000000..1556a5eee --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/TupleWrapper.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2024. 
+ * SPDX-FileCopyrightText: © 2023 Consiglio Nazionale delle Ricerche + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +package eu.dnetlib.dhp.oa.provision.model; + +import java.io.Serializable; + +public class TupleWrapper implements Serializable { + + private static final long serialVersionUID = -1418439827125577822L; + private String xml; + + private String json; + + public TupleWrapper() { + } + + public TupleWrapper(String xml, String json) { + this.xml = xml; + this.json = json; + } + + public String getXml() { + return xml; + } + + public void setXml(String xml) { + this.xml = xml; + } + + public String getJson() { + return json; + } + + public void setJson(String json) { + this.json = json; + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/StreamingInputDocumentFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/StreamingInputDocumentFactory.java index b42f9ee83..51f65c7ac 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/StreamingInputDocumentFactory.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/StreamingInputDocumentFactory.java @@ -1,6 +1,7 @@ package eu.dnetlib.dhp.oa.provision.utils; +import java.io.Serializable; import java.io.StringReader; import java.io.StringWriter; import java.util.HashMap; @@ -32,7 +33,7 @@ import com.google.common.collect.Lists; * * @author claudio */ -public class StreamingInputDocumentFactory { +public class StreamingInputDocumentFactory implements Serializable { private static final String INDEX_FIELD_PREFIX = "__"; @@ -40,6 +41,8 @@ public class StreamingInputDocumentFactory { private static final String INDEX_RESULT = INDEX_FIELD_PREFIX + RESULT; + private static final String INDEX_JSON_RESULT = INDEX_FIELD_PREFIX + "json"; + private static final String INDEX_RECORD_ID = INDEX_FIELD_PREFIX + "indexrecordidentifier"; private static final String DEFAULTDNETRESULT = "dnetResult"; @@ -71,13 +74,17 @@ public class StreamingInputDocumentFactory { this.resultName = resultName; } - public SolrInputDocument parseDocument(final String inputDocument) { + public SolrInputDocument parseDocument(final String xml) { + return parseDocument(xml, ""); + } + + public SolrInputDocument parseDocument(final String xml, final String json) { final StringWriter results = new StringWriter(); final List nsList = Lists.newLinkedList(); try { - XMLEventReader parser = inputFactory.get().createXMLEventReader(new StringReader(inputDocument)); + XMLEventReader parser = inputFactory.get().createXMLEventReader(new StringReader(xml)); final SolrInputDocument indexDocument = new SolrInputDocument(new HashMap<>()); @@ -95,13 +102,13 @@ public class StreamingInputDocumentFactory { } else if (TARGETFIELDS.equals(localName)) { parseTargetFields(indexDocument, parser); } else if (resultName.equals(localName)) { - copyResult(indexDocument, results, parser, nsList, resultName); + copyResult(indexDocument, json, results, parser, nsList, resultName); } } } if (!indexDocument.containsKey(INDEX_RECORD_ID)) { - throw new IllegalStateException("cannot extract record ID from: " + inputDocument); + throw new IllegalStateException("cannot extract record ID from: " + xml); } return indexDocument; @@ -171,6 +178,7 @@ public class StreamingInputDocumentFactory { */ protected void copyResult( final SolrInputDocument indexDocument, + final String json, final StringWriter results, final XMLEventReader parser, final 
List nsList, @@ -205,6 +213,7 @@ public class StreamingInputDocumentFactory { } writer.close(); indexDocument.addField(INDEX_RESULT, results.toString()); + indexDocument.addField(INDEX_JSON_RESULT, json); } finally { outputFactory.remove(); eventFactory.remove(); diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/TemplateFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/TemplateFactory.java index 87c0261ac..befebe0bb 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/TemplateFactory.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/TemplateFactory.java @@ -100,13 +100,17 @@ public class TemplateFactory { public String getInstance( final List instancemetadata, final String url) { + return getInstance(instancemetadata, Lists.newArrayList(url)); + } + + public String getInstance( + final List instancemetadata, final List url) { return getTemplate(resources.getInstance()) .add("metadata", instancemetadata) .add( "webresources", Optional .ofNullable(url) - .map(u -> Lists.newArrayList(url)) .orElse(Lists.newArrayList()) .stream() .filter(StringUtils::isNotBlank) diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java index 4d9d9c341..e1f5addfd 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java @@ -49,6 +49,7 @@ import eu.dnetlib.dhp.schema.common.*; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; +import eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits; import scala.Tuple2; public class XmlRecordFactory implements Serializable { @@ -365,6 +366,7 @@ public class XmlRecordFactory implements Serializable { .getDescription() .stream() .filter(Objects::nonNull) + .limit(ModelHardLimits.MAX_ABSTRACTS) .map(c -> XmlSerializationUtils.asXmlElement("description", c.getValue())) .collect(Collectors.toCollection(HashSet::new))); } @@ -1057,7 +1059,8 @@ public class XmlRecordFactory implements Serializable { return kv != null && StringUtils.isNotBlank(kv.getKey()) && StringUtils.isNotBlank(kv.getValue()); } - private List mapFields(final RelatedEntityWrapper link, final Set contexts) { + private List mapFields(final TemplateFactory templateFactory, final RelatedEntityWrapper link, + final Set contexts) { final Relation rel = link.getRelation(); final RelatedEntity re = link.getTarget(); final String targetType = link.getTarget().getType(); @@ -1071,6 +1074,18 @@ public class XmlRecordFactory implements Serializable { if (re.getTitle() != null && isNotBlank(re.getTitle().getValue())) { metadata.add(XmlSerializationUtils.mapStructuredProperty("title", re.getTitle())); } + if (StringUtils.isNotBlank(re.getDescription())) { + metadata.add(XmlSerializationUtils.asXmlElement("description", re.getDescription())); + } + if (re.getAuthor() != null) { + metadata + .addAll( + re + .getAuthor() + .stream() + .map(author -> XmlSerializationUtils.asXmlElement("creator", author)) + .collect(Collectors.toList())); + } if (isNotBlank(re.getDateofacceptance())) { metadata 
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java
index 4d9d9c341..e1f5addfd 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java
@@ -49,6 +49,7 @@ import eu.dnetlib.dhp.schema.common.*;
 import eu.dnetlib.dhp.schema.oaf.*;
 import eu.dnetlib.dhp.schema.oaf.Result;
 import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
+import eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits;
 import scala.Tuple2;
 
 public class XmlRecordFactory implements Serializable {
@@ -365,6 +366,7 @@ public class XmlRecordFactory implements Serializable {
 					.getDescription()
 					.stream()
 					.filter(Objects::nonNull)
+					.limit(ModelHardLimits.MAX_ABSTRACTS)
 					.map(c -> XmlSerializationUtils.asXmlElement("description", c.getValue()))
 					.collect(Collectors.toCollection(HashSet::new)));
 		}
@@ -1057,7 +1059,8 @@ public class XmlRecordFactory implements Serializable {
 		return kv != null && StringUtils.isNotBlank(kv.getKey()) && StringUtils.isNotBlank(kv.getValue());
 	}
 
-	private List<String> mapFields(final RelatedEntityWrapper link, final Set<String> contexts) {
+	private List<String> mapFields(final TemplateFactory templateFactory, final RelatedEntityWrapper link,
+		final Set<String> contexts) {
 		final Relation rel = link.getRelation();
 		final RelatedEntity re = link.getTarget();
 		final String targetType = link.getTarget().getType();
@@ -1071,6 +1074,18 @@ public class XmlRecordFactory implements Serializable {
 				if (re.getTitle() != null && isNotBlank(re.getTitle().getValue())) {
 					metadata.add(XmlSerializationUtils.mapStructuredProperty("title", re.getTitle()));
 				}
+				if (StringUtils.isNotBlank(re.getDescription())) {
+					metadata.add(XmlSerializationUtils.asXmlElement("description", re.getDescription()));
+				}
+				if (re.getAuthor() != null) {
+					metadata
+						.addAll(
+							re
+								.getAuthor()
+								.stream()
+								.map(author -> XmlSerializationUtils.asXmlElement("creator", author))
+								.collect(Collectors.toList()));
+				}
 				if (isNotBlank(re.getDateofacceptance())) {
 					metadata
 						.add(XmlSerializationUtils.asXmlElement("dateofacceptance", re.getDateofacceptance()));
@@ -1104,6 +1119,54 @@ public class XmlRecordFactory implements Serializable {
 						.map(p -> XmlSerializationUtils.mapStructuredProperty("pid", p))
 						.collect(Collectors.toList()));
 			}
+			if (re.getInstances() != null) {
+				re
+					.getInstances()
+					.forEach(i -> {
+						final List<String> instanceFields = Lists.newArrayList();
+						if (i.getAccessright() != null && !i.getAccessright().isBlank()) {
+							instanceFields
+								.add(XmlSerializationUtils.mapQualifier("accessright", i.getAccessright()));
+						}
+						if (i.getHostedby() != null) {
+							instanceFields.add(XmlSerializationUtils.mapKeyValue("hostedby", i.getHostedby()));
+						}
+						if (i.getDateofacceptance() != null && isNotBlank(i.getDateofacceptance().getValue())) {
+							instanceFields
+								.add(
+									XmlSerializationUtils
+										.asXmlElement("dateofacceptance", i.getDateofacceptance().getValue()));
+						}
+						if (i.getInstancetype() != null && !i.getInstancetype().isBlank()) {
+							instanceFields
+								.add(XmlSerializationUtils.mapQualifier("instancetype", i.getInstancetype()));
+						}
+
+						if (i.getRefereed() != null && !i.getRefereed().isBlank()) {
+							instanceFields.add(XmlSerializationUtils.mapQualifier("refereed", i.getRefereed()));
+						}
+
+						if (i.getLicense() != null && isNotBlank(i.getLicense().getValue())) {
+							instanceFields
+								.add(XmlSerializationUtils.asXmlElement("license", i.getLicense().getValue()));
+						}
+						if (isNotBlank(i.getFulltext())) {
+							instanceFields.add(XmlSerializationUtils.asXmlElement("fulltext", i.getFulltext()));
+						}
+						if (i.getUrl() != null && !i.getUrl().isEmpty()) {
+							instanceFields
+								.addAll(
+									i
+										.getUrl()
+										.stream()
+										.filter(StringUtils::isNotBlank)
+										.map(url -> XmlSerializationUtils.asXmlElement("url", url))
+										.collect(Collectors.toList()));
+						}
+						metadata.add(templateFactory.getInstance(instanceFields, i.getUrl()));
+					});
+			}
+
 			break;
 		case datasource:
 			if (isNotBlank(re.getOfficialname())) {
@@ -1133,6 +1196,9 @@ public class XmlRecordFactory implements Serializable {
 			if (re.getCountry() != null && !re.getCountry().isBlank()) {
 				metadata.add(XmlSerializationUtils.mapQualifier("country", re.getCountry()));
 			}
+			if (StringUtils.isNotBlank(re.getWebsiteurl())) {
+				metadata.add(XmlSerializationUtils.asXmlElement("websiteurl", re.getWebsiteurl()));
+			}
 			break;
 		case project:
 			if (isNotBlank(re.getProjectTitle())) {
@@ -1182,7 +1248,7 @@ public class XmlRecordFactory implements Serializable {
 				throw new IllegalArgumentException(
 					String.format("missing scheme for: <%s - %s>", type, targetType));
 			}
-			final HashSet<String> fields = Sets.newHashSet(mapFields(link, contexts));
+			final HashSet<String> fields = Sets.newHashSet(mapFields(templateFactory, link, contexts));
 			if (rel.getValidated() == null) {
 				rel.setValidated(false);
 			}
@@ -1206,7 +1272,7 @@ public class XmlRecordFactory implements Serializable {
 			.map(link -> {
 				final String targetType = link.getTarget().getType();
 				final String name = ModelSupport.getMainType(EntityType.valueOf(targetType));
-				final HashSet<String> fields = Sets.newHashSet(mapFields(link, null));
+				final HashSet<String> fields = Sets.newHashSet(mapFields(templateFactory, link, null));
 				return templateFactory
 					.getChild(name, link.getTarget().getId(), Lists.newArrayList(fields));
 			})
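With mapFields now receiving the TemplateFactory, related results embed description, creator and per-instance details in the serialized record. A hedged sanity check of the produced XML, in the style of the existing dom4j-based tests (the XPath expressions are assumptions about the output layout, not asserted by this diff):

import java.io.StringReader;

import org.dom4j.Document;
import org.dom4j.io.SAXReader;

public class RelatedEntityXmlSketch {

	public static void inspect(String xmlRecord) throws Exception {
		final Document doc = new SAXReader().read(new StringReader(xmlRecord));
		// related-entity abstracts are trimmed upstream to MAX_RELATED_ABSTRACT_LENGTH
		System.out.println(doc.selectNodes("//*[local-name()='rel']/*[local-name()='description']").size());
		System.out.println(doc.selectNodes("//*[local-name()='rel']/*[local-name()='creator']").size());
		System.out.println(doc.selectNodes("//*[local-name()='rel']/*[local-name()='instance']").size());
	}
}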
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_solr_record_dump.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_solr_record_dump.json
new file mode 100644
index 000000000..7e5734222
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_solr_record_dump.json
@@ -0,0 +1,26 @@
+[
+  {
+    "paramName": "is",
+    "paramLongName": "isLookupUrl",
+    "paramDescription": "URL of the isLookUp Service",
+    "paramRequired": true
+  },
+  {
+    "paramName": "i",
+    "paramLongName": "inputPath",
+    "paramDescription": "the path of the sequence file to read the XML records",
+    "paramRequired": true
+  },
+  {
+    "paramName": "f",
+    "paramLongName": "format",
+    "paramDescription": "MDFormat name found in the IS profile",
+    "paramRequired": true
+  },
+  {
+    "paramName": "op",
+    "paramLongName": "outputPath",
+    "paramDescription": "path on hdfs activating an alternative output for the SolrInputDocuments",
+    "paramRequired": false
+  }
+]
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json
index 46286e06a..3396020e0 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json
@@ -22,17 +22,5 @@
     "paramLongName": "batchSize",
     "paramDescription": "size of the batch of documents sent to solr",
     "paramRequired": false
-  },
-  {
-    "paramName": "of",
-    "paramLongName": "outputFormat",
-    "paramDescription": "decides the job output format, SOLR | HDFS",
-    "paramRequired": false
-  },
-  {
-    "paramName": "op",
-    "paramLongName": "outputPath",
-    "paramDescription": "path on hdfs activating an alternative output for the SolrInputDocuments",
-    "paramRequired": false
   }
 ]
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json
index 653a69ed1..4509eb9de 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json
@@ -16,5 +16,11 @@
     "paramLongName": "contextApiBaseUrl",
     "paramDescription": "URL of the context API",
    "paramRequired": true
+  },
+  {
+    "paramName": "isu",
+    "paramLongName": "isLookupUrl",
+    "paramDescription": "URL of the context ISLookup Service",
+    "paramRequired": true
   }
 ]
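The new dump job reads this parameter file through the standard dhp argument parser. A sketch of the expected wiring, assuming the usual ArgumentApplicationParser conventions used across dhp-workflows (the main class body is abbreviated and the class name is illustrative):

import org.apache.commons.io.IOUtils;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;

public class SolrRecordDumpArgsSketch {

	public static void main(String[] args) throws Exception {
		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
			IOUtils
				.toString(
					SolrRecordDumpArgsSketch.class
						.getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_solr_record_dump.json")));
		parser.parseArgument(args);

		final String isLookupUrl = parser.get("isLookupUrl"); // required
		final String inputPath = parser.get("inputPath"); // required
		final String format = parser.get("format"); // required
		final String outputPath = parser.get("outputPath"); // optional: may be null
		// ... build a SparkSession and run SolrRecordDumpJob with these values
	}
}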
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
index 9eab960f0..f60c531e4 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
@@ -592,8 +592,9 @@
                     --conf spark.network.timeout=${sparkNetworkTimeout}
                 </spark-opts>
                 <arg>--inputPath</arg><arg>${workingDir}/join_entities</arg>
-                <arg>--outputPath</arg><arg>${workingDir}/xml</arg>
+                <arg>--outputPath</arg><arg>${workingDir}/xml_json</arg>
                 <arg>--contextApiBaseUrl</arg><arg>${contextApiBaseUrl}</arg>
+                <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
             </spark>
@@ -602,8 +603,8 @@
 [decision/switch markup lost in extraction; the surviving text shows the two cases testing
  ${wf:conf('shouldIndex') eq 'true'} and ${wf:conf('shouldIndex') eq 'false'}, with the 'false'
  case line changed, presumably retargeted to the new HDFS dump action added below]
@@ -646,12 +647,10 @@
                     --conf spark.hadoop.mapreduce.map.speculative=false
                     --conf spark.hadoop.mapreduce.reduce.speculative=false
                 </spark-opts>
-                <arg>--inputPath</arg><arg>${workingDir}/xml</arg>
+                <arg>--inputPath</arg><arg>${workingDir}/xml_json</arg>
                 <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
                 <arg>--format</arg><arg>${format}</arg>
                 <arg>--batchSize</arg><arg>${batchSize}</arg>
-                <arg>--outputFormat</arg><arg>${outputFormat}</arg>
-                <arg>--outputPath</arg><arg>${workingDir}/solr_documents</arg>
             </spark>
@@ -674,5 +673,30 @@
 
+    <action name="dump_solr_records_hdfs">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>dump_solr_records_hdfs</name>
+            <class>eu.dnetlib.dhp.oa.provision.SolrRecordDumpJob</class>
+            <jar>dhp-graph-provision-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-cores=${sparkExecutorCoresForJoining}
+                --executor-memory=${sparkExecutorMemoryForJoining}
+                --driver-memory=${sparkDriverMemoryForJoining}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+            <arg>--inputPath</arg><arg>${workingDir}/xml_json</arg>
+            <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
+            <arg>--format</arg><arg>${format}</arg>
+            <arg>--outputPath</arg><arg>${workingDir}/solr_documents</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
 </workflow-app>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/EOSCFuture_Test.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/EOSCFuture_Test.java
index 8800abf95..1a982ca39 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/EOSCFuture_Test.java
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/EOSCFuture_Test.java
@@ -57,7 +57,7 @@ public class EOSCFuture_Test {
 				IOUtils.toString(getClass().getResourceAsStream("eosc-future/photic-zone.json")),
 				OtherResearchProduct.class);
 
-		final String xml = xmlRecordFactory.build(new JoinedEntity<>(p));
+		final String xml = xmlRecordFactory.build(new JoinedEntity(p));
 
 		assertNotNull(xml);
 
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/IndexRecordTransformerTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/IndexRecordTransformerTest.java
index e72883055..8d5aa3f3a 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/IndexRecordTransformerTest.java
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/IndexRecordTransformerTest.java
@@ -63,7 +63,7 @@ public class IndexRecordTransformerTest {
 		final Project pj = load("project.json", Project.class);
 		final Relation rel = load("relToValidatedProject.json", Relation.class);
 
-		final JoinedEntity je = new JoinedEntity<>(p);
+		final JoinedEntity je = new JoinedEntity(p);
 		je
 			.setLinks(
 				Lists
@@ -86,7 +86,7 @@ public class IndexRecordTransformerTest {
 
 		final Publication p = load("publication.json", Publication.class);
 
-		final JoinedEntity je = new JoinedEntity<>(p);
+		final JoinedEntity je = new JoinedEntity(p);
 		final String record = xmlRecordFactory.build(je);
 		assertNotNull(record);
 		SolrInputDocument solrDoc = testRecordTransformation(record);
@@ -102,7 +102,7 @@ public class IndexRecordTransformerTest {
 
 		final Publication p = load("riunet.json", Publication.class);
 
-		final JoinedEntity je = new JoinedEntity<>(p);
+		final JoinedEntity je = new JoinedEntity(p);
 		final String record = xmlRecordFactory.build(je);
 		assertNotNull(record);
 		testRecordTransformation(record);
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/JoinedEntityTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/JoinedEntityTest.java
new file mode 100644
index 000000000..d836eddd8
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/JoinedEntityTest.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2024.
+ * SPDX-FileCopyrightText: © 2023 Consiglio Nazionale delle Ricerche
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package eu.dnetlib.dhp.oa.provision;
+
+import java.io.IOException;
+
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
+import eu.dnetlib.dhp.schema.oaf.Field;
+import eu.dnetlib.dhp.schema.oaf.Journal;
+import eu.dnetlib.dhp.schema.oaf.Organization;
+import eu.dnetlib.dhp.schema.oaf.Publication;
+
+class JoinedEntityTest {
+
+	private static final Logger log = LoggerFactory.getLogger(JoinedEntityTest.class);
+
+	@Test
+	void test_serialisation() throws IOException {
+
+		Publication p = new Publication();
+		p.setId("p1");
+		Journal j = new Journal();
+		j.setIss("1234-5678");
+		p.setJournal(j);
+
+		Organization o = new Organization();
+		o.setId("o1");
+		Field<String> lName = new Field<>();
+		lName.setValue("CNR");
+		o.setLegalname(lName);
+
+		ObjectMapper mapper = new ObjectMapper();
+		mapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
+
+		final String json = mapper.writeValueAsString(new JoinedEntity(p));
+		log.info(json);
+
+	}
+
+}
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigExploreTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigExploreTest.java
index 3beca7e7e..424262eef 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigExploreTest.java
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigExploreTest.java
@@ -86,8 +86,7 @@ public class SolrConfigExploreTest extends SolrExploreTest {
 
 		String inputPath = "src/test/resources/eu/dnetlib/dhp/oa/provision/xml";
 
-		new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, XmlIndexingJob.OutputFormat.SOLR, null)
-			.run(isLookupClient);
+		new XmlIndexingJob(spark, inputPath, FORMAT, batchSize).run(isLookupClient);
 		Assertions.assertEquals(0, miniCluster.getSolrClient().commit().getStatus());
 
 		String[] queryStrings = {
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigTest.java
index a9d885ecf..625b6d131 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigTest.java
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigTest.java
@@ -95,7 +95,7 @@ public class SolrConfigTest extends SolrTest {
 
 		String inputPath = "src/test/resources/eu/dnetlib/dhp/oa/provision/xml";
 
-		new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, XmlIndexingJob.OutputFormat.SOLR, null)
+		new XmlIndexingJob(spark, inputPath, FORMAT, batchSize)
 			.run(isLookupClient);
 
 		Assertions.assertEquals(0, miniCluster.getSolrClient().commit().getStatus());
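The NON_EMPTY inclusion exercised by JoinedEntityTest above is what keeps the serialized payloads compact: empty strings, collections and nulls are simply omitted. A small self-contained illustration using the TupleWrapper bean from this diff (the values are made up):

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.oa.provision.model.TupleWrapper;

public class NonEmptySerialisationSketch {

	public static void main(String[] args) throws Exception {
		final ObjectMapper mapper = new ObjectMapper()
			.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);

		// prints {"xml":"<record/>"}: the empty json member is dropped
		System.out.println(mapper.writeValueAsString(new TupleWrapper("<record/>", "")));
	}
}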
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrRecordDumpJobTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrRecordDumpJobTest.java
new file mode 100644
index 000000000..c2cd3497a
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrRecordDumpJobTest.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright (c) 2024.
+ * SPDX-FileCopyrightText: © 2023 Consiglio Nazionale delle Ricerche
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package eu.dnetlib.dhp.oa.provision;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.nio.file.Path;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.solr.common.SolrInputField;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.dom4j.io.SAXReader;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument;
+import eu.dnetlib.dhp.oa.provision.model.TupleWrapper;
+import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+
+@ExtendWith(MockitoExtension.class)
+class SolrRecordDumpJobTest {
+
+	protected static final Logger log = LoggerFactory.getLogger(SolrRecordDumpJobTest.class);
+
+	protected static SparkSession spark;
+
+	protected static final String FORMAT = "test";
+
+	@Mock
+	private ISLookUpService isLookUpService;
+
+	@Mock
+	private ISLookupClient isLookupClient;
+
+	@TempDir
+	public static Path workingDir;
+
+	@BeforeAll
+	public static void before() {
+
+		SparkConf conf = new SparkConf();
+		conf.setAppName(SolrRecordDumpJobTest.class.getSimpleName());
+
+		conf.registerKryoClasses(new Class[] {
+			SerializableSolrInputDocument.class
+		});
+
+		conf.setMaster("local[1]");
+		conf.set("spark.driver.host", "localhost");
+		conf.set("hive.metastore.local", "true");
+		conf.set("spark.ui.enabled", "false");
+		conf.set("spark.sql.warehouse.dir", workingDir.resolve("spark").toString());
+
+		spark = SparkSession
+			.builder()
+			.appName(SolrRecordDumpJobTest.class.getSimpleName())
+			.config(conf)
+			.getOrCreate();
+	}
+
+	@AfterAll
+	public static void shutDown() throws Exception {
+		spark.stop();
+		FileUtils.deleteDirectory(workingDir.toFile());
+	}
+
+	@BeforeEach
+	public void prepareMocks() throws ISLookUpException, IOException {
+		isLookupClient.setIsLookup(isLookUpService);
+
+		Mockito
+			.when(isLookupClient.getDsId(Mockito.anyString()))
+			.thenReturn("313f0381-23b6-466f-a0b8-c72a9679ac4b_SW5kZXhEU1Jlc291cmNlcy9JbmRleERTUmVzb3VyY2VUeXBl");
+		Mockito
+			.when(isLookupClient.getLayoutSource(Mockito.anyString()))
+			.thenReturn(IOUtils.toString(getClass().getResourceAsStream("fields.xml")));
+		Mockito
+			.when(isLookupClient.getLayoutTransformer())
+			.thenReturn(IOUtils.toString(getClass().getResourceAsStream("layoutToRecordTransformer.xsl")));
+	}
+
+	@Test
+	void testXmlIndexingJob_saveOnHDFS() throws Exception {
+		final String ID_XPATH = "//*[local-name()='header']/*[local-name()='objIdentifier']";
+
+		String inputPath = "src/test/resources/eu/dnetlib/dhp/oa/provision/xml";
+		// String inputPath = "/Users/claudio/workspace/data/index";
+
+		Dataset<TupleWrapper> records = spark
+			.read()
+			.schema(Encoders.bean(TupleWrapper.class).schema())
+			.json(inputPath)
+			.as(Encoders.bean(TupleWrapper.class));
+
+		records.printSchema();
+
+		long nRecord = records.count();
+		log.info("found {} records", nRecord);
+
+		final Dataset<String> ids = records
+			.map((MapFunction<TupleWrapper, String>) TupleWrapper::getXml, Encoders.STRING())
+			.map(
+				(MapFunction<String, String>) s -> new SAXReader().read(new StringReader(s)).valueOf(ID_XPATH),
+				Encoders.STRING());
+
+		log.info("found {} ids", ids.count());
+
+		long xmlIdUnique = ids
+			.distinct()
+			.count();
+
+		log.info("found {} unique ids", xmlIdUnique);
+
+		assertEquals(nRecord, xmlIdUnique, "IDs should be unique among input records");
+
+		final String outputPath = workingDir.resolve("outputPath").toAbsolutePath().toString();
+		new SolrRecordDumpJob(spark, inputPath, FORMAT, outputPath).run(isLookupClient);
+
+		final Dataset<SerializableSolrInputDocument> solrDocs = spark
+			.read()
+			.load(outputPath)
+			.as(Encoders.kryo(SerializableSolrInputDocument.class));
+
+		solrDocs.foreach(doc -> {
+			assertNotNull(doc.get("__result"));
+			assertNotNull(doc.get("__json"));
+		});
+
+		long docIdUnique = solrDocs.map((MapFunction<SerializableSolrInputDocument, String>) doc -> {
+			final SolrInputField id = doc.getField("__indexrecordidentifier");
+			return id.getFirstValue().toString();
+		}, Encoders.STRING())
+			.distinct()
+			.count();
+		assertEquals(xmlIdUnique, docIdUnique, "IDs should be unique among the output XML records");
+
+		long jsonUnique = solrDocs
+			.map(
+				(MapFunction<SerializableSolrInputDocument, String>) je -> (String) je.getField("__json").getValue(),
+				Encoders.STRING())
+			.distinct()
+			.count();
+
+		assertEquals(jsonUnique, docIdUnique, "IDs should be unique among the output JSON records");
+
+	}
+}
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrTest.java
index 186cb964a..79527b891 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrTest.java
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrTest.java
@@ -92,7 +92,7 @@ public abstract class SolrTest {
 		FileUtils.deleteDirectory(workingDir.toFile());
 	}
 
-	protected static NamedList<Object> createCollection(CloudSolrClient client, String name, int numShards,
+	public static NamedList<Object> createCollection(CloudSolrClient client, String name, int numShards,
 		int replicationFactor, int maxShardsPerNode, String configName) throws Exception {
 		ModifiableSolrParams modParams = new ModifiableSolrParams();
 		modParams.set(CoreAdminParams.ACTION, CollectionParams.CollectionAction.CREATE.name());
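Once dumped, the __json field can be consumed without touching the XML body at all. A hypothetical downstream reader (reading into a generic JsonNode is an assumption on my part; the diff does not fix a schema for the payload):

import org.apache.solr.common.SolrDocument;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonPayloadSketch {

	private static final ObjectMapper MAPPER = new ObjectMapper();

	public static JsonNode payloadOf(SolrDocument doc) throws Exception {
		// __json is stored as a plain string field next to the __result XML body
		return MAPPER.readTree((String) doc.getFirstValue("__json"));
	}
}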
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJobTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJobTest.java
index a3a140cf6..522c34ef1 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJobTest.java
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJobTest.java
@@ -2,34 +2,31 @@
 package eu.dnetlib.dhp.oa.provision;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.IOException;
-import java.io.StringReader;
 import java.net.URI;
-import java.util.Map;
+import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.io.Text;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.response.QueryResponse;
-import org.apache.solr.common.SolrInputField;
 import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
-import org.dom4j.io.SAXReader;
-import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument;
+import eu.dnetlib.dhp.oa.provision.model.TupleWrapper;
 import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@@ -70,6 +67,7 @@ public class XmlIndexingJobTest extends SolrTest {
 
 		SparkConf conf = new SparkConf();
 		conf.setAppName(XmlIndexingJobTest.class.getSimpleName());
+
 		conf.registerKryoClasses(new Class[] {
 			SerializableSolrInputDocument.class
 		});
@@ -97,13 +95,15 @@ public class XmlIndexingJobTest extends SolrTest {
 
 		String inputPath = "src/test/resources/eu/dnetlib/dhp/oa/provision/xml";
 
-		long nRecord = JavaSparkContext
-			.fromSparkContext(spark.sparkContext())
-			.sequenceFile(inputPath, Text.class, Text.class)
-			.count();
+		Dataset<TupleWrapper> records = spark
+			.read()
+			.schema(Encoders.bean(TupleWrapper.class).schema())
+			.json(inputPath)
+			.as(Encoders.bean(TupleWrapper.class));
 
-		new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, XmlIndexingJob.OutputFormat.SOLR, null)
-			.run(isLookupClient);
+		long nRecord = records.count();
+
+		new XmlIndexingJob(spark, inputPath, FORMAT, batchSize).run(isLookupClient);
 
 		assertEquals(0, miniCluster.getSolrClient().commit().getStatus());
 
@@ -137,40 +137,26 @@ public class XmlIndexingJobTest extends SolrTest {
 		assertEquals(
 			0, rsp.getResults().getNumFound(),
 			"the number of indexed records having peerreviewed = true");
-	}
 
-	@Test
-	void testXmlIndexingJob_saveOnHDFS() throws Exception {
-		final String ID_XPATH = "//header/*[local-name()='objIdentifier']";
+		rsp = miniCluster
+			.getSolrClient()
+			.query(
+				new SolrQuery()
+					.add(CommonParams.Q, "objidentifier:\"iddesignpres::ae77e56e84ad058d9e7f19fa2f7325db\"")
+					.add(CommonParams.FL, "__json"));
+		assertEquals(
+			1, rsp.getResults().getNumFound(),
+			"the number of indexed records having the given identifier");
+		Optional<Object> json = rsp
+			.getResults()
+			.stream()
+			.map(d -> d.getFieldValues("__json"))
+			.flatMap(d -> d.stream())
+			.findFirst();
 
-		String inputPath = "src/test/resources/eu/dnetlib/dhp/oa/provision/xml";
+		assertTrue(json.isPresent());
 
-		final JavaPairRDD<Text, Text> xmlRecords = JavaSparkContext
-			.fromSparkContext(spark.sparkContext())
-			.sequenceFile(inputPath, Text.class, Text.class);
-		long nRecord = xmlRecords.count();
-		long xmlIdUnique = xmlRecords
-			.map(t -> t._2().toString())
-			.map(s -> new SAXReader().read(new StringReader(s)).valueOf(ID_XPATH))
-			.distinct()
-			.count();
-		assertEquals(nRecord,
xmlIdUnique, "IDs should be unique among input records");
-
-		final String outputPath = workingDir.resolve("outputPath").toAbsolutePath().toString();
-		new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, XmlIndexingJob.OutputFormat.HDFS, outputPath)
-			.run(isLookupClient);
-
-		final Dataset<SerializableSolrInputDocument> solrDocs = spark
-			.read()
-			.load(outputPath)
-			.as(Encoders.kryo(SerializableSolrInputDocument.class));
-		long docIdUnique = solrDocs.map((MapFunction<SerializableSolrInputDocument, String>) doc -> {
-			final SolrInputField id = doc.getField("__indexrecordidentifier");
-			return id.getFirstValue().toString();
-		}, Encoders.STRING())
-			.distinct()
-			.count();
-		assertEquals(xmlIdUnique, docIdUnique, "IDs should be unique among the output records");
+		log.info((String) json.get());
 	}
 
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java
index ba9572b17..f26c384d2 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java
@@ -42,7 +42,7 @@ public class XmlRecordFactoryTest {
 		final Publication p = OBJECT_MAPPER
 			.readValue(IOUtils.toString(getClass().getResourceAsStream("publication.json")), Publication.class);
 
-		final String xml = xmlRecordFactory.build(new JoinedEntity<>(p));
+		final String xml = xmlRecordFactory.build(new JoinedEntity(p));
 
 		assertNotNull(xml);
 
@@ -117,7 +117,7 @@ public class XmlRecordFactoryTest {
 		final List<RelatedEntityWrapper> links = Lists.newArrayList();
 		final RelatedEntityWrapper rew = new RelatedEntityWrapper(rel, relatedProject);
 		links.add(rew);
-		final JoinedEntity je = new JoinedEntity<>(p);
+		final JoinedEntity je = new JoinedEntity(p);
 		je.setLinks(links);
 
 		final String xml = xmlRecordFactory.build(je);
@@ -148,7 +148,7 @@ public class XmlRecordFactoryTest {
 		final List<RelatedEntityWrapper> links = Lists.newArrayList();
 		final RelatedEntityWrapper rew = new RelatedEntityWrapper(rel, relatedProject);
 		links.add(rew);
-		final JoinedEntity je = new JoinedEntity<>(p);
+		final JoinedEntity je = new JoinedEntity(p);
 		je.setLinks(links);
 
 		final String xml = xmlRecordFactory.build(je);
@@ -171,7 +171,7 @@ public class XmlRecordFactoryTest {
 		final Datasource d = OBJECT_MAPPER
 			.readValue(IOUtils.toString(getClass().getResourceAsStream("datasource.json")), Datasource.class);
 
-		final String xml = xmlRecordFactory.build(new JoinedEntity<>(d));
+		final String xml = xmlRecordFactory.build(new JoinedEntity(d));
 
 		assertNotNull(xml);
 
@@ -210,7 +210,7 @@ public class XmlRecordFactoryTest {
 				IOUtils.toString(getClass().getResourceAsStream("d4science-1-training.json")),
 				OtherResearchProduct.class);
 
-		final String xml = xmlRecordFactory.build(new JoinedEntity<>(p));
+		final String xml = xmlRecordFactory.build(new JoinedEntity(p));
 
 		assertNotNull(xml);
 
@@ -233,7 +233,7 @@ public class XmlRecordFactoryTest {
 				IOUtils.toString(getClass().getResourceAsStream("d4science-2-dataset.json")),
 				OtherResearchProduct.class);
 
-		final String xml = xmlRecordFactory.build(new JoinedEntity<>(p));
+		final String xml = xmlRecordFactory.build(new JoinedEntity(p));
 
 		assertNotNull(xml);
 
@@ -256,7 +256,7 @@ public class XmlRecordFactoryTest {
 				IOUtils.toString(getClass().getResourceAsStream("iris-odf-4.json")),
 				Publication.class);
 
-		final String xml = xmlRecordFactory.build(new JoinedEntity<>(p));
+		final String xml = xmlRecordFactory.build(new JoinedEntity(p));
 
 		assertNotNull(xml);
 
diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/exploreTestConfig/managed-schema b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/exploreTestConfig/managed-schema
index 4e85ca3be..9720d3f37 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/exploreTestConfig/managed-schema
+++ b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/exploreTestConfig/managed-schema
@@ -202,6 +202,7 @@
 [one field definition added; the XML markup of this hunk was lost in extraction]
@@ -223,17 +224,22 @@
 [five field definitions added; the XML markup of this hunk was lost in extraction]
diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/testConfig/managed-schema b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/testConfig/managed-schema
index e191c6223..d9381b58e 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/testConfig/managed-schema
+++ b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/testConfig/managed-schema
@@ -195,186 +195,147 @@
 [large block of field and fieldType definitions rewritten; the XML markup of this hunk was lost in extraction]
@@ -382,26 +343,20 @@
 [field definitions rewritten; the XML markup of this hunk was lost in extraction]
diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/xml/part-00000 b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/xml/part-00000
deleted file mode 100644
index ff4095a11..000000000
Binary files a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/xml/part-00000 and /dev/null differ
diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/xml/part-00000.json.gz b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/xml/part-00000.json.gz
new file mode 100644
index 000000000..8dfcea4fa
Binary files /dev/null and b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/xml/part-00000.json.gz differ
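The sequence-file fixture is replaced by a gzipped JSON-lines file holding TupleWrapper records; Spark's JSON reader handles the compression transparently. A sketch mirroring how the tests above load it (the SparkSession setup is omitted, the class name is illustrative):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import eu.dnetlib.dhp.oa.provision.model.TupleWrapper;

public class ReadFixtureSketch {

	public static Dataset<TupleWrapper> read(SparkSession spark) {
		// an explicit bean schema avoids an inference pass over the gzipped part files
		return spark
			.read()
			.schema(Encoders.bean(TupleWrapper.class).schema())
			.json("src/test/resources/eu/dnetlib/dhp/oa/provision/xml")
			.as(Encoders.bean(TupleWrapper.class));
	}
}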