From 56d1810a66063b886a59e501790e064e6ac26750 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 14 Feb 2020 12:28:52 +0100 Subject: [PATCH] working procedure for records indexing using Spark, via lib com.lucidworks.spark:spark-solr --- .../eu/dnetlib/dhp/utils/saxon/PickFirst.java | 19 ++++++++++----- .../job-override.properties | 7 +++--- dhp-workflows/dhp-graph-provision/pom.xml | 7 +++++- .../dhp/graph/SparkXmlIndexingJob.java | 2 +- .../dhp/graph/utils/XmlRecordFactory.java | 24 +++++++++---------- .../dnetlib/dhp/graph/oozie_app/workflow.xml | 3 +-- dhp-workflows/pom.xml | 1 + pom.xml | 5 ++++ 8 files changed, 42 insertions(+), 26 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/saxon/PickFirst.java b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/saxon/PickFirst.java index 1f209bed09..a221e37c67 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/saxon/PickFirst.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/saxon/PickFirst.java @@ -1,6 +1,7 @@ package eu.dnetlib.dhp.utils.saxon; import net.sf.saxon.expr.XPathContext; +import net.sf.saxon.om.Item; import net.sf.saxon.om.Sequence; import net.sf.saxon.trans.XPathException; import net.sf.saxon.value.SequenceType; @@ -19,15 +20,21 @@ public class PickFirst extends AbstractExtensionFunction { if (arguments == null | arguments.length == 0) { return new StringValue(""); } - String s1 = arguments[0].head().getStringValue(); - if (arguments.length > 1) { - String s2 = arguments[1].head().getStringValue(); + final String s1 = getValue(arguments[0]); + final String s2 = getValue(arguments[1]); - return new StringValue(StringUtils.isNotBlank(s1) ? s1 : StringUtils.isNotBlank(s2) ? s2 : ""); - } else { - return new StringValue(StringUtils.isNotBlank(s1) ? s1 : ""); + return new StringValue(StringUtils.isNotBlank(s1) ? s1 : StringUtils.isNotBlank(s2) ? s2 : ""); + } + + private String getValue(final Sequence arg) throws XPathException { + if (arg != null) { + final Item item = arg.head(); + if (item != null) { + return item.getStringValue(); + } } + return ""; } @Override diff --git a/dhp-workflows/dhp-graph-provision/job-override.properties b/dhp-workflows/dhp-graph-provision/job-override.properties index c7b173a14f..b5ab079825 100644 --- a/dhp-workflows/dhp-graph-provision/job-override.properties +++ b/dhp-workflows/dhp-graph-provision/job-override.properties @@ -5,7 +5,6 @@ isLookupUrl=http://beta.services.openaire.eu:8280/is/services/isLookUp?wsdl sourcePath=/tmp/db_openaireplus_services.export_dhp.2020.02.03 outputPath=/tmp/openaire_provision format=TMF -batchSize=1000 -sparkExecutorCoresForIndexing=1 -sparkExecutorInstances=10 -reuseRecords=false \ No newline at end of file +batchSize=2000 +sparkExecutorCoresForIndexing=64 +reuseRecords=true \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/pom.xml b/dhp-workflows/dhp-graph-provision/pom.xml index 5e6beb2493..f74c9b6668 100644 --- a/dhp-workflows/dhp-graph-provision/pom.xml +++ b/dhp-workflows/dhp-graph-provision/pom.xml @@ -3,7 +3,7 @@ dhp-workflows eu.dnetlib.dhp - 1.0.5-SNAPSHOT + 1.1.6-SNAPSHOT 4.0.0 @@ -52,6 +52,11 @@ org.apache.httpcomponents httpclient + + org.apache.httpcomponents + httpmime + + org.noggit noggit diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkXmlIndexingJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkXmlIndexingJob.java index e13f8bbe20..2775d93b43 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkXmlIndexingJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkXmlIndexingJob.java @@ -45,7 +45,7 @@ public class SparkXmlIndexingJob { final String inputPath = parser.get("sourcePath"); final String isLookupUrl = parser.get("isLookupUrl"); final String format = parser.get("format"); - final Integer batchSize = parser.getObjectMap().containsKey("batckSize") ? Integer.valueOf(parser.get("batchSize")) : DEFAULT_BATCH_SIZE; + final Integer batchSize = parser.getObjectMap().containsKey("batchSize") ? Integer.valueOf(parser.get("batchSize")) : DEFAULT_BATCH_SIZE; final ISLookUpService isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl); final String fields = getLayoutSource(isLookup, format); diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/XmlRecordFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/XmlRecordFactory.java index bd4f8ec6c3..abcf2a7ecb 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/XmlRecordFactory.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/XmlRecordFactory.java @@ -129,6 +129,9 @@ public class XmlRecordFactory implements Serializable { .map(t -> mapStructuredProperty("title", t)) .collect(Collectors.toList())); } + if (r.getBestaccessright() != null) { + metadata.add(mapQualifier("bestaccessright", r.getBestaccessright())); + } if (r.getAuthor() != null) { metadata.addAll(r.getAuthor() .stream() @@ -230,15 +233,6 @@ public class XmlRecordFactory implements Serializable { if (r.getResourcetype() != null) { metadata.add(mapQualifier("resourcetype", r.getResourcetype())); } - if (r.getRefereed() != null) { - metadata.add(asXmlElement("refereed", r.getRefereed().getValue())); - } - if (r.getProcessingchargeamount() != null) { - metadata.add(asXmlElement("processingchargeamount", r.getProcessingchargeamount().getValue())); - } - if (r.getProcessingchargecurrency() != null) { - metadata.add(asXmlElement("processingchargecurrency", r.getProcessingchargecurrency().getValue())); - } metadata.add(mapQualifier("bestaccessright", getBestAccessright(r))); @@ -544,9 +538,6 @@ public class XmlRecordFactory implements Serializable { if (p.getDuration() != null) { metadata.add(asXmlElement("duration", p.getDuration().getValue())); } - if (p.getEcsc39() != null) { - metadata.add(asXmlElement("ecsc39", p.getEcsc39().getValue())); - } if (p.getEcarticle29_3() != null) { metadata.add(asXmlElement("ecarticle29_3", p.getEcarticle29_3().getValue())); } @@ -759,6 +750,15 @@ public class XmlRecordFactory implements Serializable { if (isNotBlank(instance.getDistributionlocation())) { fields.add(asXmlElement("distributionlocation", instance.getDistributionlocation())); } + if (instance.getRefereed() != null && isNotBlank(instance.getRefereed().getValue())) { + fields.add(asXmlElement("refereed", instance.getRefereed().getValue())); + } + if (instance.getProcessingchargeamount() != null && isNotBlank(instance.getProcessingchargeamount().getValue())) { + fields.add(asXmlElement("processingchargeamount", instance.getProcessingchargeamount().getValue())); + } + if (instance.getProcessingchargecurrency() != null && isNotBlank(instance.getProcessingchargecurrency().getValue())) { + fields.add(asXmlElement("processingchargecurrency", instance.getProcessingchargecurrency().getValue())); + } children.add(templateFactory.getInstance(instance.getHostedby().getKey(), fields, instance.getUrl())); } diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml index fee463868a..3503589441 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml @@ -78,9 +78,8 @@ dhp-graph-provision-${projectVersion}.jar --executor-memory ${sparkExecutorMemory} - --executor-cores ${sparkExecutorCoresForIndexing} --driver-memory=${sparkDriverMemory} - --conf spark.executor.instances=${sparkExecutorInstances} + --conf spark.dynamicAllocation.maxExecutors=${sparkExecutorCoresForIndexing} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} diff --git a/dhp-workflows/pom.xml b/dhp-workflows/pom.xml index 59f06bdc3c..05bfe677d8 100644 --- a/dhp-workflows/pom.xml +++ b/dhp-workflows/pom.xml @@ -18,6 +18,7 @@ dhp-distcp dhp-graph-mapper dhp-dedup + dhp-graph-provision diff --git a/pom.xml b/pom.xml index 4e12ba1a92..8b01741d6a 100644 --- a/pom.xml +++ b/pom.xml @@ -210,6 +210,11 @@ httpclient 4.5.3 + + org.apache.httpcomponents + httpmime + 4.5.3 + org.noggit noggit