From aa9f3d96981b72aa712bd8e0b6ef5d7c554a61a7 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 3 Aug 2020 18:06:18 +0200 Subject: [PATCH] changed logic for save in s3 directly --- .../dhp/oa/graph/dump/DumpProducts.java | 51 +++++++++++++++++-- .../eu/dnetlib/dhp/oa/graph/dump/Utils.java | 8 +++ .../dump/graph/CreateContextEntities.java | 2 +- .../graph/dump/graph/DumpGraphEntities.java | 10 +++- .../dhp/oa/graph/dump/graph/Extractor.java | 25 ++++++++- .../dhp/oa/graph/dump/graph/Process.java | 39 ++++++++------ .../dump/graph/SparkDumpEntitiesJob.java | 5 +- .../SparkExtractRelationFromEntities.java | 3 ++ 8 files changed, 118 insertions(+), 25 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/DumpProducts.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/DumpProducts.java index 5febfb33e..9f32d1d6a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/DumpProducts.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/DumpProducts.java @@ -24,7 +24,7 @@ import eu.dnetlib.dhp.schema.oaf.*; public class DumpProducts implements Serializable { - public void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, CommunityMap communityMap, + public void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, String communityMapPath, Class inputClazz, Class outputClazz, boolean graph) { @@ -36,19 +36,60 @@ public class DumpProducts implements Serializable { isSparkSessionManaged, spark -> { Utils.removeOutputDir(spark, outputPath); - execDump(spark, inputPath, outputPath, communityMap, inputClazz, outputClazz, graph);// , dumpClazz); - + execDump(spark, inputPath, outputPath, communityMapPath, inputClazz, outputClazz, graph);// , + // dumpClazz); }); } +// public void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, CommunityMap communityMap, +// Class inputClazz, +// Class outputClazz, +// boolean graph) { +// +// SparkConf conf = new SparkConf(); +// +// runWithSparkSession( +// conf, +// isSparkSessionManaged, +// spark -> { +// Utils.removeOutputDir(spark, outputPath); +// execDump(spark, inputPath, outputPath, communityMap, inputClazz, outputClazz, graph);// , +// // dumpClazz); +// }); +// } + +// public static void execDump( +// SparkSession spark, +// String inputPath, +// String outputPath, +// CommunityMap communityMap, +// Class inputClazz, +// Class outputClazz, +// boolean graph) { +// +// // CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath); +// +// Utils +// .readPath(spark, inputPath, inputClazz) +// .map(value -> execMap(value, communityMap, graph), Encoders.bean(outputClazz)) +// .filter(Objects::nonNull) +// .write() +// .mode(SaveMode.Overwrite) +// .option("compression", "gzip") +// .json(outputPath); +// +// } + public static void execDump( SparkSession spark, String inputPath, String outputPath, - CommunityMap communityMap, + String communityMapPath, Class inputClazz, Class outputClazz, - boolean graph) throws ClassNotFoundException { + boolean graph) { + + CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath); Utils .readPath(spark, inputPath, inputClazz) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Utils.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Utils.java index ec18752a6..83cb6a3cc 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Utils.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Utils.java @@ -7,8 +7,10 @@ import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.gson.Gson; import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap; import eu.dnetlib.dhp.oa.graph.dump.graph.Constants; import eu.dnetlib.dhp.utils.DHPUtils; import eu.dnetlib.dhp.utils.ISLookupClientFactory; @@ -40,4 +42,10 @@ public class Utils { "%s|%s::%s", Constants.CONTEXT_ID, Constants.CONTEXT_NS_PREFIX, DHPUtils.md5(id)); } + + public static CommunityMap getCommunityMap(SparkSession spark, String communityMapPath) { + + return new Gson().fromJson(spark.read().textFile(communityMapPath).collectAsList().get(0), CommunityMap.class); + + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/CreateContextEntities.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/CreateContextEntities.java index be2b028bc..8cda0c8a2 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/CreateContextEntities.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/CreateContextEntities.java @@ -100,7 +100,7 @@ public class CreateContextEntities implements Serializable { protected void writeEntity(final R r) { try { writer.write(Utils.OBJECT_MAPPER.writeValueAsString(r)); - //log.info("writing context : {}", new Gson().toJson(r)); + // log.info("writing context : {}", new Gson().toJson(r)); writer.newLine(); } catch (final Exception e) { throw new RuntimeException(e); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpGraphEntities.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpGraphEntities.java index ee5881366..262ce4b8a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpGraphEntities.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpGraphEntities.java @@ -35,13 +35,19 @@ public class DumpGraphEntities implements Serializable { String inputPath, String outputPath, Class inputClazz, - CommunityMap communityMap) { + String communityMapPath) { + // CommunityMap communityMap) { SparkConf conf = new SparkConf(); + switch (ModelSupport.idPrefixMap.get(inputClazz)) { case "50": DumpProducts d = new DumpProducts(); - d.run(isSparkSessionManaged, inputPath, outputPath, communityMap, inputClazz, Result.class, true); + d + .run( + isSparkSessionManaged, inputPath, outputPath, communityMapPath, inputClazz, Result.class, + true); + // d.run(isSparkSessionManaged, inputPath, outputPath, communityMap, inputClazz, Result.class, true); break; case "40": runWithSparkSession( diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Extractor.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Extractor.java index ee8734f93..940975893 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Extractor.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Extractor.java @@ -8,13 +8,17 @@ import java.util.*; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; +import com.google.gson.Gson; + import eu.dnetlib.dhp.oa.graph.dump.DumpProducts; import eu.dnetlib.dhp.oa.graph.dump.Utils; import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap; +import eu.dnetlib.dhp.oa.graph.dump.zenodo.Community; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.dump.oaf.Provenance; @@ -28,6 +32,24 @@ import eu.dnetlib.dhp.schema.oaf.Result; public class Extractor implements Serializable { +// public void run(Boolean isSparkSessionManaged, +// String inputPath, +// String outputPath, +// Class inputClazz, +// String communityMapPath) { +// +// SparkConf conf = new SparkConf(); +// +// runWithSparkSession( +// conf, +// isSparkSessionManaged, +// spark -> { +// Utils.removeOutputDir(spark, outputPath); +// extractRelationResult( +// spark, inputPath, outputPath, inputClazz, Utils.getCommunityMap(spark, communityMapPath)); +// }); +// } + public void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, @@ -41,7 +63,8 @@ public class Extractor implements Serializable { isSparkSessionManaged, spark -> { Utils.removeOutputDir(spark, outputPath); - extractRelationResult(spark, inputPath, outputPath, inputClazz, communityMap); + extractRelationResult( + spark, inputPath, outputPath, inputClazz, communityMap); }); } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Process.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Process.java index 4dee8b296..43d7b2d56 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Process.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Process.java @@ -56,24 +56,33 @@ public class Process implements Serializable { String nodeType = ModelSupport.idPrefixEntity.get(ds.substring(0, 2)); String contextId = Utils.getContextId(ci.getId()); - relationList.add(Relation.newInstance( - Node.newInstance(contextId, eu.dnetlib.dhp.schema.dump.oaf.graph.Constants.CONTEXT_ENTITY), - Node.newInstance(ds, nodeType), - RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP), - Provenance - .newInstance( + relationList + .add( + Relation + .newInstance( + Node + .newInstance( + contextId, eu.dnetlib.dhp.schema.dump.oaf.graph.Constants.CONTEXT_ENTITY), + Node.newInstance(ds, nodeType), + RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP), + Provenance + .newInstance( eu.dnetlib.dhp.oa.graph.dump.graph.Constants.USER_CLAIM, - eu.dnetlib.dhp.oa.graph.dump.graph.Constants.DEFAULT_TRUST) - )); + eu.dnetlib.dhp.oa.graph.dump.graph.Constants.DEFAULT_TRUST))); - relationList.add(Relation.newInstance(Node.newInstance(ds, nodeType), - Node.newInstance(contextId, eu.dnetlib.dhp.schema.dump.oaf.graph.Constants.CONTEXT_ENTITY), - RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP), - Provenance - .newInstance( + relationList + .add( + Relation + .newInstance( + Node.newInstance(ds, nodeType), + Node + .newInstance( + contextId, eu.dnetlib.dhp.schema.dump.oaf.graph.Constants.CONTEXT_ENTITY), + RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP), + Provenance + .newInstance( eu.dnetlib.dhp.oa.graph.dump.graph.Constants.USER_CLAIM, - eu.dnetlib.dhp.oa.graph.dump.graph.Constants.DEFAULT_TRUST) - )); + eu.dnetlib.dhp.oa.graph.dump.graph.Constants.DEFAULT_TRUST))); }); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkDumpEntitiesJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkDumpEntitiesJob.java index a09d5eb84..63caa4f69 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkDumpEntitiesJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkDumpEntitiesJob.java @@ -42,6 +42,8 @@ public class SparkDumpEntitiesJob implements Serializable { final String resultClassName = parser.get("resultTableName"); log.info("resultTableName: {}", resultClassName); + final String communityMapPath = parser.get("communityMapPath"); + final String isLookUpUrl = parser.get("isLookUpUrl"); log.info("isLookUpUrl: {}", isLookUpUrl); @@ -52,7 +54,8 @@ public class SparkDumpEntitiesJob implements Serializable { CommunityMap communityMap = queryInformationSystem.getCommunityMap(); DumpGraphEntities dg = new DumpGraphEntities(); - dg.run(isSparkSessionManaged, inputPath, outputPath, inputClazz, communityMap); + dg.run(isSparkSessionManaged, inputPath, outputPath, inputClazz, communityMapPath); + // dg.run(isSparkSessionManaged, inputPath, outputPath, inputClazz, communityMap); } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkExtractRelationFromEntities.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkExtractRelationFromEntities.java index a580e7de0..16c7c7d32 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkExtractRelationFromEntities.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkExtractRelationFromEntities.java @@ -42,6 +42,8 @@ public class SparkExtractRelationFromEntities implements Serializable { final String resultClassName = parser.get("resultTableName"); log.info("resultTableName: {}", resultClassName); +// final String communityMapPath = parser.get("communityMapPath"); + final String isLookUpUrl = parser.get("isLookUpUrl"); log.info("isLookUpUrl: {}", isLookUpUrl); @@ -52,6 +54,7 @@ public class SparkExtractRelationFromEntities implements Serializable { CommunityMap communityMap = queryInformationSystem.getCommunityMap(); Extractor extractor = new Extractor(); + // extractor.run(isSparkSessionManaged, inputPath, outputPath, inputClazz, communityMapPath); extractor.run(isSparkSessionManaged, inputPath, outputPath, inputClazz, communityMap); }