working procedure for records indexing using Spark, via lib com.lucidworks.spark:spark-solr

This commit is contained in:
Claudio Atzori 2020-02-14 12:28:52 +01:00
parent 1ee1baa8c0
commit 56d1810a66
8 changed files with 42 additions and 26 deletions

View File

@ -1,6 +1,7 @@
package eu.dnetlib.dhp.utils.saxon; package eu.dnetlib.dhp.utils.saxon;
import net.sf.saxon.expr.XPathContext; import net.sf.saxon.expr.XPathContext;
import net.sf.saxon.om.Item;
import net.sf.saxon.om.Sequence; import net.sf.saxon.om.Sequence;
import net.sf.saxon.trans.XPathException; import net.sf.saxon.trans.XPathException;
import net.sf.saxon.value.SequenceType; import net.sf.saxon.value.SequenceType;
@ -19,15 +20,21 @@ public class PickFirst extends AbstractExtensionFunction {
if (arguments == null | arguments.length == 0) { if (arguments == null | arguments.length == 0) {
return new StringValue(""); return new StringValue("");
} }
String s1 = arguments[0].head().getStringValue();
if (arguments.length > 1) { final String s1 = getValue(arguments[0]);
String s2 = arguments[1].head().getStringValue(); final String s2 = getValue(arguments[1]);
return new StringValue(StringUtils.isNotBlank(s1) ? s1 : StringUtils.isNotBlank(s2) ? s2 : ""); return new StringValue(StringUtils.isNotBlank(s1) ? s1 : StringUtils.isNotBlank(s2) ? s2 : "");
} else {
return new StringValue(StringUtils.isNotBlank(s1) ? s1 : "");
} }
private String getValue(final Sequence arg) throws XPathException {
if (arg != null) {
final Item item = arg.head();
if (item != null) {
return item.getStringValue();
}
}
return "";
} }
@Override @Override

View File

@ -5,7 +5,6 @@ isLookupUrl=http://beta.services.openaire.eu:8280/is/services/isLookUp?wsdl
sourcePath=/tmp/db_openaireplus_services.export_dhp.2020.02.03 sourcePath=/tmp/db_openaireplus_services.export_dhp.2020.02.03
outputPath=/tmp/openaire_provision outputPath=/tmp/openaire_provision
format=TMF format=TMF
batchSize=1000 batchSize=2000
sparkExecutorCoresForIndexing=1 sparkExecutorCoresForIndexing=64
sparkExecutorInstances=10 reuseRecords=true
reuseRecords=false

View File

@ -3,7 +3,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.0.5-SNAPSHOT</version> <version>1.1.6-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
@ -52,6 +52,11 @@
<groupId>org.apache.httpcomponents</groupId> <groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId> <artifactId>httpclient</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.noggit</groupId> <groupId>org.noggit</groupId>
<artifactId>noggit</artifactId> <artifactId>noggit</artifactId>

View File

@ -45,7 +45,7 @@ public class SparkXmlIndexingJob {
final String inputPath = parser.get("sourcePath"); final String inputPath = parser.get("sourcePath");
final String isLookupUrl = parser.get("isLookupUrl"); final String isLookupUrl = parser.get("isLookupUrl");
final String format = parser.get("format"); final String format = parser.get("format");
final Integer batchSize = parser.getObjectMap().containsKey("batckSize") ? Integer.valueOf(parser.get("batchSize")) : DEFAULT_BATCH_SIZE; final Integer batchSize = parser.getObjectMap().containsKey("batchSize") ? Integer.valueOf(parser.get("batchSize")) : DEFAULT_BATCH_SIZE;
final ISLookUpService isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl); final ISLookUpService isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl);
final String fields = getLayoutSource(isLookup, format); final String fields = getLayoutSource(isLookup, format);

View File

