From fe88904df06723898358940ece6eb37dfcb486f3 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Mon, 10 Aug 2020 12:01:14 +0200
Subject: [PATCH] changed the wf definition

---
 .../dump/community/CommunitySplitS3.java      | 88 -------------------
 .../community/SparkSplitForCommunityS3.java   | 64 --------------
 .../dhp/oa/graph/dump/oozie_app/workflow.xml  | 36 +++++---
 3 files changed, 26 insertions(+), 162 deletions(-)
 delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/CommunitySplitS3.java
 delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkSplitForCommunityS3.java

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/CommunitySplitS3.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/CommunitySplitS3.java
deleted file mode 100644
index 2e43a4c71..000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/CommunitySplitS3.java
+++ /dev/null
@@ -1,88 +0,0 @@
-
-package eu.dnetlib.dhp.oa.graph.dump.community;
-
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
-
-import java.io.Serializable;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import eu.dnetlib.dhp.oa.graph.dump.Utils;
-import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;
-
-public class CommunitySplitS3 implements Serializable {
-	private static final Logger log = LoggerFactory.getLogger(CommunitySplitS3.class);
-
-	public void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, String communityMapPath) {
-		// public void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, CommunityMap
-		// communityMap) {
-		SparkConf conf = new SparkConf();
-		runWithSparkSession(
-			conf,
-			isSparkSessionManaged,
-			spark -> {
-				SparkContext sc = spark.sparkContext();
-				sc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
-				sc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "AK0MM6C2BYA0K1PNJYYX");
-				sc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", "fpeiqUUpKAUOtO6JWMWLTxxlSxJ+yGYwHozm3jHK");
-				sc.hadoopConfiguration().set("fs.s3.endpoint", "s3.acm.edu.pl");
-				execSplit(spark, inputPath, outputPath, communityMapPath); // communityMap.keySet());// ,
-
-			});
-	}
-
-	private static void execSplit(SparkSession spark, String inputPath, String outputPath,
-		String communityMapPath) {
-		// Set<String> communities) {
-
-		Set<String> communities = Utils.getCommunityMap(spark, communityMapPath).keySet();
-
-		Dataset<CommunityResult> result = Utils
-			.readPath(spark, inputPath + "/publication", CommunityResult.class)
-			.union(Utils.readPath(spark, inputPath + "/dataset", CommunityResult.class))
-			.union(Utils.readPath(spark, inputPath + "/orp", CommunityResult.class))
-			.union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class));
-
-		communities
-			.stream()
-			.forEach(c -> printResult(c, result, outputPath));
-
-	}
-
-	private static void printResult(String c, Dataset<CommunityResult> result, String outputPath) {
-		Dataset<CommunityResult> community_products = result
-			.filter(r -> containsCommunity(r, c));
-
-		if (community_products.count() > 0) {
-			log.info("Writing dump for community: {} ", c);
-			community_products
-				.repartition(1)
-				.write()
-				.option("compression", "gzip")
-				.mode(SaveMode.Overwrite)
-				.json(outputPath + "/" + c);
-		}
-
-	}
-
-	private static boolean containsCommunity(CommunityResult r, String c) {
-		if (Optional.ofNullable(r.getContext()).isPresent()) {
-			return r
-				.getContext()
-				.stream()
-				.filter(con -> con.getCode().equals(c))
-				.collect(Collectors.toList())
-				.size() > 0;
-		}
-		return false;
-	}
-}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkSplitForCommunityS3.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkSplitForCommunityS3.java
deleted file mode 100644
index 68c832d63..000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkSplitForCommunityS3.java
+++ /dev/null
@@ -1,64 +0,0 @@
-
-package eu.dnetlib.dhp.oa.graph.dump.community;
-
-import java.io.Serializable;
-import java.util.Optional;
-
-import org.apache.commons.io.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.oa.graph.dump.QueryInformationSystem;
-import eu.dnetlib.dhp.utils.ISLookupClientFactory;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
-
-public class SparkSplitForCommunityS3 implements Serializable {
-
-	private static final Logger log = LoggerFactory.getLogger(SparkSplitForCommunityS3.class);
-
-	public static void main(String[] args) throws Exception {
-		String jsonConfiguration = IOUtils
-			.toString(
-				SparkSplitForCommunityS3.class
-					.getResourceAsStream(
-						"/eu/dnetlib/dhp/oa/graph/dump/split_parameters.json"));
-
-		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
-		parser.parseArgument(args);
-
-		Boolean isSparkSessionManaged = Optional
-			.ofNullable(parser.get("isSparkSessionManaged"))
-			.map(Boolean::valueOf)
-			.orElse(Boolean.TRUE);
-		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
-
-		final String inputPath = parser.get("sourcePath");
-		log.info("inputPath: {}", inputPath);
-
-		final String outputPath = parser.get("outputPath");
-		log.info("outputPath: {}", outputPath);
-
-		final String communityMapPath = parser.get("communityMapPath");
-
-		final String isLookUpUrl = parser.get("isLookUpUrl");
-		log.info("isLookUpUrl: {}", isLookUpUrl);
-
-		CommunitySplitS3 split = new CommunitySplitS3();
-
-		// CommunityMap communityMap;
-
-//		QueryInformationSystem queryInformationSystem = new QueryInformationSystem();
-//		queryInformationSystem.setIsLookUp(getIsLookUpService(isLookUpUrl));
-//		communityMap = queryInformationSystem.getCommunityMap();
-
-		split.run(isSparkSessionManaged, inputPath, outputPath, communityMapPath);
-		// split.run(isSparkSessionManaged, inputPath, outputPath, communityMap);
-
-	}
-
-	public static ISLookUpService getIsLookUpService(String isLookUpUrl) {
-		return ISLookupClientFactory.getLookUpService(isLookUpUrl);
-	}
-
-}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/oozie_app/workflow.xml
index fc3debb64..18c8bb682 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/oozie_app/workflow.xml
@@ -13,6 +13,26 @@
             <name>outputPath</name>
             <description>the output path</description>
         </property>
+        <property>
+            <name>accessToken</name>
+            <description>the access token used for the deposition in Zenodo</description>
+        </property>
+        <property>
+            <name>connectionUrl</name>
+            <description>the connection URL for Zenodo</description>
+        </property>
+        <property>
+            <name>metadata</name>
+            <description>the metadata associated with the deposition</description>
+        </property>
+        <property>
+            <name>newDeposition</name>
+            <description>true if it is a brand new deposition, false for a new version of an old deposition</description>
+        </property>
+        <property>
+            <name>conceptRecordId</name>
+            <description>for a new version, the id of the record of the old deposition</description>
+        </property>
         <property>
             <name>hiveDbName</name>
             <description>the target hive database name</description>
@@ -81,7 +101,7 @@
-
+
     <kill name="Kill">
         <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
     </kill>
@@ -135,7 +155,6 @@
             <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
             <arg>--outputPath</arg><arg>${workingDir}/publication</arg>
             <arg>--communityMapPath</arg><arg>${workingDir}/communityMap</arg>
-            <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
@@ -162,7 +181,6 @@
             <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
             <arg>--outputPath</arg><arg>${workingDir}/dataset</arg>
             <arg>--communityMapPath</arg><arg>${workingDir}/communityMap</arg>
-            <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
@@ -189,7 +207,6 @@
            <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
            <arg>--outputPath</arg><arg>${workingDir}/otherresearchproduct</arg>
            <arg>--communityMapPath</arg><arg>${workingDir}/communityMap</arg>
-           <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
@@ -216,7 +233,6 @@
            <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
            <arg>--outputPath</arg><arg>${workingDir}/software</arg>
            <arg>--communityMapPath</arg><arg>${workingDir}/communityMap</arg>
-           <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
@@ -360,7 +376,7 @@
            <master>yarn</master>
            <mode>cluster</mode>
            <name>Split dumped result for community</name>
-           <class>eu.dnetlib.dhp.oa.graph.dump.community.SparkSplitForCommunityS3</class>
+           <class>eu.dnetlib.dhp.oa.graph.dump.community.SparkSplitForCommunity</class>
            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
            <spark-opts>
                --executor-memory=${sparkExecutorMemory}
@@ -375,22 +391,22 @@
            <arg>--sourcePath</arg><arg>${workingDir}/ext</arg>
            <arg>--outputPath</arg><arg>${outputPath}</arg>
            <arg>--communityMapPath</arg><arg>${workingDir}/communityMap</arg>
-           <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
-
+
-           <main-class>eu.dnetlib.dhp.oa.graph.dump.SendToZenodo</main-class>
+           <main-class>eu.dnetlib.dhp.oa.graph.dump.SendToZenodoHDFS</main-class>
            <arg>--hdfsPath</arg><arg>${outputPath}</arg>
            <arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
            <arg>--accessToken</arg><arg>${accessToken}</arg>
            <arg>--connectionUrl</arg><arg>${connectionUrl}</arg>
            <arg>--metadata</arg><arg>${metadata}</arg>
            <arg>--communityMapPath</arg><arg>${workingDir}/communityMap</arg>
-           <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
+           <arg>--conceptRecordId</arg><arg>${conceptRecordId}</arg>
+           <arg>--newDeposition</arg><arg>${newDeposition}</arg>
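
Reviewer note (not part of the commit): the patch wires five new workflow parameters (accessToken, connectionUrl, metadata, newDeposition, conceptRecordId) into the upload step and switches it to SendToZenodoHDFS, whose implementation is not shown here. The Java sketch below is a minimal, hypothetical illustration of the branch that newDeposition and conceptRecordId suggest, assuming connectionUrl points at Zenodo's deposition endpoint (e.g. https://sandbox.zenodo.org/api/deposit/depositions). The class and method names are invented for illustration, and the endpoint paths follow Zenodo's public REST API rather than any code in this repository.

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch, NOT the SendToZenodoHDFS implementation: it only
// illustrates what the two new workflow parameters are for.
public class ZenodoDepositionSketch {

	static int openDeposition(String connectionUrl, String accessToken,
		boolean newDeposition, String conceptRecordId) throws IOException {

		// newDeposition == true  -> create a brand new deposition;
		// newDeposition == false -> open a new version of an already published
		// record (using conceptRecordId as the path id is an assumption here).
		final String target = newDeposition
			? connectionUrl
			: connectionUrl + "/" + conceptRecordId + "/actions/newversion";

		final HttpURLConnection conn = (HttpURLConnection) new URL(target).openConnection();
		conn.setRequestMethod("POST");
		conn.setRequestProperty("Authorization", "Bearer " + accessToken);

		if (newDeposition) {
			// an empty JSON object is enough to open a new, empty deposition
			conn.setRequestProperty("Content-Type", "application/json");
			conn.setDoOutput(true);
			conn.getOutputStream().write("{}".getBytes(StandardCharsets.UTF_8));
		}
		return conn.getResponseCode(); // 201 Created on success
	}
}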