fixed bug on missing relation in ANDS

This commit is contained in:
Sandro La Bruzzo 2020-11-06 17:12:31 +01:00
parent 3581244daf
commit cd27df91a1
11 changed files with 216 additions and 139 deletions

View File

@ -1,6 +1,6 @@
package eu.dnetlib.dhp.doiboost
import eu.dnetlib.dhp.schema.oaf.Publication
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation}
import org.apache.spark.SparkContext
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
@ -21,6 +21,13 @@ class QueryTest {
}
def has_ands(r:Relation) :Boolean = {
r.getCollectedfrom!= null && r.getCollectedfrom.asScala.count(k => k.getValue.contains("Australian")) > 0
}
def hasInstanceWithUrl(p:Publication):Boolean = {

View File

@ -109,20 +109,20 @@ public class CleaningFunctions {
}
if (Objects.nonNull(r.getPid())) {
r
.setPid(
r
.getPid()
.stream()
.filter(Objects::nonNull)
.filter(sp -> StringUtils.isNotBlank(StringUtils.trim(sp.getValue())))
.filter(sp -> NONE.equalsIgnoreCase(sp.getValue()))
.filter(sp -> Objects.nonNull(sp.getQualifier()))
.filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid()))
.map(sp -> {
sp.setValue(StringUtils.trim(sp.getValue()));
return sp;
})
.collect(Collectors.toList()));
.setPid(
r
.getPid()
.stream()
.filter(Objects::nonNull)
.filter(sp -> StringUtils.isNotBlank(StringUtils.trim(sp.getValue())))
.filter(sp -> NONE.equalsIgnoreCase(sp.getValue()))
.filter(sp -> Objects.nonNull(sp.getQualifier()))
.filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid()))
.map(sp -> {
sp.setValue(StringUtils.trim(sp.getValue()));
return sp;
})
.collect(Collectors.toList()));
}
if (Objects.isNull(r.getResourcetype()) || StringUtils.isBlank(r.getResourcetype().getClassid())) {
r

View File

@ -0,0 +1,3 @@
package eu.dnetlib.dhp.sx.graph
case class IdReplace(newId:String, oldId:String) {}

View File

@ -1,12 +1,15 @@
package eu.dnetlib.dhp.sx.graph
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation}
import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation, Result}
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIUnknown}
import eu.dnetlib.dhp.sx.ebi.EBIAggregator
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.LoggerFactory
import org.apache.spark.sql.functions.col
object SparkSplitOafTODLIEntities {
@ -83,14 +86,42 @@ object SparkSplitOafTODLIEntities {
}
def extract_ids(o:Oaf) :(String, String) = {
o match {
case p: DLIPublication =>
val prefix = StringUtils.substringBefore(p.getId, "|")
val original = StringUtils.substringAfter(p.getOriginalObjIdentifier, "::")
(p.getId, s"$prefix|$original")
case p: DLIDataset =>
val prefix = StringUtils.substringBefore(p.getId, "|")
val original = StringUtils.substringAfter(p.getOriginalObjIdentifier, "::")
(p.getId, s"$prefix|$original")
case _ =>null
}
}
def extract_relations(spark:SparkSession, workingPath:String) :Unit = {
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val relEncoder: Encoder[Relation] = Encoders.kryo[Relation]
import spark.implicits._
val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf]
val ebi_relation:Dataset[Relation] = spark.read.load(s"$workingPath/ebi/baseline_relation_ebi").as[Relation].repartition(2000)
OAFDataset
.filter(o => o.isInstanceOf[Result])
.map(extract_ids)(Encoders.tuple(Encoders.STRING, Encoders.STRING))
.filter(r => r != null)
.where("_1 != _2")
.select(col("_1").alias("newId"), col("_2").alias("oldId"))
.distinct()
.map(f => IdReplace(f.getString(0), f.getString(1)))
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/id_replace")
OAFDataset
.filter(s => s != null && s.isInstanceOf[Relation])
.map(s =>s.asInstanceOf[Relation])
@ -100,7 +131,41 @@ object SparkSplitOafTODLIEntities {
.agg(EBIAggregator.getRelationAggregator().toColumn)
.map(p => p._2)
.repartition(4000)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation")
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation_unfixed")
val relations = spark.read.load(s"$workingPath/graph/relation_unfixed").as[Relation]
val ids = spark.read.load(s"$workingPath/graph/id_replace").as[IdReplace]
relations
.map(r => (r.getSource, r))(Encoders.tuple(Encoders.STRING, relEncoder))
.joinWith(ids, col("_1").equalTo(ids("oldId")), "left")
.map(i =>{
val r = i._1._2
if (i._2 != null)
{
val id = i._2.newId
r.setSource(id)
}
r
}).write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/rel_f_source")
val rel_source:Dataset[Relation] = spark.read.load(s"$workingPath/graph/rel_f_source").as[Relation]
rel_source
.map(r => (r.getTarget, r))(Encoders.tuple(Encoders.STRING, relEncoder))
.joinWith(ids, col("_1").equalTo(ids("oldId")), "left")
.map(i =>{
val r:Relation = i._1._2
if (i._2 != null)
{
val id = i._2.newId
r.setTarget(id)
}
r
}).write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation")
}

View File

@ -12,6 +12,7 @@ import com.fasterxml.jackson.databind.SerializationFeature;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.sx.graph.parser.DatasetScholexplorerParser;
import eu.dnetlib.dhp.sx.graph.parser.PublicationScholexplorerParser;
import eu.dnetlib.scholexplorer.relation.RelationMapper;
public class ScholexplorerParserTest {
@ -37,4 +38,26 @@ public class ScholexplorerParserTest {
}
});
}
@Test
public void testPublicationParser() throws Exception {
String xml = IOUtils.toString(this.getClass().getResourceAsStream("pmf.xml"));
PublicationScholexplorerParser p = new PublicationScholexplorerParser();
List<Oaf> oaves = p.parseObject(xml, RelationMapper.load());
ObjectMapper m = new ObjectMapper();
m.enable(SerializationFeature.INDENT_OUTPUT);
oaves
.forEach(
oaf -> {
try {
System.out.println(m.writeValueAsString(oaf));
System.out.println("----------------------------");
} catch (JsonProcessingException e) {
}
});
}
}