@ -129,6 +129,9 @@ public class XmlRecordFactory implements Serializable {
.map(t -> mapStructuredProperty("title", t)) .map(t -> mapStructuredProperty("title", t))
.collect(Collectors.toList())); .collect(Collectors.toList()));
} }
if (r.getBestaccessright() != null) {
metadata.add(mapQualifier("bestaccessright", r.getBestaccessright()));
}
if (r.getAuthor() != null) { if (r.getAuthor() != null) {
metadata.addAll(r.getAuthor() metadata.addAll(r.getAuthor()
.stream() .stream()
@ -230,15 +233,6 @@ public class XmlRecordFactory implements Serializable {
if (r.getResourcetype() != null) { if (r.getResourcetype() != null) {
metadata.add(mapQualifier("resourcetype", r.getResourcetype())); metadata.add(mapQualifier("resourcetype", r.getResourcetype()));
} }
if (r.getRefereed() != null) {
metadata.add(asXmlElement("refereed", r.getRefereed().getValue()));
}
if (r.getProcessingchargeamount() != null) {
metadata.add(asXmlElement("processingchargeamount", r.getProcessingchargeamount().getValue()));
}
if (r.getProcessingchargecurrency() != null) {
metadata.add(asXmlElement("processingchargecurrency", r.getProcessingchargecurrency().getValue()));
}
metadata.add(mapQualifier("bestaccessright", getBestAccessright(r))); metadata.add(mapQualifier("bestaccessright", getBestAccessright(r)));
@ -544,9 +538,6 @@ public class XmlRecordFactory implements Serializable {
if (p.getDuration() != null) { if (p.getDuration() != null) {
metadata.add(asXmlElement("duration", p.getDuration().getValue())); metadata.add(asXmlElement("duration", p.getDuration().getValue()));
} }
if (p.getEcsc39() != null) {
metadata.add(asXmlElement("ecsc39", p.getEcsc39().getValue()));
}
if (p.getEcarticle29_3() != null) { if (p.getEcarticle29_3() != null) {
metadata.add(asXmlElement("ecarticle29_3", p.getEcarticle29_3().getValue())); metadata.add(asXmlElement("ecarticle29_3", p.getEcarticle29_3().getValue()));
} }
@ -759,6 +750,15 @@ public class XmlRecordFactory implements Serializable {
if (isNotBlank(instance.getDistributionlocation())) { if (isNotBlank(instance.getDistributionlocation())) {
fields.add(asXmlElement("distributionlocation", instance.getDistributionlocation())); fields.add(asXmlElement("distributionlocation", instance.getDistributionlocation()));
} }
if (instance.getRefereed() != null && isNotBlank(instance.getRefereed().getValue())) {
fields.add(asXmlElement("refereed", instance.getRefereed().getValue()));
}
if (instance.getProcessingchargeamount() != null && isNotBlank(instance.getProcessingchargeamount().getValue())) {
fields.add(asXmlElement("processingchargeamount", instance.getProcessingchargeamount().getValue()));
}
if (instance.getProcessingchargecurrency() != null && isNotBlank(instance.getProcessingchargecurrency().getValue())) {
fields.add(asXmlElement("processingchargecurrency", instance.getProcessingchargecurrency().getValue()));
}
children.add(templateFactory.getInstance(instance.getHostedby().getKey(), fields, instance.getUrl())); children.add(templateFactory.getInstance(instance.getHostedby().getKey(), fields, instance.getUrl()));
} }

View File

@ -78,9 +78,8 @@
<jar>dhp-graph-provision-${projectVersion}.jar</jar> <jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-memory ${sparkExecutorMemory} --executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCoresForIndexing}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.instances=${sparkExecutorInstances} --conf spark.dynamicAllocation.maxExecutors=${sparkExecutorCoresForIndexing}
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}

View File

@ -18,6 +18,7 @@
<module>dhp-distcp</module> <module>dhp-distcp</module>
<module>dhp-graph-mapper</module> <module>dhp-graph-mapper</module>
<module>dhp-dedup</module> <module>dhp-dedup</module>
<module>dhp-graph-provision</module>
</modules> </modules>
<pluginRepositories> <pluginRepositories>

View File

@ -210,6 +210,11 @@
<artifactId>httpclient</artifactId> <artifactId>httpclient</artifactId>
<version>4.5.3</version> <version>4.5.3</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>4.5.3</version>
</dependency>
<dependency> <dependency>
<groupId>org.noggit</groupId> <groupId>org.noggit</groupId>
<artifactId>noggit</artifactId> <artifactId>noggit</artifactId>