diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/saxon/PickFirst.java b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/saxon/PickFirst.java
index 1f209bed09..a221e37c67 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/saxon/PickFirst.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/saxon/PickFirst.java
@@ -1,6 +1,7 @@
package eu.dnetlib.dhp.utils.saxon;
import net.sf.saxon.expr.XPathContext;
+import net.sf.saxon.om.Item;
import net.sf.saxon.om.Sequence;
import net.sf.saxon.trans.XPathException;
import net.sf.saxon.value.SequenceType;
@@ -19,15 +20,21 @@ public class PickFirst extends AbstractExtensionFunction {
if (arguments == null | arguments.length == 0) {
return new StringValue("");
}
- String s1 = arguments[0].head().getStringValue();
- if (arguments.length > 1) {
- String s2 = arguments[1].head().getStringValue();
+ final String s1 = getValue(arguments[0]);
+ final String s2 = getValue(arguments[1]);
- return new StringValue(StringUtils.isNotBlank(s1) ? s1 : StringUtils.isNotBlank(s2) ? s2 : "");
- } else {
- return new StringValue(StringUtils.isNotBlank(s1) ? s1 : "");
+ return new StringValue(StringUtils.isNotBlank(s1) ? s1 : StringUtils.isNotBlank(s2) ? s2 : "");
+ }
+
+ private String getValue(final Sequence arg) throws XPathException {
+ if (arg != null) {
+ final Item item = arg.head();
+ if (item != null) {
+ return item.getStringValue();
+ }
}
+ return "";
}
@Override
diff --git a/dhp-workflows/dhp-graph-provision/job-override.properties b/dhp-workflows/dhp-graph-provision/job-override.properties
index c7b173a14f..b5ab079825 100644
--- a/dhp-workflows/dhp-graph-provision/job-override.properties
+++ b/dhp-workflows/dhp-graph-provision/job-override.properties
@@ -5,7 +5,6 @@ isLookupUrl=http://beta.services.openaire.eu:8280/is/services/isLookUp?wsdl
sourcePath=/tmp/db_openaireplus_services.export_dhp.2020.02.03
outputPath=/tmp/openaire_provision
format=TMF
-batchSize=1000
-sparkExecutorCoresForIndexing=1
-sparkExecutorInstances=10
-reuseRecords=false
\ No newline at end of file
+batchSize=2000
+sparkExecutorCoresForIndexing=64
+reuseRecords=true
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/pom.xml b/dhp-workflows/dhp-graph-provision/pom.xml
index 5e6beb2493..f74c9b6668 100644
--- a/dhp-workflows/dhp-graph-provision/pom.xml
+++ b/dhp-workflows/dhp-graph-provision/pom.xml
@@ -3,7 +3,7 @@
dhp-workflows
eu.dnetlib.dhp
- 1.0.5-SNAPSHOT
+ 1.1.6-SNAPSHOT
4.0.0
@@ -52,6 +52,11 @@
org.apache.httpcomponents
httpclient
+
+ org.apache.httpcomponents
+ httpmime
+
+
org.noggit
noggit
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkXmlIndexingJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkXmlIndexingJob.java
index e13f8bbe20..2775d93b43 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkXmlIndexingJob.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkXmlIndexingJob.java
@@ -45,7 +45,7 @@ public class SparkXmlIndexingJob {
final String inputPath = parser.get("sourcePath");
final String isLookupUrl = parser.get("isLookupUrl");
final String format = parser.get("format");
- final Integer batchSize = parser.getObjectMap().containsKey("batckSize") ? Integer.valueOf(parser.get("batchSize")) : DEFAULT_BATCH_SIZE;
+ final Integer batchSize = parser.getObjectMap().containsKey("batchSize") ? Integer.valueOf(parser.get("batchSize")) : DEFAULT_BATCH_SIZE;
final ISLookUpService isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl);
final String fields = getLayoutSource(isLookup, format);
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/XmlRecordFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/XmlRecordFactory.java
index bd4f8ec6c3..abcf2a7ecb 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/XmlRecordFactory.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/utils/XmlRecordFactory.java
@@ -129,6 +129,9 @@ public class XmlRecordFactory implements Serializable {
.map(t -> mapStructuredProperty("title", t))
.collect(Collectors.toList()));
}
+ if (r.getBestaccessright() != null) {
+ metadata.add(mapQualifier("bestaccessright", r.getBestaccessright()));
+ }
if (r.getAuthor() != null) {
metadata.addAll(r.getAuthor()
.stream()
@@ -230,15 +233,6 @@ public class XmlRecordFactory implements Serializable {
if (r.getResourcetype() != null) {
metadata.add(mapQualifier("resourcetype", r.getResourcetype()));
}
- if (r.getRefereed() != null) {
- metadata.add(asXmlElement("refereed", r.getRefereed().getValue()));
- }
- if (r.getProcessingchargeamount() != null) {
- metadata.add(asXmlElement("processingchargeamount", r.getProcessingchargeamount().getValue()));
- }
- if (r.getProcessingchargecurrency() != null) {
- metadata.add(asXmlElement("processingchargecurrency", r.getProcessingchargecurrency().getValue()));
- }
metadata.add(mapQualifier("bestaccessright", getBestAccessright(r)));
@@ -544,9 +538,6 @@ public class XmlRecordFactory implements Serializable {
if (p.getDuration() != null) {
metadata.add(asXmlElement("duration", p.getDuration().getValue()));
}
- if (p.getEcsc39() != null) {
- metadata.add(asXmlElement("ecsc39", p.getEcsc39().getValue()));
- }
if (p.getEcarticle29_3() != null) {
metadata.add(asXmlElement("ecarticle29_3", p.getEcarticle29_3().getValue()));
}
@@ -759,6 +750,15 @@ public class XmlRecordFactory implements Serializable {
if (isNotBlank(instance.getDistributionlocation())) {
fields.add(asXmlElement("distributionlocation", instance.getDistributionlocation()));
}
+ if (instance.getRefereed() != null && isNotBlank(instance.getRefereed().getValue())) {
+ fields.add(asXmlElement("refereed", instance.getRefereed().getValue()));
+ }
+ if (instance.getProcessingchargeamount() != null && isNotBlank(instance.getProcessingchargeamount().getValue())) {
+ fields.add(asXmlElement("processingchargeamount", instance.getProcessingchargeamount().getValue()));
+ }
+ if (instance.getProcessingchargecurrency() != null && isNotBlank(instance.getProcessingchargecurrency().getValue())) {
+ fields.add(asXmlElement("processingchargecurrency", instance.getProcessingchargecurrency().getValue()));
+ }
children.add(templateFactory.getInstance(instance.getHostedby().getKey(), fields, instance.getUrl()));
}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml
index fee463868a..3503589441 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml
@@ -78,9 +78,8 @@
dhp-graph-provision-${projectVersion}.jar
--executor-memory ${sparkExecutorMemory}
- --executor-cores ${sparkExecutorCoresForIndexing}
--driver-memory=${sparkDriverMemory}
- --conf spark.executor.instances=${sparkExecutorInstances}
+ --conf spark.dynamicAllocation.maxExecutors=${sparkExecutorCoresForIndexing}
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
diff --git a/dhp-workflows/pom.xml b/dhp-workflows/pom.xml
index 59f06bdc3c..05bfe677d8 100644
--- a/dhp-workflows/pom.xml
+++ b/dhp-workflows/pom.xml
@@ -18,6 +18,7 @@
dhp-distcp
dhp-graph-mapper
dhp-dedup
+ dhp-graph-provision
diff --git a/pom.xml b/pom.xml
index 4e12ba1a92..8b01741d6a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -210,6 +210,11 @@
httpclient
4.5.3
+
+ org.apache.httpcomponents
+ httpmime
+ 4.5.3
+
org.noggit
noggit