View File

@ -1,51 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<oai:record xmlns:oai="http://www.openarchives.org/OAI/2.0/"
xmlns:oaf="http://namespace.openaire.eu/oaf"
xmlns:dri="http://www.driver-repository.eu/namespace/dri"
xmlns:dc="http://purl.org/dc/elements/1.1/">
<oai:header>
<dri:repositoryId>aaadf8b3-01a8-4cc2-9964-63cfb19df3b4_UmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZXMvUmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZVR5cGU=</dri:repositoryId>
<dri:recordIdentifier>oai:pangaea.de:doi:10.1594/PANGAEA.432865</dri:recordIdentifier>
<dri:datasourceprefix>r3d100010134</dri:datasourceprefix>
<dri:objIdentifier>r3d100010134::00002f60593fd1f758fb838fafb46795</dri:objIdentifier>
<dri:dateOfCollection>2020-02-18T03:05:02.534Z</dri:dateOfCollection>
<oaf:datasourceprefix/>
<identifier>oai:pangaea.de:doi:10.1594/PANGAEA.432865</identifier>
<setSpec>citable topicOceans</setSpec>
xmlns="http://namespace.openaire.eu/">
<oai:header xmlns="">
<dri:objIdentifier xmlns:dri="http://www.driver-repository.eu/namespace/dri">r3d100010464::0002882a9d38c4f4612e7666ad768ccd</dri:objIdentifier>
<dri:recordIdentifier xmlns:dri="http://www.driver-repository.eu/namespace/dri">https://research.jcu.edu.au/researchdata/published/detail/9079e05370d830eb8d416c77c0b761ce::url</dri:recordIdentifier>
<dri:dateOfCollection xmlns:dri="http://www.driver-repository.eu/namespace/dri">2020-11-02T16:14:07.831Z</dri:dateOfCollection>
<dri:repositoryId xmlns:dri="http://www.driver-repository.eu/namespace/dri">ands_UmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZXMvUmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZVR5cGU=</dri:repositoryId>
<dri:datasourceprefix xmlns:dri="http://www.driver-repository.eu/namespace/dri">r3d100010464</dri:datasourceprefix>
</oai:header>
<oai:metadata>
<resource xmlns="http://datacite.org/schema/kernel-3">
<identifier identifierType="doi">10.1594/pangaea.432865</identifier>
<titles xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<title>Daily sea level from coastal tide gauge station Woods_Hole in 1978 (Research quality database)</title>
<metadata xmlns="">
<resource xmlns="http://datacite.org/schema/kernel-3"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://datacite.org/schema/kernel-3 http://schema.datacite.org/meta/kernel-3/metadata.xsd">
<identifier xmlns="" identifierType="url">https://research.jcu.edu.au/researchdata/published/detail/9079e05370d830eb8d416c77c0b761ce</identifier>
<titles xmlns="">
<title>Vertebrate monitoring in the Australian Wet Tropics rainforest at CU6A1 (145.30367623, -16.57767628, 600.0m above MSL) collected by Reptile Surveys</title>
</titles>
<publisher xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">PANGAEA - Data Publisher for Earth &amp; Environmental Science</publisher>
<publicationYear xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">2006</publicationYear>
<dates xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<date dateType="Collected">1978-01-01T12:00:00/1978-12-31T12:00:00</date>
<publisher xmlns="">James Cook University</publisher>
<dates xmlns="">
<date dateType="Collected">2013-05-07</date>
</dates>
<creators xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<creator>
<creatorName>WOCE Sea Level, WSL</creatorName>
</creator>
</creators>
<subjects xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<subject subjectScheme="Parameter">DATE/TIME</subject>
<subject subjectScheme="Parameter">Sea level</subject>
<subject subjectScheme="Method">Tide gauge station</subject>
<subject subjectScheme="Campaign">SeaLevel</subject>
<subject subjectScheme="Project">World Ocean Circulation Experiment (WOCE)</subject>
</subjects>
<resourceType resourceTypeGeneral="Dataset"/>
<relatedIdentifiers>
<relatedIdentifier relatedIdentifierType="URL" relationType="isDocumentedBy"
inverseRelationType="documents">http://store.pangaea.de/Projects/WOCE/SeaLevel_rqds/Woods_Hole.txt</relatedIdentifier>
<creators xmlns=""/>
<resourceType xmlns="" resourceTypeGeneral="Dataset">Dataset</resourceType>
<relatedIdentifiers xmlns="">
<relatedIdentifier entityType="publication" inverseRelationType="related"
relatedIdentifierType="dnet"
relationType="IsRelatedTo">r3d100010464::57793c5aa995172db237d9da17353f8b</relatedIdentifier>
</relatedIdentifiers>
</resource>
</oai:metadata>
<oaf:about>
</metadata>
<oaf:about xmlns:oaf="http://namespace.dnet.eu/oaf" xmlns="">
<oaf:datainfo>
<oaf:collectedFrom completionStatus="complete" id="dli_________::r3d100010134" name="Pangaea"/>
<oaf:collectedFrom completionStatus="complete" id="dli_________::r3d100010464"
name="Australian National Data Service"/>
<oaf:completionStatus>complete</oaf:completionStatus>
<oaf:provisionMode>collected</oaf:provisionMode>
</oaf:datainfo>

