diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganization.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganization.java
new file mode 100644
index 0000000..09cc1c7
--- /dev/null
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganization.java
@@ -0,0 +1,160 @@
+
+package eu.dnetlib.dhp.oa.graph.dump.eosc;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.io.Serializable;
+import java.util.*;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.MapGroupsFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.eosc.model.OrganizationPid;
+import eu.dnetlib.dhp.eosc.model.Result;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.Organization;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import scala.Tuple2;
+
+/**
+ * @author miriam.baglioni
+ * @Date 27/07/22
+ */
+public class ExtendEoscResultWithOrganization implements Serializable {
+    private static final Logger log = LoggerFactory.getLogger(ExtendEoscResultWithOrganization.class);
+
+    public static void main(String[] args) throws Exception {
+        String jsonConfiguration = IOUtils
+            .toString(
+                ExtendEoscResultWithOrganization.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/oa/graph/dump/eosc_extend_result_with_organization_parameters.json"));
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final String inputPath = parser.get("sourcePath");
+        log.info("inputPath: {}", inputPath);
+
+        final String workingPath = parser.get("workingPath");
+        log.info("workingPath: {}", workingPath);
+
+        SparkConf conf = new SparkConf();
+
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                Utils.removeOutputDir(spark, workingPath + "/affiliation");
+                addOrganizations(spark, inputPath, workingPath);
+            });
+    }
+
+    private static void addOrganizations(SparkSession spark, String inputPath, String workingPath) {
+
+        List<String> entities = Arrays.asList("publication", "dataset", "software", "otherresearchproduct");
+
+        entities
+            .parallelStream()
+            .forEach(
+                entity -> {
+                    Dataset<Result> results = Utils
+                        .readPath(spark, workingPath + "/" + entity, Result.class);
+                    Dataset<Relation> relations = Utils
+                        .readPath(spark, inputPath + "/relation", Relation.class)
+                        .filter(
+                            (FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference() &&
+                                !r.getDataInfo().getInvisible()
+                                && r.getSubRelType().equalsIgnoreCase(ModelConstants.AFFILIATION));
+
+                    Dataset<Organization> organizations = Utils
+                        .readPath(spark, inputPath + "/organization", Organization.class);
+                    Dataset<ResultOrganizations> resultOrganization = relations
+                        .joinWith(organizations, relations.col("source").equalTo(organizations.col("id")), "left")
+                        .map((MapFunction<Tuple2<Relation, Organization>, ResultOrganizations>) t2 -> {
+                            if (t2._2() != null) {
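+                                // the affiliation relation matched an organization: build the (resultId, organization) pair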
+                                ResultOrganizations rOrg = new ResultOrganizations();
+                                rOrg.setResultId(t2._1().getTarget());
+                                eu.dnetlib.dhp.eosc.model.Organization org = new eu.dnetlib.dhp.eosc.model.Organization();
+                                org.setId(t2._2().getId());
+                                if (Optional.ofNullable(t2._2().getLegalname()).isPresent()) {
+                                    org.setName(t2._2().getLegalname().getValue());
+                                } else {
+                                    org.setName("");
+                                }
+                                HashMap<String, Set<String>> organizationPids = new HashMap<>();
+                                if (Optional.ofNullable(t2._2().getPid()).isPresent())
+                                    t2._2().getPid().forEach(p -> {
+                                        if (!organizationPids.containsKey(p.getQualifier().getClassid()))
+                                            organizationPids.put(p.getQualifier().getClassid(), new HashSet<>());
+                                        organizationPids.get(p.getQualifier().getClassid()).add(p.getValue());
+                                    });
+                                List<OrganizationPid> pids = new ArrayList<>();
+                                for (String key : organizationPids.keySet()) {
+                                    for (String value : organizationPids.get(key)) {
+                                        OrganizationPid pid = new OrganizationPid();
+                                        pid.setValue(value);
+                                        pid.setType(key);
+                                        pids.add(pid);
+                                    }
+                                }
+                                org.setPid(pids);
+                                rOrg.setAffiliation(org);
+                                return rOrg;
+                            }
+                            return null;
+
+                        }, Encoders.bean(ResultOrganizations.class))
+                        .filter(Objects::nonNull);
+                    results
+                        .joinWith(
+                            resultOrganization, results.col("id").equalTo(resultOrganization.col("resultId")), "left")
+                        .groupByKey(
+                            (MapFunction<Tuple2<Result, ResultOrganizations>, String>) t2 -> t2._1().getId(),
+                            Encoders.STRING())
+                        .mapGroups(
+                            (MapGroupsFunction<String, Tuple2<Result, ResultOrganizations>, Result>) (s, it) -> {
+                                Tuple2<Result, ResultOrganizations> first = it.next();
+                                if (first._2() == null) {
+                                    return first._1();
+                                }
+                                Result ret = first._1();
+                                List<eu.dnetlib.dhp.eosc.model.Organization> affiliation = new ArrayList<>();
+                                Set<String> alreadyInsertedAffiliations = new HashSet<>();
+                                affiliation.add(first._2().getAffiliation());
+                                alreadyInsertedAffiliations.add(first._2().getAffiliation().getId());
+                                it.forEachRemaining(res -> {
+                                    if (!alreadyInsertedAffiliations.contains(res._2().getAffiliation().getId())) {
+                                        affiliation.add(res._2().getAffiliation());
+                                        alreadyInsertedAffiliations.add(res._2().getAffiliation().getId());
+                                    }
+
+                                });
+                                ret.setAffiliation(affiliation);
+                                return ret;
+                            }, Encoders.bean(Result.class))
+                        .write()
+                        .mode(SaveMode.Overwrite)
+                        .option("compression", "gzip")
+                        .json(workingPath + "/affiliation/" + entity);
+                });
+
+    }
+
+}
diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithProject.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithProject.java
new file mode 100644
index 0000000..af5e379
--- /dev/null
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithProject.java
@@ -0,0 +1,104 @@
+
+package eu.dnetlib.dhp.oa.graph.dump.eosc;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.eosc.model.Result;
+import eu.dnetlib.dhp.oa.graph.dump.Constants;
+import scala.Tuple2;
+
+public class ExtendEoscResultWithProject implements Serializable {
+
+    private static final Logger log = LoggerFactory.getLogger(ExtendEoscResultWithProject.class);
+    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public static void main(String[] args) throws Exception {
+        String jsonConfiguration = IOUtils
+            .toString(
+                ExtendEoscResultWithProject.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/oa/graph/dump/eosc_project_input_parameters.json"));
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final String inputPath = parser.get("sourcePath");
+        log.info("inputPath: {}", inputPath);
+
+        final String outputPath = parser.get("outputPath");
+        log.info("outputPath: {}", outputPath);
+
+        final String preparedInfoPath = parser.get("preparedInfoPath");
+        log.info("preparedInfoPath: {}", preparedInfoPath);
+
+        final String dumpType = Optional
+            .ofNullable(parser.get("dumpType"))
+            .orElse(Constants.DUMPTYPE.COMMUNITY.getType());
+        log.info("dumpType: {}", dumpType);
+
+        SparkConf conf = new SparkConf();
+
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                Utils.removeOutputDir(spark, outputPath);
+                extend(spark, inputPath, outputPath, preparedInfoPath);
+            });
+    }
+
+    private static void extend(
+        SparkSession spark,
+        String inputPath,
+        String outputPath,
+        String preparedInfoPath) {
+
+        List<String> entities = Arrays.asList("publication", "dataset", "software", "otherresearchproduct");
+
+        entities.parallelStream().forEach(entity -> {
+            Dataset<Result> result = Utils.readPath(spark, inputPath + "/" + entity, Result.class);
+
+            Dataset<ResultProject> resultProject = Utils.readPath(spark, preparedInfoPath, ResultProject.class);
+            result
+                .joinWith(
+                    resultProject, result.col("id").equalTo(resultProject.col("resultId")),
+                    "left")
+                .map((MapFunction<Tuple2<Result, ResultProject>, Result>) value -> {
+                    Result r = value._1();
+                    Optional.ofNullable(value._2()).ifPresent(rp -> r.setProjects(rp.getProjectsList()));
+                    return r;
+                }, Encoders.bean(Result.class))
+                .write()
+                .option("compression", "gzip")
+                .mode(SaveMode.Append)
+                .json(outputPath + "/" + entity);
+
+        });
+
+    }
+
+}
diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithRelation.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithRelation.java
new file mode 100644
index 0000000..fe7422e
--- /dev/null
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithRelation.java
@@ -0,0 +1,104 @@
+
+package eu.dnetlib.dhp.oa.graph.dump.eosc;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.MapGroupsFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.eosc.model.Relation;
+import eu.dnetlib.dhp.eosc.model.Result;
+import scala.Tuple2;
+
+/**
+ * @author miriam.baglioni
+ * @Date 02/02/23
+ */
+public class ExtendEoscResultWithRelation implements Serializable {
+
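+    // for each dumped result, collects the dumped relations having the result as source node and stores them in the result's relations list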
+    private static final Logger log = LoggerFactory.getLogger(ExtendEoscResultWithRelation.class);
+
+    public static void main(String[] args) throws Exception {
+        String jsonConfiguration = IOUtils
+            .toString(
+                ExtendEoscResultWithRelation.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/oa/graph/dump/eosc_input_extendwithrelation_parameters.json"));
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final String resultPath = parser.get("resultPath");
+        log.info("resultPath: {}", resultPath);
+
+        final String relationPath = parser.get("relationPath");
+        log.info("relationPath: {}", relationPath);
+
+        final String outputPath = parser.get("outputPath");
+        log.info("outputPath: {}", outputPath);
+
+        SparkConf conf = new SparkConf();
+
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                Utils.removeOutputDir(spark, outputPath);
+                extendResultWithRelation(spark, resultPath, relationPath, outputPath);
+
+            });
+
+    }
+
+    private static void extendResultWithRelation(SparkSession spark, String resultPath, String relationPath,
+        String outputPath) {
+        List<String> entities = Arrays.asList("publication", "software", "dataset", "otherresearchproduct");
+
+        entities.parallelStream().forEach(entity -> {
+            Dataset<Relation> relationDataset = Utils.readPath(spark, relationPath, Relation.class);
+            Dataset<Result> resultDataset = Utils.readPath(spark, resultPath + "/" + entity, Result.class);
+
+            resultDataset
+                .joinWith(relationDataset, resultDataset.col("id").equalTo(relationDataset.col("source")), "left")
+                .groupByKey((MapFunction<Tuple2<Result, Relation>, String>) t2 -> t2._1().getId(), Encoders.STRING())
+                .mapGroups((MapGroupsFunction<String, Tuple2<Result, Relation>, Result>) (k, it) -> {
+                    Tuple2<Result, Relation> first = it.next();
+                    Result r = first._1();
+                    if (Optional.ofNullable(first._2()).isPresent()) {
+                        if (r.getRelations() == null) {
+                            r.setRelations(new ArrayList<>());
+                        }
+                        r.getRelations().add(first._2());
+                        it.forEachRemaining(t2 -> r.getRelations().add(t2._2()));
+                    }
+                    return r;
+                }, Encoders.bean(Result.class))
+                .write()
+                .option("compression", "gzip")
+                .mode(SaveMode.Overwrite)
+                .json(outputPath + "/" + entity);
+        });
+
+    }
+}
diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SelectEoscResults.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SelectEoscResults.java
new file mode 100644
index 0000000..382ce13
--- /dev/null
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SelectEoscResults.java
@@ -0,0 +1,107 @@
+
+package eu.dnetlib.dhp.oa.graph.dump.eosc;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.eosc.model.Result;
+import eu.dnetlib.dhp.oa.graph.dump.ResultMapper;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+
+/**
+ * @author miriam.baglioni
+ * @Date 27/07/22
+ */
+public class SelectEoscResults implements Serializable {
+    private static final Logger log = LoggerFactory.getLogger(SelectEoscResults.class);
+
+    public static void main(String[] args) throws Exception {
+        String jsonConfiguration = IOUtils
+            .toString(
+                SelectEoscResults.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/oa/graph/dump/eosc_select_result_parameters.json"));
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final String inputPath = parser.get("sourcePath");
+        log.info("inputPath: {}", inputPath);
+
+        final String outputPath = parser.get("outputPath");
+        log.info("outputPath: {}", outputPath);
+
+        final String eoscDatasourceIdsPath = parser.get("eoscDatasourceIdsPath");
+        log.info("eoscDatasourceIdsPath: {}", eoscDatasourceIdsPath);
+
+        final String communityMapPath = parser.get("communityMapPath");
+        log.info("communityMapPath: {}", communityMapPath);
+
+        SparkConf conf = new SparkConf();
+
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                Utils.removeOutputDir(spark, outputPath);
+                selectEoscResults(spark, inputPath, outputPath, communityMapPath, eoscDatasourceIdsPath);
+            });
+    }
+
+    private static void selectEoscResults(SparkSession spark,
+        String inputPath, String outputPath,
+        String communityMapPath, String eoscDatasourceIdsPath) {
+
+        List<MasterDuplicate> df = Utils
+            .readPath(spark, eoscDatasourceIdsPath, MasterDuplicate.class)
+            .collectAsList();
+
+        CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath);
+        List<String> entities = Arrays.asList("publication", "software", "dataset", "otherresearchproduct");
+
+        entities
+            .parallelStream()
+            .forEach(
+                entity -> Utils
+                    .readPath(spark, inputPath + "/" + entity, ModelSupport.entityTypes.get(entity))
+                    .filter(
+                        (FilterFunction<eu.dnetlib.dhp.schema.oaf.Result>) r -> !r.getDataInfo().getDeletedbyinference()
+                            && !r.getDataInfo().getInvisible()
+                            && (r.getContext().stream().anyMatch(c -> c.getId().equals("eosc")) ||
+                                r
+                                    .getCollectedfrom()
+                                    .stream()
+                                    .anyMatch(cf -> cf.getValue().equalsIgnoreCase("B2FIND"))))
+                    .map(
+                        (MapFunction<eu.dnetlib.dhp.schema.oaf.Result, Result>) r -> (Result) ResultMapper
+                            .map(r, communityMap, df),
+                        Encoders.bean(Result.class))
+                    .write()
+                    .mode(SaveMode.Overwrite)
+                    .option("compression", "gzip")
+                    .json(outputPath + "/" + entity));
+
+    }
+
+}
diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc/oozie_app/workflow.xml b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc/oozie_app/workflow.xml
index 2e354d3..796aacb 100644
--- a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc/oozie_app/workflow.xml
+++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc/oozie_app/workflow.xml
@@ -112,29 +112,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc_extend_result_with_organization_parameters.json b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc_extend_result_with_organization_parameters.json
index d4ac670..3a448b6 100644
--- a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc_extend_result_with_organization_parameters.json
+++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc_extend_result_with_organization_parameters.json
@@ -10,7 +10,7 @@
     "paramName": "out",
     "paramLongName": "outputPath",
     "paramDescription": "the path used to store temporary output files",
-    "paramRequired": true
+    "paramRequired": false
   },
   {
     "paramName": "ssm",
@@ -22,8 +22,13 @@
     "paramName":"rp",
     "paramLongName":"resultPath",
     "paramDescription": "The path to the community map",
-    "paramRequired": true
-  }
+    "paramRequired": false
+  },{
+    "paramName":"wp",
+    "paramLongName":"workingPath",
+    "paramDescription": "The path to the community map",
+    "paramRequired": false
+}
 ]
diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc_select_result_parameters.json b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc_select_result_parameters.json
index 098495d..ee23095 100644
--- a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc_select_result_parameters.json
+++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc_select_result_parameters.json
@@ -22,7 +22,7 @@
     "paramName":"tn",
     "paramLongName":"resultTableName",
     "paramDescription": "the name of the result table we are currently working on",
-    "paramRequired": true
+    "paramRequired": false
   },
   {
     "paramName":"cmp",
@@ -34,7 +34,7 @@
     "paramName":"edip",
     "paramLongName":"eoscDatasourceIdsPath",
     "paramDescription": "The path to the community map",
-    "paramRequired": true
+    "paramRequired": false
   }
 ]
diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eoscnew/oozie_app/workflow.xml b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eoscnew/oozie_app/workflow.xml
new file mode 100644
index 0000000..a64cc45
--- /dev/null
+++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eoscnew/oozie_app/workflow.xml
@@ -0,0 +1,292 @@
+
+    sourcePath
+        the source path
+    outputPath
+        the output path
+    accessToken
+        the access token used for the deposition in Zenodo
+    connectionUrl
+        the connection url for Zenodo
+    metadata
+        the metadata associated to the deposition
+    depositionType
"new" for brand new deposition, "version" for a new version of a published deposition (in this case the concept record id must be provided), "upload" to upload content to an open deposition for which we already have the deposition id (in this case the deposition id should be provided) + + + conceptRecordId + for new version, the id of the record for the old deposition + + + depositionId + the depositionId of a deposition open that has to be added content + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + eu.dnetlib.dhp.oa.graph.dump.eosc.SaveCommunityMap + --outputPath${workingDir}/communityMap + --nameNode${nameNode} + --isLookUpUrl${isLookUpUrl} + + + + + + + eu.dnetlib.dhp.oa.graph.dump.eosc.EoscMasterDuplicate + --postgresUrl${postgresURL} + --postgresUser${postgresUser} + --postgresPassword${postgresPassword} + --hdfsPath${workingDir}/masterduplicate + --hdfsNameNode${nameNode} + + + + + + + + yarn + cluster + Dump Publication For EOSC + eu.dnetlib.dhp.oa.graph.dump.eosc.SelectEoscResults + dump-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --sourcePath${sourcePath} + --outputPath${workingDir}/dump + --communityMapPath${workingDir}/communityMap + --eoscDatasourceIdsPath${workingDir}/masterduplicate + + + + + + + yarn + cluster + Extend Dump Publication For EOSC with affiliations + eu.dnetlib.dhp.oa.graph.dump.eosc.ExtendEoscResultWithOrganization + dump-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --sourcePath${sourcePath} + --workingPath${workingDir}/dump + + + + + + + + yarn + cluster + Prepare association result subset of project info + eu.dnetlib.dhp.oa.graph.dump.eosc.SparkPrepareResultProject + dump-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + 
+        --driver-memory=${sparkDriverMemory}
+        --conf spark.extraListeners=${spark2ExtraListeners}
+        --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+        --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+        --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+        --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+        --sourcePath${sourcePath}
+        --outputPath${workingDir}/preparedInfo
+
+    yarn
+    cluster
+    Extend dumped publications with information about project
+    eu.dnetlib.dhp.oa.graph.dump.eosc.ExtendEoscResultWithProject
+    dump-${projectVersion}.jar
+        --executor-memory=${sparkExecutorMemory}
+        --executor-cores=${sparkExecutorCores}
+        --driver-memory=${sparkDriverMemory}
+        --conf spark.extraListeners=${spark2ExtraListeners}
+        --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+        --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+        --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+        --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+        --sourcePath${workingDir}/dump/affiliation
+        --outputPath${workingDir}/dump/project
+        --preparedInfoPath${workingDir}/preparedInfo
+        --dumpTypeeosc
+
+    yarn
+    cluster
+    Select the set of relations between the results in the selected set
+    eu.dnetlib.dhp.oa.graph.dump.eosc.SparkSelectRelation
+    dump-${projectVersion}.jar
+        --executor-memory=${sparkExecutorMemory}
+        --executor-cores=${sparkExecutorCores}
+        --driver-memory=${sparkDriverMemory}
+        --conf spark.extraListeners=${spark2ExtraListeners}
+        --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+        --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+        --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+        --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+        --sourcePath${sourcePath}
+        --outputPath${workingDir}/dump
+        --removeSet${removeSet}
+
+    yarn
+    cluster
+    Extends the publication adding the relations of which the publication is the source node
+    eu.dnetlib.dhp.oa.graph.dump.eosc.ExtendEoscResultWithRelation
+    dump-${projectVersion}.jar
+        --executor-memory=${sparkExecutorMemory}
+        --executor-cores=${sparkExecutorCores}
+        --driver-memory=${sparkDriverMemory}
+        --conf spark.extraListeners=${spark2ExtraListeners}
+        --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+        --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+        --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+        --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+        --relationPath${workingDir}/dump/relation
+        --resultPath${workingDir}/dump/project
+        --outputPath${outputPath}/dump
+
+    eu.dnetlib.dhp.oa.graph.dump.MakeTar
+        --hdfsPath${outputPath}/tar
+        --nameNode${nameNode}
+        --sourcePath${outputPath}/dump
+
+    eu.dnetlib.dhp.oa.graph.dump.SendToZenodoHDFS
+        --hdfsPath${outputPath}/tar
+        --nameNode${nameNode}
+        --accessToken${accessToken}
+        --connectionUrl${connectionUrl}
+        --metadata${metadata}
+        --conceptRecordId${conceptRecordId}
+        --depositionType${depositionType}
+        --depositionId${depositionId}
\ No newline at end of file