diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AbstractSolrRecordTransformJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AbstractSolrRecordTransformJob.java
new file mode 100644
index 0000000000..fd7bf4d719
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AbstractSolrRecordTransformJob.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2024.
+ * SPDX-FileCopyrightText: © 2023 Consiglio Nazionale delle Ricerche
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package eu.dnetlib.dhp.oa.provision;
+
+import java.io.StringReader;
+import java.io.StringWriter;
+
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.stream.StreamResult;
+import javax.xml.transform.stream.StreamSource;
+
+import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory;
+
+public abstract class AbstractSolrRecordTransformJob {
+
+    protected static String toIndexRecord(Transformer tr, final String xmlRecord) {
+        final StreamResult res = new StreamResult(new StringWriter());
+        try {
+            tr.transform(new StreamSource(new StringReader(xmlRecord)), res);
+            return res.getWriter().toString();
+        } catch (TransformerException e) {
+            throw new IllegalArgumentException("XPathException on record: \n" + xmlRecord, e);
+        }
+    }
+
+    /**
+     * Creates the XSLT responsible for building the index xml records.
+     *
+     * @param format Metadata format name (DMF|TMF)
+     * @param fields the list of fields
+     * @param xslt   the XSLT building the index record transformer
+     * @return the serialized XSLT used to produce the index xml records
+     * @throws TransformerException could happen
+     */
+    protected static String getLayoutTransformer(String format, String fields, String xslt)
+        throws TransformerException {
+
+        final Transformer layoutTransformer = SaxonTransformerFactory.newInstance(xslt);
+        final StreamResult layoutToXsltXslt = new StreamResult(new StringWriter());
+
+        layoutTransformer.setParameter("format", format);
+        layoutTransformer.transform(new StreamSource(new StringReader(fields)), layoutToXsltXslt);
+
+        return layoutToXsltXslt.getWriter().toString();
+    }
+
+}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplication.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplication.java
index d4d414fed0..0033978bf8 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplication.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplication.java
@@ -62,16 +62,8 @@ public class SolrAdminApplication implements Closeable {
         final String collection = ProvisionConstants.getCollectionName(format);
         log.info("collection: {}", collection);
 
-        final boolean shouldIndex = Optional
-            .ofNullable(parser.get("shouldIndex"))
-            .map(Boolean::valueOf)
-            .orElse(false);
-        log.info("shouldIndex: {}", shouldIndex);
-
-        if (shouldIndex) {
-            try (SolrAdminApplication app = new SolrAdminApplication(zkHost)) {
-                app.execute(action, collection, query, commit);
-            }
+        try (SolrAdminApplication app = new SolrAdminApplication(zkHost)) {
+            app.execute(action, collection, query, commit);
         }
     }
 
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrRecordDumpJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrRecordDumpJob.java
new file mode 100644
index 0000000000..faa18851bc
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrRecordDumpJob.java
@@ -0,0 +1,144 @@
+
+package eu.dnetlib.dhp.oa.provision;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.Optional;
+
+import javax.xml.transform.TransformerException;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument;
+import eu.dnetlib.dhp.oa.provision.model.TupleWrapper;
+import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient;
+import eu.dnetlib.dhp.oa.provision.utils.StreamingInputDocumentFactory;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+
+public class SolrRecordDumpJob extends AbstractSolrRecordTransformJob {
+
+    private static final Logger log = LoggerFactory.getLogger(SolrRecordDumpJob.class);
+
+    private static final Integer DEFAULT_BATCH_SIZE = 1000;
+
+    private final String inputPath;
+
+    private final String format;
+
+    private final String outputPath;
+
+    private final SparkSession spark;
+
+    public static void main(String[] args) throws Exception {
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+            IOUtils
+                .toString(
+                    SolrRecordDumpJob.class
+                        .getResourceAsStream(
+                            "/eu/dnetlib/dhp/oa/provision/input_params_solr_record_dump.json")));
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final String inputPath = parser.get("inputPath");
+        log.info("inputPath: {}", inputPath);
+
+        final String format = parser.get("format");
+        log.info("format: {}", format);
+
+        final String outputPath = Optional
+            .ofNullable(parser.get("outputPath"))
+            .map(StringUtils::trim)
+            .orElse(null);
+        log.info("outputPath: {}", outputPath);
+
+        final Integer batchSize = Optional
+            .ofNullable(parser.get("batchSize"))
+            .map(Integer::valueOf)
+            .orElse(DEFAULT_BATCH_SIZE);
+        log.info("batchSize: {}", batchSize);
+
+        final boolean shouldIndex = Optional
+            .ofNullable(parser.get("shouldIndex"))
+            .map(Boolean::valueOf)
+            .orElse(false);
+        log.info("shouldIndex: {}", shouldIndex);
+
+        final SparkConf conf = new SparkConf();
+
+        conf.registerKryoClasses(new Class[] {
+            SerializableSolrInputDocument.class
+        });
+
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                final String isLookupUrl = parser.get("isLookupUrl");
+                log.info("isLookupUrl: {}", isLookupUrl);
+                final ISLookupClient isLookup = new ISLookupClient(ISLookupClientFactory.getLookUpService(isLookupUrl));
+                new SolrRecordDumpJob(spark, inputPath, format, outputPath).run(isLookup);
+            });
+    }
+
+    public SolrRecordDumpJob(SparkSession spark, String inputPath, String format, String outputPath) {
+        this.spark = spark;
+        this.inputPath = inputPath;
+        this.format = format;
+        this.outputPath = outputPath;
+    }
+
+    public void run(ISLookupClient isLookup) throws ISLookUpException, TransformerException {
+        final String fields = isLookup.getLayoutSource(format);
+        log.info("fields: {}", fields);
+
+        final String xslt = isLookup.getLayoutTransformer();
+
+        final String dsId = isLookup.getDsId(format);
+        log.info("dsId: {}", dsId);
+
+        final String indexRecordXslt = getLayoutTransformer(format, fields, xslt);
+        log.info("indexRecordTransformer {}", indexRecordXslt);
+
+        final Encoder<TupleWrapper> encoder = Encoders.bean(TupleWrapper.class);
+        spark
+            .read()
+            .schema(encoder.schema())
+            .json(inputPath)
+            .as(encoder)
+            .map(
+                (MapFunction<TupleWrapper, TupleWrapper>) t -> new TupleWrapper(
+                    toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), t.getXml()),
+                    t.getJson()),
+                Encoders.bean(TupleWrapper.class))
+            .map(
+                (MapFunction<TupleWrapper, SerializableSolrInputDocument>) t -> {
+                    SolrInputDocument s = new StreamingInputDocumentFactory()
+                        .parseDocument(t.getXml(), t.getJson());
+                    return new SerializableSolrInputDocument(s);
+                },
+                Encoders.kryo(SerializableSolrInputDocument.class))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .parquet(outputPath);
+    }
+
+}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java
index 3c107a9859..d49a0596b7 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java
@@ -3,26 +3,18 @@ package eu.dnetlib.dhp.oa.provision;
 
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
-import java.io.StringReader;
-import java.io.StringWriter;
-import java.text.SimpleDateFormat;
-import java.util.Date;
 import java.util.Optional;
 
-import javax.xml.transform.Transformer;
 import javax.xml.transform.TransformerException;
-import javax.xml.transform.stream.StreamResult;
-import javax.xml.transform.stream.StreamSource;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.io.Text;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.sql.*;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,32 +28,19 @@ import eu.dnetlib.dhp.oa.provision.utils.StreamingInputDocumentFactory;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
-import scala.Tuple2;
 
-public class XmlIndexingJob {
+public class XmlIndexingJob extends AbstractSolrRecordTransformJob {
 
     private static final Logger log = LoggerFactory.getLogger(XmlIndexingJob.class);
 
-    public enum OutputFormat {
-        SOLR, HDFS
-    }
-
     private static final Integer DEFAULT_BATCH_SIZE = 1000;
 
-    protected static final String DATE_FORMAT = "yyyy-MM-dd'T'hh:mm:ss'Z'";
-
     private final String inputPath;
 
     private final String format;
 
     private final int batchSize;
 
-    private final OutputFormat outputFormat;
-
-    private final String outputPath;
-
-    private boolean shouldIndex;
-
     private final SparkSession spark;
 
     public static void main(String[] args) throws Exception {
@@ -86,30 +65,12 @@ public class XmlIndexingJob {
         final String format = parser.get("format");
         log.info("format: {}", format);
 
-        final String outputPath = Optional
-            .ofNullable(parser.get("outputPath"))
-            .map(StringUtils::trim)
-            .orElse(null);
-        log.info("outputPath: {}", outputPath);
-
         final Integer batchSize = Optional
             .ofNullable(parser.get("batchSize"))
             .map(Integer::valueOf)
             .orElse(DEFAULT_BATCH_SIZE);
         log.info("batchSize: {}", batchSize);
 
-        final OutputFormat outputFormat = Optional
-            .ofNullable(parser.get("outputFormat"))
-            .map(OutputFormat::valueOf)
-            .orElse(OutputFormat.SOLR);
-        log.info("outputFormat: {}", outputFormat);
-
-        final boolean shouldIndex = Optional
-            .ofNullable(parser.get("shouldIndex"))
-            .map(Boolean::valueOf)
-            .orElse(false);
-        log.info("shouldIndex: {}", shouldIndex);
-
         final SparkConf conf = new SparkConf();
 
         conf.registerKryoClasses(new Class[] {
@@ -123,20 +84,16 @@ public class XmlIndexingJob {
                 final String isLookupUrl = parser.get("isLookupUrl");
                 log.info("isLookupUrl: {}", isLookupUrl);
                 final ISLookupClient isLookup = new ISLookupClient(ISLookupClientFactory.getLookUpService(isLookupUrl));
-                new XmlIndexingJob(spark, inputPath, format, batchSize, outputFormat, shouldIndex, outputPath)
+                new XmlIndexingJob(spark, inputPath, format, batchSize)
                     .run(isLookup);
             });
     }
 
-    public XmlIndexingJob(SparkSession spark, String inputPath, String format, Integer batchSize,
-        OutputFormat outputFormat, boolean shouldIndex, String outputPath) {
+    public XmlIndexingJob(SparkSession spark, String inputPath, String format, Integer batchSize) {
         this.spark = spark;
         this.inputPath = inputPath;
         this.format = format;
         this.batchSize = batchSize;
-        this.outputFormat = outputFormat;
-        this.shouldIndex = shouldIndex;
-        this.outputPath = outputPath;
     }
 
     public void run(ISLookupClient isLookup) throws ISLookUpException, TransformerException {
@@ -148,6 +105,9 @@ public class XmlIndexingJob {
         final String dsId = isLookup.getDsId(format);
         log.info("dsId: {}", dsId);
 
+        final String collection = ProvisionConstants.getCollectionName(format);
+        log.info("collection: {}", collection);
+
         final String zkHost = isLookup.getZkHost();
         log.info("zkHost: {}", zkHost);
 
@@ -155,7 +115,8 @@
         log.info("indexRecordTransformer {}", indexRecordXslt);
 
         final Encoder<TupleWrapper> encoder = Encoders.bean(TupleWrapper.class);
-        final Dataset<TupleWrapper> records = spark
+
+        JavaRDD<SolrInputDocument> docs = spark
             .read()
             .schema(encoder.schema())
             .json(inputPath)
@@ -164,80 +125,11 @@
                 (MapFunction<TupleWrapper, TupleWrapper>) t -> new TupleWrapper(
                     toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), t.getXml()),
                     t.getJson()),
-                Encoders.bean(TupleWrapper.class));
-
-        switch (outputFormat) {
-            case SOLR:
-                if (shouldIndex) {
-                    final String collection = ProvisionConstants.getCollectionName(format);
-
-                    // SparkSolr >= 4
-                    // com.lucidworks.spark.BatchSizeType bt = com.lucidworks.spark.BatchSizeType.NUM_DOCS;
-                    // SolrSupport.indexDocs(zkHost, collection, batchSize, bt, docs.rdd());
-                    // SparkSolr < 4
-                    JavaRDD<SolrInputDocument> docs = records
-                        .javaRDD()
-                        .map(
-                            t -> new StreamingInputDocumentFactory().parseDocument(t.getXml(), t.getJson()));
-                    SolrSupport.indexDocs(zkHost, collection, batchSize, docs.rdd());
-                }
-                break;
-            case HDFS:
-                records
-                    .map(
-                        (MapFunction<TupleWrapper, SerializableSolrInputDocument>) t -> {
-                            SolrInputDocument s = new StreamingInputDocumentFactory()
-                                .parseDocument(t.getXml(), t.getJson());
-                            return new SerializableSolrInputDocument(s);
-                        },
-                        Encoders.kryo(SerializableSolrInputDocument.class))
-                    .write()
-                    .mode(SaveMode.Overwrite)
-                    .parquet(outputPath);
-                break;
-            default:
-                throw new IllegalArgumentException("invalid outputFormat: " + outputFormat);
-        }
-    }
-
-    protected static String toIndexRecord(Transformer tr, final String xmlRecord) {
-        final StreamResult res = new StreamResult(new StringWriter());
-        try {
-            tr.transform(new StreamSource(new StringReader(xmlRecord)), res);
-            return res.getWriter().toString();
-        } catch (TransformerException e) {
-            throw new IllegalArgumentException("XPathException on record: \n" + xmlRecord, e);
-        }
-    }
-
-    /**
-     * Creates the XSLT responsible for building the index xml records.
-     *
-     * @param format Metadata format name (DMF|TMF)
-     * @param xslt xslt for building the index record transformer
-     * @param fields the list of fields
-     * @return the javax.xml.transform.Transformer
-     * @throws TransformerException could happen
-     */
-    protected static String getLayoutTransformer(String format, String fields, String xslt)
-        throws TransformerException {
-
-        final Transformer layoutTransformer = SaxonTransformerFactory.newInstance(xslt);
-        final StreamResult layoutToXsltXslt = new StreamResult(new StringWriter());
-
-        layoutTransformer.setParameter("format", format);
-        layoutTransformer.transform(new StreamSource(new StringReader(fields)), layoutToXsltXslt);
-
-        return layoutToXsltXslt.getWriter().toString();
-    }
-
-    /**
-     * method return a solr-compatible string representation of a date, used to mark all records as indexed today
-     *
-     * @return the parsed date
-     */
-    public static String getRecordDatestamp() {
-        return new SimpleDateFormat(DATE_FORMAT).format(new Date());
+                Encoders.bean(TupleWrapper.class))
+            .javaRDD()
+            .map(
+                t -> new StreamingInputDocumentFactory().parseDocument(t.getXml(), t.getJson()));
+        SolrSupport.indexDocs(zkHost, collection, batchSize, docs.rdd());
     }
 
 }
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java
index 6af692ed91..7705c62c0e 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java
@@ -63,27 +63,32 @@ public class ProvisionModelSupport {
     }
 
     public static SolrRecord transform(JoinedEntity je, ContextMapper contextMapper, VocabularyGroup vocs) {
-        SolrRecord s = new SolrRecord();
+        SolrRecord record = new SolrRecord();
         final OafEntity e = je.getEntity();
-        s
+        final RecordType type = RecordType.valueOf(e.getClass().getSimpleName().toLowerCase());
+        final Boolean deletedbyinference = Optional
+            .ofNullable(e.getDataInfo())
+            .map(DataInfo::getDeletedbyinference)
+            .orElse(null);
+        record
             .setHeader(
                 SolrRecordHeader
                     .newInstance(
-                        e.getId(), e.getOriginalId(), RecordType.valueOf(e.getClass().getSimpleName().toLowerCase())));
-        s.setCollectedfrom(asProvenance(e.getCollectedfrom()));
-        s.setContext(asContext(e.getContext(), contextMapper));
-        s.setPid(asPid(e.getPid()));
+                        e.getId(), e.getOriginalId(), type, deletedbyinference));
+        record.setCollectedfrom(asProvenance(e.getCollectedfrom()));
+        record.setContext(asContext(e.getContext(), contextMapper));
+        record.setPid(asPid(e.getPid()));
 
         if (e instanceof eu.dnetlib.dhp.schema.oaf.Result) {
-            s.setResult(mapResult((eu.dnetlib.dhp.schema.oaf.Result) e));
+            record.setResult(mapResult((eu.dnetlib.dhp.schema.oaf.Result) e));
         } else if (e instanceof eu.dnetlib.dhp.schema.oaf.Datasource) {
-            s.setDatasource(mapDatasource((eu.dnetlib.dhp.schema.oaf.Datasource) e));
+            record.setDatasource(mapDatasource((eu.dnetlib.dhp.schema.oaf.Datasource) e));
         } else if (e instanceof eu.dnetlib.dhp.schema.oaf.Organization) {
-            s.setOrganization(mapOrganization((eu.dnetlib.dhp.schema.oaf.Organization) e));
+            record.setOrganization(mapOrganization((eu.dnetlib.dhp.schema.oaf.Organization) e));
         } else if (e instanceof eu.dnetlib.dhp.schema.oaf.Project) {
-            s.setProject(mapProject((eu.dnetlib.dhp.schema.oaf.Project) e, vocs));
+            record.setProject(mapProject((eu.dnetlib.dhp.schema.oaf.Project) e, vocs));
         }
-        s
+        record
             .setLinks(
                 Optional
                     .ofNullable(je.getLinks())
@@ -94,7 +99,7 @@ public class ProvisionModelSupport {
                     .collect(Collectors.toList()))
                     .orElse(null));
 
-        return s;
+        return record;
     }
 
     private static RelatedRecord mapRelatedRecord(RelatedEntityWrapper rew, VocabularyGroup vocs) {
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/TupleWrapper.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/TupleWrapper.java
index c23a7f550a..1556a5eee3 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/TupleWrapper.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/TupleWrapper.java
@@ -10,6 +10,7 @@ import java.io.Serializable;
 
 public class TupleWrapper implements Serializable {
 
+    private static final long serialVersionUID = -1418439827125577822L;
     private String xml;
 
     private String json;
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_solr_record_dump.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_solr_record_dump.json
new file mode 100644
index 0000000000..7e5734222b
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_solr_record_dump.json
@@ -0,0 +1,26 @@
+[
+  {
+    "paramName": "is",
+    "paramLongName": "isLookupUrl",
+    "paramDescription": "URL of the isLookUp Service",
+    "paramRequired": true
+  },
+  {
+    "paramName": "i",
+    "paramLongName": "inputPath",
+    "paramDescription": "the path of the sequence file to read the XML records",
+    "paramRequired": true
+  },
+  {
+    "paramName": "f",
+    "paramLongName": "format",
+    "paramDescription": "MDFormat name found in the IS profile",
+    "paramRequired": true
+  },
+  {
+    "paramName": "op",
+    "paramLongName": "outputPath",
+    "paramDescription": "path on hdfs activating an alternative output for the SolrInputDocuments",
+    "paramRequired": false
+  }
+]
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json
index 2960e5aaa5..3396020e07 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json
@@ -22,23 +22,5 @@
     "paramLongName": "batchSize",
     "paramDescription": "size of the batch of documents sent to solr",
     "paramRequired": false
-  },
-  {
-    "paramName": "of",
-    "paramLongName": "outputFormat",
-    "paramDescription": "decides the job output format, SOLR | HDFS",
-    "paramRequired": false
-  },
-  {
-    "paramName": "si",
-    "paramLongName": "shouldIndex",
-    "paramDescription": "should the action actually index the records?",
-    "paramRequired": true
-  },
-  {
-    "paramName": "op",
-    "paramLongName": "outputPath",
-    "paramDescription": "path on hdfs activating an alternative output for the SolrInputDocuments",
-    "paramRequired": false
   }
 ]
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_solradmin_parameters.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_solradmin_parameters.json
index 53805d3c13..23eca2f7b4 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_solradmin_parameters.json
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_solradmin_parameters.json
@@ -28,11 +28,5 @@
     "paramLongName": "commit",
     "paramDescription": "should the action be followed by a commit?",
     "paramRequired": false
-  },
-  {
-    "paramName": "i",
-    "paramLongName": "shouldIndex",
-    "paramDescription": "should the action actually index the records?",
-    "paramRequired": true
   }
 ]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
index 8908c5c838..f60c531e43 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
@@ -591,18 +591,23 @@
     --conf spark.sql.shuffle.partitions=3840
     --conf spark.network.timeout=${sparkNetworkTimeout}
-
-    --inputPath/user/claudio.atzori/data/provision/join_entities
-
-
-    --outputPath/user/claudio.atzori/data/provision/xml_json_test
+    --inputPath${workingDir}/join_entities
+    --outputPath${workingDir}/xml_json
     --contextApiBaseUrl${contextApiBaseUrl}
     --isLookupUrl${isLookupUrl}
-
+
+
+
+    ${wf:conf('shouldIndex') eq 'true'}
+    ${wf:conf('shouldIndex') eq 'false'}
+
+
+
@@ -617,7 +622,6 @@
     --actionDELETE_BY_QUERY
     --query${solrDeletionQuery}
     --committrue
-    --shouldIndex${shouldIndex}
@@ -643,17 +647,10 @@
     --conf spark.hadoop.mapreduce.map.speculative=false
     --conf spark.hadoop.mapreduce.reduce.speculative=false
-
-    --inputPath/user/claudio.atzori/data/provision/xml_json_test
+    --inputPath${workingDir}/xml_json
     --isLookupUrl${isLookupUrl}
     --format${format}
     --batchSize${batchSize}
-    --outputFormat${outputFormat}
-
-
-    --outputPath/user/claudio.atzori/data/provision/solr_documents
-
-    --shouldIndex${shouldIndex}
@@ -671,11 +668,35 @@
     --isLookupUrl${isLookupUrl}
     --format${format}
     --actionCOMMIT
-    --shouldIndex${shouldIndex}
+
+
+    yarn
+    cluster
+    dump_solr_records_hdfs
+    eu.dnetlib.dhp.oa.provision.SolrRecordDumpJob
+    dhp-graph-provision-${projectVersion}.jar
+
+    --executor-cores=${sparkExecutorCoresForJoining}
+    --executor-memory=${sparkExecutorMemoryForJoining}
+    --driver-memory=${sparkDriverMemoryForJoining}
+    --conf spark.extraListeners=${spark2ExtraListeners}
+    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+    --inputPath${workingDir}/xml_json
+    --isLookupUrl${isLookupUrl}
+    --format${format}
+    --outputPath${workingDir}/solr_documents
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigExploreTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigExploreTest.java
index c749781ec7..424262eefe 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigExploreTest.java
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigExploreTest.java
@@ -86,8 +86,7 @@ public class SolrConfigExploreTest extends SolrExploreTest {
 
         String inputPath = "src/test/resources/eu/dnetlib/dhp/oa/provision/xml";
 
-        new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, XmlIndexingJob.OutputFormat.SOLR, true, null)
-            .run(isLookupClient);
+        new XmlIndexingJob(spark, inputPath, FORMAT, batchSize).run(isLookupClient);
 
         Assertions.assertEquals(0, miniCluster.getSolrClient().commit().getStatus());
 
         String[] queryStrings = {
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigTest.java
index 91dbf89785..625b6d131c 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigTest.java
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrConfigTest.java
@@ -95,7 +95,7 @@ public class SolrConfigTest extends SolrTest {
 
         String inputPath = "src/test/resources/eu/dnetlib/dhp/oa/provision/xml";
 
-        new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, XmlIndexingJob.OutputFormat.SOLR, false, null)
+        new XmlIndexingJob(spark, inputPath, FORMAT, batchSize)
             .run(isLookupClient);
 
         Assertions.assertEquals(0, miniCluster.getSolrClient().commit().getStatus());
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrRecordDumpJobTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrRecordDumpJobTest.java
new file mode 100644
index 0000000000..c2cd3497a5
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrRecordDumpJobTest.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright (c) 2024.
+ * SPDX-FileCopyrightText: © 2023 Consiglio Nazionale delle Ricerche
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+package eu.dnetlib.dhp.oa.provision;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.nio.file.Path;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.solr.common.SolrInputField;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.dom4j.io.SAXReader;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument;
+import eu.dnetlib.dhp.oa.provision.model.TupleWrapper;
+import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+
+@ExtendWith(MockitoExtension.class)
+class SolrRecordDumpJobTest {
+
+    protected static final Logger log = LoggerFactory.getLogger(SolrRecordDumpJobTest.class);
+
+    protected static SparkSession spark;
+
+    protected static final String FORMAT = "test";
+
+    @Mock
+    private ISLookUpService isLookUpService;
+
+    @Mock
+    private ISLookupClient isLookupClient;
+
+    @TempDir
+    public static Path workingDir;
+
+    @BeforeAll
+    public static void before() {
+
+        SparkConf conf = new SparkConf();
+        conf.setAppName(SolrRecordDumpJobTest.class.getSimpleName());
+
+        conf.registerKryoClasses(new Class[] {
+            SerializableSolrInputDocument.class
+        });
+
+        conf.setMaster("local[1]");
+        conf.set("spark.driver.host", "localhost");
+        conf.set("hive.metastore.local", "true");
+        conf.set("spark.ui.enabled", "false");
+        conf.set("spark.sql.warehouse.dir", workingDir.resolve("spark").toString());
+
+        spark = SparkSession
+            .builder()
+            .appName(SolrRecordDumpJobTest.class.getSimpleName())
+            .config(conf)
+            .getOrCreate();
+    }
+
+    @AfterAll
+    public static void shutDown() throws Exception {
+        spark.stop();
+        FileUtils.deleteDirectory(workingDir.toFile());
+    }
+
+    @BeforeEach
+    public void prepareMocks() throws ISLookUpException, IOException {
+        isLookupClient.setIsLookup(isLookUpService);
+
+        Mockito
+            .when(isLookupClient.getDsId(Mockito.anyString()))
+            .thenReturn("313f0381-23b6-466f-a0b8-c72a9679ac4b_SW5kZXhEU1Jlc291cmNlcy9JbmRleERTUmVzb3VyY2VUeXBl");
+        Mockito
+            .when(isLookupClient.getLayoutSource(Mockito.anyString()))
+            .thenReturn(IOUtils.toString(getClass().getResourceAsStream("fields.xml")));
+        Mockito
+            .when(isLookupClient.getLayoutTransformer())
+            .thenReturn(IOUtils.toString(getClass().getResourceAsStream("layoutToRecordTransformer.xsl")));
+    }
+
+    @Test
+    void testXmlIndexingJob_saveOnHDFS() throws Exception {
+        final String ID_XPATH = "//*[local-name()='header']/*[local-name()='objIdentifier']";
+
+        String inputPath = "src/test/resources/eu/dnetlib/dhp/oa/provision/xml";
+        // String inputPath = "/Users/claudio/workspace/data/index";
+
+        Dataset<TupleWrapper> records = spark
+            .read()
+            .schema(Encoders.bean(TupleWrapper.class).schema())
+            .json(inputPath)
+            .as(Encoders.bean(TupleWrapper.class));
+
+        records.printSchema();
+
+        long nRecord = records.count();
+        log.info("found {} records", nRecord);
+
+        final Dataset<String> ids = records
+            .map((MapFunction<TupleWrapper, String>) TupleWrapper::getXml, Encoders.STRING())
+            .map(
+                (MapFunction<String, String>) s -> new SAXReader().read(new StringReader(s)).valueOf(ID_XPATH),
+                Encoders.STRING());
+
+        log.info("found {} ids", ids.count());
+
+        long xmlIdUnique = ids
+            .distinct()
+            .count();
+
+        log.info("found {} unique ids", xmlIdUnique);
+
+        assertEquals(nRecord, xmlIdUnique, "IDs should be unique among input records");
+
+        final String outputPath = workingDir.resolve("outputPath").toAbsolutePath().toString();
+        new SolrRecordDumpJob(spark, inputPath, FORMAT, outputPath).run(isLookupClient);
+
+        final Dataset<SerializableSolrInputDocument> solrDocs = spark
+            .read()
+            .load(outputPath)
+            .as(Encoders.kryo(SerializableSolrInputDocument.class));
+
+        solrDocs.foreach(doc -> {
+            assertNotNull(doc.get("__result"));
+            assertNotNull(doc.get("__json"));
+        });
+
+        long docIdUnique = solrDocs.map((MapFunction<SerializableSolrInputDocument, String>) doc -> {
+            final SolrInputField id = doc.getField("__indexrecordidentifier");
+            return id.getFirstValue().toString();
+        }, Encoders.STRING())
+            .distinct()
+            .count();
+        assertEquals(xmlIdUnique, docIdUnique, "IDs should be unique among the output XML records");
+
+        long jsonUnique = solrDocs
+            .map(
+                (MapFunction<SerializableSolrInputDocument, String>) je -> (String) je.getField("__json").getValue(),
+                Encoders.STRING())
+            .distinct()
+            .count();
+
+        assertEquals(jsonUnique, docIdUnique, "IDs should be unique among the output JSON records");
+
+    }
+}
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrTest.java
index 186cb964a0..79527b891a 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrTest.java
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrTest.java
@@ -92,7 +92,7 @@ public abstract class SolrTest {
         FileUtils.deleteDirectory(workingDir.toFile());
     }
 
-    protected static NamedList<Object> createCollection(CloudSolrClient client, String name, int numShards,
+    public static NamedList<Object> createCollection(CloudSolrClient client, String name, int numShards,
         int replicationFactor, int maxShardsPerNode, String configName) throws Exception {
         ModifiableSolrParams modParams = new ModifiableSolrParams();
         modParams.set(CoreAdminParams.ACTION, CollectionParams.CollectionAction.CREATE.name());
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJobTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJobTest.java
index be481cc37e..522c34ef1c 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJobTest.java
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJobTest.java
@@ -1,24 +1,21 @@
 
 package eu.dnetlib.dhp.oa.provision;
 
-import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.IOException;
-import java.io.StringReader;
 import java.net.URI;
 import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.response.QueryResponse;
-import org.apache.solr.common.SolrInputField;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
-import org.dom4j.io.SAXReader;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -106,8 +103,7 @@ public class XmlIndexingJobTest extends SolrTest {
 
         long nRecord = records.count();
 
-        new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, XmlIndexingJob.OutputFormat.SOLR, true, null)
-            .run(isLookupClient);
+        new XmlIndexingJob(spark, inputPath, FORMAT, batchSize).run(isLookupClient);
 
         assertEquals(0, miniCluster.getSolrClient().commit().getStatus());
 
@@ -164,71 +160,4 @@ public class XmlIndexingJobTest extends SolrTest {
 
     }
 
-    @Test
-    void testXmlIndexingJob_saveOnHDFS() throws Exception {
-        final String ID_XPATH = "//*[local-name()='header']/*[local-name()='objIdentifier']";
-
-        String inputPath = "src/test/resources/eu/dnetlib/dhp/oa/provision/xml";
-        // String inputPath = "/Users/claudio/workspace/data/index";
-
-        Dataset<TupleWrapper> records = spark
-            .read()
-            .schema(Encoders.bean(TupleWrapper.class).schema())
-            .json(inputPath)
-            .as(Encoders.bean(TupleWrapper.class));
-
-        records.printSchema();
-
-        long nRecord = records.count();
-        log.info("found {} records", nRecord);
-
-        final Dataset<String> ids = records
-            .map((MapFunction<TupleWrapper, String>) TupleWrapper::getXml, Encoders.STRING())
-            .map(
-                (MapFunction<String, String>) s -> new SAXReader().read(new StringReader(s)).valueOf(ID_XPATH),
-                Encoders.STRING());
-
-        log.info("found {} ids", ids.count());
-
-        long xmlIdUnique = ids
-            .distinct()
-            .count();
-
-        log.info("found {} unique ids", xmlIdUnique);
-
-        assertEquals(nRecord, xmlIdUnique, "IDs should be unique among input records");
-
-        final String outputPath = workingDir.resolve("outputPath").toAbsolutePath().toString();
-        new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, XmlIndexingJob.OutputFormat.HDFS, false, outputPath)
-            .run(isLookupClient);
-
-        final Dataset<SerializableSolrInputDocument> solrDocs = spark
-            .read()
-            .load(outputPath)
-            .as(Encoders.kryo(SerializableSolrInputDocument.class));
-
-        solrDocs.foreach(doc -> {
-            assertNotNull(doc.get("__result"));
-            assertNotNull(doc.get("__json"));
-        });
-
-        long docIdUnique = solrDocs.map((MapFunction<SerializableSolrInputDocument, String>) doc -> {
-            final SolrInputField id = doc.getField("__indexrecordidentifier");
-            return id.getFirstValue().toString();
-        }, Encoders.STRING())
-            .distinct()
-            .count();
-        assertEquals(xmlIdUnique, docIdUnique, "IDs should be unique among the output XML records");
-
-        long jsonUnique = solrDocs
-            .map(
-                (MapFunction<SerializableSolrInputDocument, String>) je -> (String) je.getField("__json").getValue(),
-                Encoders.STRING())
-            .distinct()
-            .count();
-
-        assertEquals(jsonUnique, docIdUnique, "IDs should be unique among the output JSON records");
-
-    }
-
 }
diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/exploreTestConfig/managed-schema b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/exploreTestConfig/managed-schema
index 4e85ca3be1..9720d3f378 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/exploreTestConfig/managed-schema
+++ b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/exploreTestConfig/managed-schema
@@ -202,6 +202,7 @@
+
@@ -223,17 +224,22 @@
+
+
+
+
+
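Note (not part of the patch): a minimal sketch of how the new SolrRecordDumpJob introduced above could be driven programmatically, outside the Oozie workflow. The constructor and run() signatures mirror the class added in this diff; the lookup URL, the "TMF" format name and the paths are placeholder assumptions to be replaced with values from the target infrastructure.

```java
import org.apache.spark.sql.SparkSession;

import eu.dnetlib.dhp.oa.provision.SolrRecordDumpJob;
import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;

public class SolrRecordDumpJobSketch {

	public static void main(String[] args) throws Exception {
		// Local session for illustration only; the workflow runs the job on YARN.
		SparkSession spark = SparkSession
			.builder()
			.appName("SolrRecordDumpJob-sketch")
			.master("local[*]")
			.getOrCreate();

		// Placeholder IS lookup endpoint; it must expose the layout profiles for the given format.
		ISLookupClient isLookup = new ISLookupClient(
			ISLookupClientFactory.getLookUpService("http://services.example.org/is/services/isLookUp"));

		// Reads the TupleWrapper JSON produced by the xml_json step and writes
		// SerializableSolrInputDocument records as parquet under the output path.
		new SolrRecordDumpJob(spark, "/tmp/provision/xml_json", "TMF", "/tmp/provision/solr_documents")
			.run(isLookup);

		spark.stop();
	}
}
```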