View File

@ -0,0 +1,25 @@
<?xml version="1.0" encoding="UTF-8"?>
<oai:record xmlns:oai="http://www.openarchives.org/OAI/2.0/"
xmlns="http://namespace.openaire.eu/">
<oai:header xmlns="">
<dri:objIdentifier xmlns:dri="http://www.driver-repository.eu/namespace/dri">r3d100010464::57793c5aa995172db237d9da17353f8b</dri:objIdentifier>
<dri:recordIdentifier xmlns:dri="http://www.driver-repository.eu/namespace/dri">10.1111/j.1365-2486.2005.00995.x::doi</dri:recordIdentifier>
<dri:dateOfCollection xmlns:dri="http://www.driver-repository.eu/namespace/dri">2020-11-02T16:14:07.831Z</dri:dateOfCollection>
<dri:repositoryId xmlns:dri="http://www.driver-repository.eu/namespace/dri">ands_UmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZXMvUmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZVR5cGU=</dri:repositoryId>
<dri:datasourceprefix xmlns:dri="http://www.driver-repository.eu/namespace/dri">r3d100010464</dri:datasourceprefix>
</oai:header>
<metadata xmlns="">
<oaf:pid xmlns:oaf="http://namespace.dnet.eu/oaf" type="doi">10.1111/j.1365-2486.2005.00995.x</oaf:pid>
<dc:identifier xmlns:dc="http://purl.org/dc/elements/1.1/">10.1111/j.1365-2486.2005.00995.x</dc:identifier>
<dc:title xmlns:dc="http://purl.org/dc/elements/1.1/">Potential decoupling of trends in distribution area and population size of species with climate change.</dc:title>
<dc:type xmlns:dc="http://purl.org/dc/elements/1.1/">publication</dc:type>
</metadata>
<oaf:about xmlns:oaf="http://namespace.dnet.eu/oaf" xmlns="">
<oaf:datainfo>
<oaf:collectedFrom completionStatus="complete" id="dli_________::r3d100010464"
name="Australian National Data Service"/>
<oaf:completionStatus>complete</oaf:completionStatus>
<oaf:provisionMode>collected</oaf:provisionMode>
</oaf:datainfo>
</oaf:about>
</oai:record>

