Re-implementation

Miriam Baglioni 2023-09-27 10:02:52 +02:00
parent 483214e78f
commit 669e5b645c
8 changed files with 777 additions and 28 deletions


@@ -0,0 +1,160 @@
package eu.dnetlib.dhp.oa.graph.dump.eosc;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.*;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.eosc.model.OrganizationPid;
import eu.dnetlib.dhp.eosc.model.Result;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2;

/**
* @author miriam.baglioni
* @Date 27/07/22
*/
public class ExtendEoscResultWithOrganization implements Serializable {
private static final Logger log = LoggerFactory.getLogger(ExtendEoscResultWithOrganization.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
ExtendEoscResultWithOrganization.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/dump/eosc_extend_result_with_organization_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, workingPath + "/affiliation");
addOrganizations(spark, inputPath, workingPath);
});
}

private static void addOrganizations(SparkSession spark, String inputPath, String workingPath) {
List<String> entities = Arrays.asList("publication", "dataset", "software", "otherresearchproduct");
entities
.parallelStream()
.forEach(
entity -> {
Dataset<Result> results = Utils
.readPath(spark, workingPath + "/" + entity, Result.class);
// keep only visible, not deleted-by-inference affiliation relations (organization -> result)
Dataset<Relation> relations = Utils
.readPath(spark, inputPath + "/relation", Relation.class)
.filter(
(FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference() &&
!r.getDataInfo().getInvisible()
&& r.getSubRelType().equalsIgnoreCase(ModelConstants.AFFILIATION));
Dataset<Organization> organizations = Utils
.readPath(spark, inputPath + "/organization", Organization.class);
// join the relations with the organizations and build, for each result, the affiliation payload (id, name, pids)
Dataset<ResultOrganizations> resultOrganization = relations
.joinWith(organizations, relations.col("source").equalTo(organizations.col("id")), "left")
.map((MapFunction<Tuple2<Relation, Organization>, ResultOrganizations>) t2 -> {
if (t2._2() != null) {
ResultOrganizations rOrg = new ResultOrganizations();
rOrg.setResultId(t2._1().getTarget());
eu.dnetlib.dhp.eosc.model.Organization org = new eu.dnetlib.dhp.eosc.model.Organization();
org.setId(t2._2().getId());
if (Optional.ofNullable(t2._2().getLegalname()).isPresent()) {
org.setName(t2._2().getLegalname().getValue());
} else {
org.setName("");
}
HashMap<String, Set<String>> organizationPids = new HashMap<>();
if (Optional.ofNullable(t2._2().getPid()).isPresent())
t2._2().getPid().forEach(p -> {
if (!organizationPids.containsKey(p.getQualifier().getClassid()))
organizationPids.put(p.getQualifier().getClassid(), new HashSet<>());
organizationPids.get(p.getQualifier().getClassid()).add(p.getValue());
});
List<OrganizationPid> pids = new ArrayList<>();
for (String key : organizationPids.keySet()) {
for (String value : organizationPids.get(key)) {
OrganizationPid pid = new OrganizationPid();
pid.setValue(value);
pid.setType(key);
pids.add(pid);
}
}
org.setPid(pids);
rOrg.setAffiliation(org);
return rOrg;
}
return null;
}, Encoders.bean(ResultOrganizations.class))
.filter(Objects::nonNull);
// group by result id and attach the affiliations, deduplicated by organization id
results
.joinWith(
resultOrganization, results.col("id").equalTo(resultOrganization.col("resultId")), "left")
.groupByKey(
(MapFunction<Tuple2<Result, ResultOrganizations>, String>) t2 -> t2._1().getId(),
Encoders.STRING())
.mapGroups(
(MapGroupsFunction<String, Tuple2<Result, ResultOrganizations>, Result>) (s, it) -> {
Tuple2<Result, ResultOrganizations> first = it.next();
if (first._2() == null) {
return first._1();
}
Result ret = first._1();
List<eu.dnetlib.dhp.eosc.model.Organization> affiliation = new ArrayList<>();
Set<String> alreadyInsertedAffiliations = new HashSet<>();
affiliation.add(first._2().getAffiliation());
alreadyInsertedAffiliations.add(first._2().getAffiliation().getId());
it.forEachRemaining(res -> {
if (!alreadyInsertedAffiliations.contains(res._2().getAffiliation().getId())) {
affiliation.add(res._2().getAffiliation());
alreadyInsertedAffiliations.add(res._2().getAffiliation().getId());
}
});
ret.setAffiliation(affiliation);
return ret;
}, Encoders.bean(Result.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingPath + "/affiliation/" + entity);
});
}
}
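For orientation, the workflow further down wires this job with --sourcePath (the materialized graph) and --workingPath (the directory holding the already dumped results). A minimal local invocation could look like the sketch below; the driver class name, the placeholder paths and the forced local master are assumptions for illustration only, and it presumes the dump module (with its parameter JSON resources) is on the classpath.

package eu.dnetlib.dhp.oa.graph.dump.eosc;

/**
 * Hypothetical local driver for ExtendEoscResultWithOrganization (illustration only, not part of this commit).
 * new SparkConf() also loads spark.* system properties, so forcing spark.master here is enough to run the job
 * on a local Spark master; the two paths are placeholders.
 */
public class ExtendWithOrganizationLocalRun {

	public static void main(String[] args) throws Exception {
		System.setProperty("spark.master", "local[*]");

		ExtendEoscResultWithOrganization
			.main(new String[] {
				"--sourcePath", "/tmp/eosc/graph", // must contain relation/ and organization/
				"--workingPath", "/tmp/eosc/working" // must contain publication/, dataset/, software/, otherresearchproduct/
			});
	}
}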


@@ -0,0 +1,104 @@
package eu.dnetlib.dhp.oa.graph.dump.eosc;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.eosc.model.Result;
import eu.dnetlib.dhp.oa.graph.dump.Constants;
import scala.Tuple2;

public class ExtendEoscResultWithProject implements Serializable {
private static final Logger log = LoggerFactory.getLogger(ExtendEoscResultWithProject.class);
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
ExtendEoscResultWithProject.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/dump/eosc_project_input_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String preparedInfoPath = parser.get("preparedInfoPath");
log.info("preparedInfoPath: {}", preparedInfoPath);
final String dumpType = Optional
.ofNullable(parser.get("dumpType"))
.orElse(Constants.DUMPTYPE.COMMUNITY.getType());
log.info("dumpType: {}", dumpType);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
extend(spark, inputPath, outputPath, preparedInfoPath);
});
}

private static void extend(
SparkSession spark,
String inputPath,
String outputPath,
String preparedInfoPath) {
List<String> entities = Arrays.asList("publication", "dataset", "software", "otherresearchproduct");
entities.parallelStream().forEach(entity -> {
Dataset<Result> result = Utils.readPath(spark, inputPath + "/" + entity, Result.class);
Dataset<ResultProject> resultProject = Utils.readPath(spark, preparedInfoPath, ResultProject.class);
// left-join the dumped results with the prepared result/project association and copy over the project list
result
.joinWith(
resultProject, result.col("id").equalTo(resultProject.col("resultId")),
"left")
.map((MapFunction<Tuple2<Result, ResultProject>, Result>) value -> {
Result r = value._1();
Optional.ofNullable(value._2()).ifPresent(rp -> r.setProjects(rp.getProjectsList()));
return r;
}, Encoders.bean(Result.class))
.write()
.option("compression", "gzip")
.mode(SaveMode.Append)
.json(outputPath + "/" + entity);
});
}
}


@@ -0,0 +1,104 @@
package eu.dnetlib.dhp.oa.graph.dump.eosc;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.eosc.model.Relation;
import eu.dnetlib.dhp.eosc.model.Result;
import scala.Tuple2;

/**
* @author miriam.baglioni
* @Date 02/02/23
*/
public class ExtendEoscResultWithRelation implements Serializable {
private static final Logger log = LoggerFactory.getLogger(ExtendEoscResultWithRelation.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
ExtendEoscResultWithRelation.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/dump/eosc_input_extendwithrelation_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String resultPath = parser.get("resultPath");
log.info("resultPath: {}", resultPath);
final String relationPath = parser.get("relationPath");
log.info("relationPath: {}", relationPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
extendResultWithRelation(spark, resultPath, relationPath, outputPath);
});
}

private static void extendResultWithRelation(SparkSession spark, String resultPath, String relationPath,
String outputPath) {
List<String> entities = Arrays.asList("publication", "software", "dataset", "otherresearchproduct");
entities.parallelStream().forEach(entity -> {
Dataset<Relation> relationDataset = Utils.readPath(spark, relationPath, Relation.class);
Dataset<Result> resultDataset = Utils.readPath(spark, resultPath + "/" + entity, Result.class);
// attach to each result the relations having that result as source node
resultDataset
.joinWith(relationDataset, resultDataset.col("id").equalTo(relationDataset.col("source")), "left")
.groupByKey((MapFunction<Tuple2<Result, Relation>, String>) t2 -> t2._1().getId(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Tuple2<Result, Relation>, Result>) (k, it) -> {
Tuple2<Result, Relation> first = it.next();
Result r = first._1();
if (Optional.ofNullable(first._2()).isPresent()) {
if (r.getRelations() == null) {
r.setRelations(new ArrayList<>());
}
r.getRelations().add(first._2());
it.forEachRemaining(t2 -> r.getRelations().add(t2._2()));
}
return r;
}, Encoders.bean(Result.class))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath + "/" + entity);
});
}
}
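ExtendEoscResultWithOrganization above and ExtendEoscResultWithRelation here share the same pattern: left-join the dumped results with the extra information, group by result id, and fold the non-null right-hand sides into the result. Below is a self-contained, Spark-free sketch of that folding step, with simplified, hypothetical types; unlike the Spark code, it leaves an empty list (rather than null) for results without relations.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of the mapGroups logic: for each result id, collect the joined
// relations, skipping the null right-hand side produced by the left join when a result
// has no outgoing relation at all.
public class GroupRelationsSketch {

	static Map<String, List<String>> foldRelations(List<String[]> joinedPairs) {
		Map<String, List<String>> relationsByResult = new LinkedHashMap<>();
		for (String[] pair : joinedPairs) { // pair = { resultId, relationOrNull }
			relationsByResult.putIfAbsent(pair[0], new ArrayList<>());
			if (pair[1] != null) {
				relationsByResult.get(pair[0]).add(pair[1]);
			}
		}
		return relationsByResult;
	}

	public static void main(String[] args) {
		List<String[]> joined = Arrays.asList(
			new String[] { "result1", "result1 -> result2" },
			new String[] { "result1", "result1 -> result3" },
			new String[] { "result2", null }); // the left join produced no relation for result2
		// prints {result1=[result1 -> result2, result1 -> result3], result2=[]}
		System.out.println(foldRelations(joined));
	}
}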


@@ -0,0 +1,107 @@
package eu.dnetlib.dhp.oa.graph.dump.eosc;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.eosc.model.Result;
import eu.dnetlib.dhp.oa.graph.dump.ResultMapper;
import eu.dnetlib.dhp.schema.common.ModelSupport;

/**
* @author miriam.baglioni
* @Date 27/07/22
*/
public class SelectEoscResults implements Serializable {
private static final Logger log = LoggerFactory.getLogger(SelectEoscResults.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SelectEoscResults.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/dump/eosc_select_result_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String eoscDatasourceIdsPath = parser.get("eoscDatasourceIdsPath");
log.info("eoscDatasourceIdsPath: {}", eoscDatasourceIdsPath);
final String communityMapPath = parser.get("communityMapPath");
log.info("communityMapPath: {}", communityMapPath);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
selectEoscResults(spark, inputPath, outputPath, communityMapPath, eoscDatasourceIdsPath);
});
}

private static <R extends eu.dnetlib.dhp.schema.oaf.Result> void selectEoscResults(SparkSession spark,
String inputPath, String outputPath,
String communityMapPath, String eoscDatasourceIdsPath) {
List<MasterDuplicate> df = Utils
.readPath(spark, eoscDatasourceIdsPath, MasterDuplicate.class)
.collectAsList();
CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath);
List<String> entities = Arrays.asList("publication", "software", "dataset", "otherresearchproduct");
entities
.parallelStream()
.forEach(
entity -> Utils
.readPath(spark, inputPath + "/" + entity, ModelSupport.entityTypes.get(entity))
// keep visible, not deleted-by-inference results that are either in the "eosc" context or collected from B2FIND
.filter(
(FilterFunction<R>) r -> !r.getDataInfo().getDeletedbyinference()
&& !r.getDataInfo().getInvisible()
&& (r.getContext().stream().anyMatch(c -> c.getId().equals("eosc")) ||
r
.getCollectedfrom()
.stream()
.anyMatch(cf -> cf.getValue().equalsIgnoreCase("B2FIND"))))
.map(
(MapFunction<R, Result>) r -> (Result) ResultMapper
.map(r, communityMap, df),
Encoders.bean(Result.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + "/" + entity));
}
}


@@ -112,29 +112,6 @@
<ok to="fork_dump_eosc_result"/>
<error to="Kill"/>
</action>
- <!-- <action name="find_datasource_eosc_id">-->
- <!-- <spark xmlns="uri:oozie:spark-action:0.2">-->
- <!-- <master>yarn</master>-->
- <!-- <mode>cluster</mode>-->
- <!-- <name>Dump Publication For EOSC </name>-->
- <!-- <class>eu.dnetlib.dhp.oa.graph.dump.eosc.EoscDatasourceId</class>-->
- <!-- <jar>dump-${projectVersion}.jar</jar>-->
- <!-- <spark-opts>-->
- <!-- &#45;&#45;executor-memory=${sparkExecutorMemory}-->
- <!-- &#45;&#45;executor-cores=${sparkExecutorCores}-->
- <!-- &#45;&#45;driver-memory=${sparkDriverMemory}-->
- <!-- &#45;&#45;conf spark.extraListeners=${spark2ExtraListeners}-->
- <!-- &#45;&#45;conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}-->
- <!-- &#45;&#45;conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}-->
- <!-- &#45;&#45;conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}-->
- <!-- &#45;&#45;conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}-->
- <!-- </spark-opts>-->
- <!-- <arg>&#45;&#45;sourcePath</arg><arg>${workingDir}/masterduplicate</arg>-->
- <!-- <arg>&#45;&#45;outputPath</arg><arg>${workingDir}/eosc/datasourceids</arg>-->
- <!-- </spark>-->
- <!-- <ok to="fork_dump_eosc_result"/>-->
- <!-- <error to="Kill"/>-->
- <!-- </action>-->
<fork name="fork_dump_eosc_result">
<path start="dump_eosc_publication"/>


@@ -10,7 +10,7 @@
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
"paramRequired": false
},
{
"paramName": "ssm",
@@ -22,8 +22,13 @@
"paramName":"rp",
"paramLongName":"resultPath",
"paramDescription": "The path to the community map",
"paramRequired": true
}
"paramRequired": false
},{
"paramName":"wp",
"paramLongName":"workingPath",
"paramDescription": "The path to the community map",
"paramRequired": false
}
]


@@ -22,7 +22,7 @@
"paramName":"tn",
"paramLongName":"resultTableName",
"paramDescription": "the name of the result table we are currently working on",
"paramRequired": true
"paramRequired": false
},
{
"paramName":"cmp",
@@ -34,7 +34,7 @@
"paramName":"edip",
"paramLongName":"eoscDatasourceIdsPath",
"paramDescription": "The path to the community map",
"paramRequired": true
"paramRequired": false
}
]


@@ -0,0 +1,292 @@
<workflow-app name="dump_graph_for_eosc" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>outputPath</name>
<description>the output path</description>
</property>
<property>
<name>accessToken</name>
<description>the access token used for the deposition in Zenodo</description>
</property>
<property>
<name>connectionUrl</name>
<description>the connection url for Zenodo</description>
</property>
<property>
<name>metadata</name>
<description>the metadata associated with the deposition</description>
</property>
<property>
<name>depositionType</name>
<description>the type of deposition we want to perform. "new" for brand new deposition, "version" for a new version of a published deposition (in this case the concept record id must be provided), "upload" to upload content to an open deposition for which we already have the deposition id (in this case the deposition id should be provided)</description>
</property>
<property>
<name>conceptRecordId</name>
<description>for a new version, the concept record id of the previously published deposition</description>
</property>
<property>
<name>depositionId</name>
<description>the id of an open deposition to which the content has to be added</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="save_community_map"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="save_community_map">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.eosc.SaveCommunityMap</main-class>
<arg>--outputPath</arg><arg>${workingDir}/communityMap</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</java>
<ok to="get_ds_master_duplicate"/>
<error to="Kill"/>
</action>
<action name="get_ds_master_duplicate">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.eosc.EoscMasterDuplicate</main-class>
<arg>--postgresUrl</arg><arg>${postgresURL}</arg>
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
<arg>--hdfsPath</arg><arg>${workingDir}/masterduplicate</arg>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
</java>
<ok to="dump_eosc_result"/>
<error to="Kill"/>
</action>
<action name="dump_eosc_result">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump Publication For EOSC </name>
<class>eu.dnetlib.dhp.oa.graph.dump.eosc.SelectEoscResults</class>
<jar>dump-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump</arg>
<arg>--communityMapPath</arg><arg>${workingDir}/communityMap</arg>
<arg>--eoscDatasourceIdsPath</arg><arg>${workingDir}/masterduplicate</arg>
</spark>
<ok to="extend_result_with_affiliation"/>
<error to="Kill"/>
</action>
<action name="extend_result_with_affiliation">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Extend Dump Publication For EOSC with affiliations </name>
<class>eu.dnetlib.dhp.oa.graph.dump.eosc.ExtendEoscResultWithOrganization</class>
<jar>dump-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--workingPath</arg><arg>${workingDir}/dump</arg>
</spark>
<ok to="prepareResultProject"/>
<error to="Kill"/>
</action>
<action name="prepareResultProject">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Prepare association result subset of project info</name>
<class>eu.dnetlib.dhp.oa.graph.dump.eosc.SparkPrepareResultProject</class>
<jar>dump-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="extend_result_with_projects"/>
<error to="Kill"/>
</action>
<action name="extend_result_with_projects">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Extend dumped publications with information about project</name>
<class>eu.dnetlib.dhp.oa.graph.dump.eosc.ExtendEoscResultWithProject</class>
<jar>dump-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/dump/affiliation</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump/project</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
<arg>--dumpType</arg><arg>eosc</arg>
</spark>
<ok to="select_relation"/>
<error to="Kill"/>
</action>
<action name="select_relation">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Select the set of relations between the results in the selected set</name>
<class>eu.dnetlib.dhp.oa.graph.dump.eosc.SparkSelectRelation</class>
<jar>dump-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump</arg>
<arg>--removeSet</arg><arg>${removeSet}</arg>
</spark>
<ok to="extend_result_with_relation"/>
<error to="Kill"/>
</action>
<action name="extend_result_with_relation">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Extends the publication adding the relations of which the publication is the source node</name>
<class>eu.dnetlib.dhp.oa.graph.dump.eosc.ExtendEoscResultWithRelation</class>
<jar>dump-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--relationPath</arg><arg>${workingDir}/dump/relation</arg>
<arg>--resultPath</arg><arg>${workingDir}/dump/project</arg>
<arg>--outputPath</arg><arg>${outputPath}/dump</arg>
</spark>
<ok to="make_archive"/>
<error to="Kill"/>
</action>
<action name="make_archive">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.MakeTar</main-class>
<arg>--hdfsPath</arg><arg>${outputPath}/tar</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--sourcePath</arg><arg>${outputPath}/dump</arg>
</java>
<ok to="send_zenodo"/>
<error to="Kill"/>
</action>
<action name="send_zenodo">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.SendToZenodoHDFS</main-class>
<arg>--hdfsPath</arg><arg>${outputPath}/tar</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--accessToken</arg><arg>${accessToken}</arg>
<arg>--connectionUrl</arg><arg>${connectionUrl}</arg>
<arg>--metadata</arg><arg>${metadata}</arg>
<arg>--conceptRecordId</arg><arg>${conceptRecordId}</arg>
<arg>--depositionType</arg><arg>${depositionType}</arg>
<arg>--depositionId</arg><arg>${depositionId}</arg>
</java>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>
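As a reading aid for the depositionType parameter declared at the top of this workflow: the send_zenodo step needs different extra arguments depending on the chosen type. Below is a hedged sketch of the three combinations; the flag names come from the workflow above, while the values and the wrapper class are placeholders.

// Hypothetical argument sets for SendToZenodoHDFS, one per depositionType; the common flags
// (--hdfsPath, --nameNode, --accessToken, --connectionUrl, --metadata) are omitted for brevity.
public class ZenodoDepositionArgsSketch {

	static final String[] BRAND_NEW_DEPOSITION = {
		"--depositionType", "new" };

	static final String[] NEW_VERSION_OF_PUBLISHED_DEPOSITION = {
		"--depositionType", "version", "--conceptRecordId", "<concept-record-id>" };

	static final String[] UPLOAD_TO_OPEN_DEPOSITION = {
		"--depositionType", "upload", "--depositionId", "<deposition-id>" };
}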