View File

@ -97,12 +97,17 @@ public class Scholix implements Serializable {
}
private List<ScholixEntityId> mergeScholixEntityId(final List<ScholixEntityId> a, final List<ScholixEntityId> b) {
final List<ScholixEntityId> m = new ArrayList<>(a);
final List<ScholixEntityId> m = a != null ? new ArrayList<>(a) : new ArrayList<>();
if (b != null)
b.forEach(s -> {
int tt = (int) m.stream().filter(t -> t.getName().equalsIgnoreCase(s.getName())).count();
if (tt == 0) {
m.add(s);
if (s != null) {
int tt = (int) m
.stream()
.filter(t -> t != null && t.getName() != null && t.getName().equalsIgnoreCase(s.getName()))
.count();
if (tt == 0) {
m.add(s);
}
}
});
return m;
@ -110,7 +115,7 @@ public class Scholix implements Serializable {
private List<ScholixIdentifier> mergeScholixIdnetifier(final List<ScholixIdentifier> a,
final List<ScholixIdentifier> b) {
final List<ScholixIdentifier> m = new ArrayList<>(a);
final List<ScholixIdentifier> m = a != null ? new ArrayList<>(a) : new ArrayList<>();
if (b != null)
b.forEach(s -> {
int tt = (int) m.stream().filter(t -> t.getIdentifier().equalsIgnoreCase(s.getIdentifier())).count();
@ -123,7 +128,7 @@ public class Scholix implements Serializable {
private List<ScholixCollectedFrom> mergeScholixCollectedFrom(final List<ScholixCollectedFrom> a,
final List<ScholixCollectedFrom> b) {
final List<ScholixCollectedFrom> m = new ArrayList<>(a);
final List<ScholixCollectedFrom> m = a != null ? new ArrayList<>(a) : new ArrayList<>();
if (b != null)
b.forEach(s -> {
int tt = (int) m
@ -139,14 +144,15 @@ public class Scholix implements Serializable {
private ScholixRelationship mergeRelationships(final ScholixRelationship a, final ScholixRelationship b) {
ScholixRelationship result = new ScholixRelationship();
result.setName(StringUtils.isEmpty(a.getName()) ? b.getName() : a.getName());
result.setInverse(StringUtils.isEmpty(a.getInverse()) ? b.getInverse() : a.getInverse());
result.setSchema(StringUtils.isEmpty(a.getSchema()) ? b.getSchema() : a.getSchema());
result.setName(a == null || StringUtils.isEmpty(a.getName()) ? b.getName() : a.getName());
result.setInverse(a == null || StringUtils.isEmpty(a.getInverse()) ? b.getInverse() : a.getInverse());
result.setSchema(a == null || StringUtils.isEmpty(a.getSchema()) ? b.getSchema() : a.getSchema());
return result;
}
private ScholixResource mergeResource(final ScholixResource a, final ScholixResource b) {
if (a == null)
return b;
final ScholixResource result = new ScholixResource();
result.setCollectedFrom(mergeScholixCollectedFrom(a.getCollectedFrom(), b.getCollectedFrom()));
result.setCreator(mergeScholixEntityId(a.getCreator(), b.getCreator()));

View File

@ -7,4 +7,8 @@
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -1,9 +1,17 @@
<workflow-app name="Index graph to ElasticSearch" xmlns="uri:oozie:workflow:0.5">
<workflow-app name="Materialize and Index graph to ElasticSearch" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>workingDirPath</name>
<description>the source path</description>
</property>
<property>
<name>index</name>
<description>the index name</description>
</property>
<property>
<name>esCluster</name>
<description>the Index cluster</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
@ -12,39 +20,43 @@
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>index</name>
<description>index name</description>
</property>
<property>
<name>indexHost</name>
<description>index host name</description>
</property>
</parameters>
<start to="indexSummary"/>
<start to="DropAndCreateIndex"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="DropAndCreateIndex">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.provision.DropAndCreateESIndex</main-class>
<arg>-i</arg><arg>${index}</arg>
<arg>-c</arg><arg>${esCluster}</arg>
</java>
<ok to="indexSummary"/>
<error to="Kill"/>
</action>
<action name="indexSummary">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>index Summary</name>
<name>index summary</name>
<class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>
<jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="32" </spark-opts>
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="8" </spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingDirPath}/summary</arg>
<arg>--sourcePath</arg><arg>${workingDirPath}/summary_json</arg>
<arg>--index</arg><arg>${index}_object</arg>
<arg>--esHost</arg><arg>${indexHost}</arg>
<arg>--idPath</arg><arg>id</arg>
<arg>--type</arg><arg>summary</arg>
<arg>--cluster</arg><arg>${esCluster}</arg>
</spark>
<ok to="indexScholix"/>
<error to="Kill"/>
@ -63,9 +75,8 @@
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingDirPath}/scholix_json</arg>
<arg>--index</arg><arg>${index}_scholix</arg>
<arg>--esHost</arg><arg>${indexHost}</arg>
<arg>--idPath</arg><arg>identifier</arg>
<arg>--type</arg><arg>scholix</arg>
<arg>--cluster</arg><arg>${esCluster}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>

View File

@ -112,59 +112,5 @@
<error to="Kill"/>
</action>
<action name="DropAndCreateIndex">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.provision.DropAndCreateESIndex</main-class>
<arg>-i</arg><arg>${index}</arg>
<arg>-c</arg><arg>${esCluster}</arg>
</java>
<ok to="indexSummary"/>
<error to="Kill"/>
</action>
<action name="indexSummary">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>index summary</name>
<class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>
<jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="8" </spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingDirPath}/summary_json</arg>
<arg>--index</arg><arg>${index}_object</arg>
<arg>--idPath</arg><arg>id</arg>
<arg>--cluster</arg><arg>${esCluster}</arg>
</spark>
<ok to="indexScholix"/>
<error to="Kill"/>
</action>
<action name="indexScholix">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>index scholix</name>
<class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>
<jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="8" </spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingDirPath}/scholix_json</arg>
<arg>--index</arg><arg>${index}_scholix</arg>
<arg>--idPath</arg><arg>identifier</arg>
<arg>--cluster</arg><arg>${esCluster}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>