From 8d9b3c5de2909034c79dae215716547fe9f82bf5 Mon Sep 17 00:00:00 2001
From: pjacewicz
Date: Fri, 13 Mar 2020 10:01:39 +0100
Subject: [PATCH] WIP action payload mapping into OAF type moved, (local)
 graph table name enum created, tests fixed

---
 .../PromoteActionSetFromHDFSJob.java          | 214 +++++++++---------
 .../PromoteActionSetFromHDFSJobTest.java      |  42 ++--
 2 files changed, 122 insertions(+), 134 deletions(-)

diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJob.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJob.java
index daaf047d14..b0239a39c4 100644
--- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJob.java
+++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJob.java
@@ -9,6 +9,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.*;
@@ -25,10 +26,22 @@ import static org.apache.spark.sql.functions.*;
 
 public class PromoteActionSetFromHDFSJob {
 
+    // TODO replace with project's common implementation
+    public enum GraphTableName {
+        DATASET, DATASOURCE, ORGANIZATION, OTHERRESEARCHPRODUCT, PROJECT, PUBLICATION, RELATION, SOFTWARE
+    }
+
+    private static final StructType KV_SCHEMA = StructType$.MODULE$.apply(
+            Arrays.asList(
+                    StructField$.MODULE$.apply("key", DataTypes.StringType, false, Metadata.empty()),
+                    StructField$.MODULE$.apply("value", DataTypes.StringType, false, Metadata.empty())
+            ));
+
     public static void main(String[] args) throws Exception {
-        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(
+        String jsonConfiguration = IOUtils.toString(
                 PromoteActionSetFromHDFSJob.class
-                        .getResourceAsStream("/eu/dnetlib/dhp/actionmanager/actionmanager_input_parameters.json")));
+                        .getResourceAsStream("/eu/dnetlib/dhp/actionmanager/actionmanager_input_parameters.json"));
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
         parser.parseArgument(args);
 
         Boolean isSparkSessionManaged = Optional
@@ -37,9 +50,9 @@ public class PromoteActionSetFromHDFSJob {
                 .orElse(Boolean.TRUE);
 
         String inputGraphPath = parser.get("inputGraphPath");
         String inputActionSetPaths = parser.get("inputActionSetPaths");
-        String graphTableName = parser.get("graphTableName");
+        GraphTableName graphTableName = GraphTableName.valueOf(parser.get("graphTableName").toUpperCase());
         String outputGraphPath = parser.get("outputGraphPath");
-        OafMergeAndGet.Strategy strategy = OafMergeAndGet.Strategy.valueOf(parser.get("mergeAndGetStrategy"));
+        OafMergeAndGet.Strategy strategy = OafMergeAndGet.Strategy.valueOf(parser.get("mergeAndGetStrategy").toUpperCase());
 
         SparkConf conf = new SparkConf();
         conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
@@ -75,78 +88,71 @@
         SparkSession spark = null;
         try {
             spark = SparkSession.builder().config(conf).getOrCreate();
+            String inputGraphTablePath = String.format("%s/%s", inputGraphPath, graphTableName.name().toLowerCase());
+            String outputGraphTablePath = String.format("%s/%s", outputGraphPath, graphTableName.name().toLowerCase());
 
-            //TODO make graph table generic using enums
             switch (graphTableName) {
-                case "dataset":
-                    processWith(spark,
-                            String.format("%s/%s", inputGraphPath, graphTableName),
+                case DATASET:
+                    processGraphTable(spark,
+                            inputGraphTablePath,
                             inputActionSetPaths,
-                            outputGraphPath,
-                            graphTableName,
+                            outputGraphTablePath,
                             strategy,
                             eu.dnetlib.dhp.schema.oaf.Dataset.class);
                     break;
-                case "datasource":
-                    processWith(spark,
-                            String.format("%s/%s", inputGraphPath, graphTableName),
+                case DATASOURCE:
+                    processGraphTable(spark,
+                            inputGraphTablePath,
                             inputActionSetPaths,
-                            outputGraphPath,
-                            graphTableName,
+                            outputGraphTablePath,
                             strategy,
                             Datasource.class);
                     break;
-                case "organization":
-                    processWith(spark,
-                            String.format("%s/%s", inputGraphPath, graphTableName),
+                case ORGANIZATION:
+                    processGraphTable(spark,
+                            inputGraphTablePath,
                             inputActionSetPaths,
-                            outputGraphPath,
-                            graphTableName,
+                            outputGraphTablePath,
                             strategy,
                             Organization.class);
                     break;
-                case "otherresearchproduct":
-                    processWith(spark,
-                            String.format("%s/%s", inputGraphPath, graphTableName),
+                case OTHERRESEARCHPRODUCT:
+                    processGraphTable(spark,
+                            inputGraphTablePath,
                             inputActionSetPaths,
-                            outputGraphPath,
-                            graphTableName,
+                            outputGraphTablePath,
                             strategy,
                             OtherResearchProduct.class);
                     break;
-                case "project":
-                    processWith(spark,
-                            String.format("%s/%s", inputGraphPath, graphTableName),
+                case PROJECT:
+                    processGraphTable(spark,
+                            inputGraphTablePath,
                             inputActionSetPaths,
-                            outputGraphPath,
-                            graphTableName,
+                            outputGraphTablePath,
                             strategy,
                             Project.class);
                     break;
-                case "publication":
-                    processWith(spark,
-                            String.format("%s/%s", inputGraphPath, graphTableName),
+                case PUBLICATION:
+                    processGraphTable(spark,
+                            inputGraphTablePath,
                             inputActionSetPaths,
-                            outputGraphPath,
-                            graphTableName,
+                            outputGraphTablePath,
                             strategy,
                             Publication.class);
                     break;
-                case "relation":
-                    processWith(spark,
-                            String.format("%s/%s", inputGraphPath, graphTableName),
+                case RELATION:
+                    processGraphTable(spark,
+                            inputGraphTablePath,
                             inputActionSetPaths,
-                            outputGraphPath,
-                            graphTableName,
+                            outputGraphTablePath,
                             strategy,
                             Relation.class);
                     break;
-                case "software":
-                    processWith(spark,
-                            String.format("%s/%s", inputGraphPath, graphTableName),
+                case SOFTWARE:
+                    processGraphTable(spark,
+                            inputGraphTablePath,
                             inputActionSetPaths,
-                            outputGraphPath,
-                            graphTableName,
+                            outputGraphTablePath,
                             strategy,
                             Software.class);
                     break;
@@ -160,91 +166,74 @@
         }
     }
 
-    private static final StructType KV_SCHEMA = StructType$.MODULE$.apply(
-            Arrays.asList(
-                    StructField$.MODULE$.apply("key", DataTypes.StringType, false, Metadata.empty()),
-                    StructField$.MODULE$.apply("value", DataTypes.StringType, false, Metadata.empty())
-            ));
+    private static <T extends Oaf> void processGraphTable(SparkSession spark,
+                                                          String inputGraphTablePath,
+                                                          String inputActionSetPaths,
+                                                          String outputGraphTablePath,
+                                                          OafMergeAndGet.Strategy strategy,
+                                                          Class<T> clazz) {
+        Dataset<T> tableDS = readGraphTable(spark, inputGraphTablePath, clazz)
+                .cache();
+        Dataset<T> actionPayloadDS = readActionSetPayloads(spark, inputActionSetPaths, clazz)
+                .cache();
 
-    private static <T extends Oaf> Dataset<T> readGraphTable(SparkSession spark, String path, Class<T> clazz) {
+        Dataset<T> result = promoteActionSetForGraphTable(tableDS, actionPayloadDS, strategy, clazz)
+                .map((MapFunction<T, T>) value -> value, Encoders.bean(clazz));
+
+        saveGraphTableAsParquet(result, outputGraphTablePath);
+    }
+
+    private static <T extends Oaf> Dataset<T> readGraphTable(SparkSession spark,
+                                                             String inputGraphTablePath,
+                                                             Class<T> clazz) {
         JavaRDD<Row> rows = JavaSparkContext
                 .fromSparkContext(spark.sparkContext())
-                .sequenceFile(path, Text.class, Text.class)
+                .sequenceFile(inputGraphTablePath, Text.class, Text.class)
                 .map(x -> RowFactory.create(x._1().toString(), x._2().toString()));
 
         return spark.createDataFrame(rows, KV_SCHEMA)
                 .map((MapFunction<Row, T>) row -> new ObjectMapper().readValue(row.<String>getAs("value"), clazz),
-                        Encoders.kryo(clazz));
+                        Encoders.bean(clazz));
     }
 
-    private static Dataset<String> readActionSetPayloads(SparkSession spark, String inputActionSetPaths) {
+    private static <T extends Oaf> Dataset<T> readActionSetPayloads(SparkSession spark,
+                                                                    String inputActionSetPaths,
+                                                                    Class<T> clazz) {
         return Arrays
                 .stream(inputActionSetPaths.split(","))
-                .map(inputActionSetPath -> readActionSetPayload(spark, inputActionSetPath))
+                .map(inputActionSetPath -> readActionSetPayload(spark, inputActionSetPath, clazz))
                 .reduce(Dataset::union)
-                .get();
+                .orElseThrow(() -> new RuntimeException("error reading action sets: " + inputActionSetPaths));
     }
 
-    private static Dataset<String> readActionSetPayload(SparkSession spark, String inputActionSetPath) {
+    private static <T extends Oaf> Dataset<T> readActionSetPayload(SparkSession spark,
+                                                                   String inputActionSetPath,
+                                                                   Class<T> clazz) {
         JavaRDD<Row> actionsRDD = JavaSparkContext
                 .fromSparkContext(spark.sparkContext())
                 .sequenceFile(inputActionSetPath, Text.class, Text.class)
                 .map(x -> RowFactory.create(x._1().toString(), x._2().toString()));
 
+        SerializableSupplier<BiFunction<String, Class<T>, T>> actionPayloadToOafFn = () -> (json, c) -> {
+            try {
+                return new ObjectMapper().disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES).readValue(json, c);
+            } catch (IOException e) {
+                return null;
+            }
+        };
+
         return spark.createDataFrame(actionsRDD, KV_SCHEMA)
                 .select(unbase64(get_json_object(col("value"), "$.TargetValue"))
                         .cast(DataTypes.StringType).as("target_value_json"))
-                .as(Encoders.STRING());
+                .as(Encoders.STRING())
+                .map((MapFunction<String, T>) value -> actionPayloadToOafFn.get().apply(value, clazz), Encoders.bean(clazz))
+                .filter((FilterFunction<T>) Objects::nonNull);
     }
 
-    private static <T extends Oaf> void processWith(SparkSession spark,
-                                                    String inputGraphTablePath,
-                                                    String inputActionSetPaths,
-                                                    String outputGraphPath,
-                                                    String graphTableName,
-                                                    OafMergeAndGet.Strategy strategy,
-                                                    Class<T> clazz) {
-//        System.out.println("===== tableDS =====");
-        Dataset<T> tableDS = readGraphTable(spark, inputGraphTablePath, clazz)
-                .cache();
-//        tableDS.printSchema();
-//        tableDS.show();
-//        tableDS.explain();
-//        System.out.println("DEBUG: tableDS.partitions=" + tableDS.rdd().getNumPartitions());
-
-//        System.out.println("===== actionPayloadDS =====");
-        Dataset<String> actionPayloadDS = readActionSetPayloads(spark, inputActionSetPaths)
-                .cache();
-//        actionPayloadDS.printSchema();
-//        actionPayloadDS.show();
-//        actionPayloadDS.explain();
-//        System.out.println("DEBUG: actionPayloadDS.partitions=" + actionPayloadDS.rdd().getNumPartitions());
-
-//        System.out.println("===== processed =====");
-        Dataset<T> processed = processGraphTable(tableDS, actionPayloadDS, strategy, clazz);
-//        processed.printSchema();
-//        processed.show();
-//        processed.explain();
-//        System.out.println("DEBUG: processed.partitions=" + processed.rdd().getNumPartitions());
-
-//        System.out.println("===== result =====");
-        Dataset<T> result = processed
-                .map((MapFunction<T, T>) value -> value, Encoders.bean(clazz));
-//        result.printSchema();
-//        result.show();
-//        result.explain();
-//        System.out.println("DEBUG: result.partitions=" + result.rdd().getNumPartitions());
-
-        String outputGraphTablePath = String.format("%s/%s", outputGraphPath, graphTableName);
-        result.write()
-                .format("parquet")
-                .save(outputGraphTablePath);
-    }
-
-    private static <T extends Oaf> Dataset<T> processGraphTable(Dataset<T> tableDS,
-                                                                Dataset<String> actionPayloadDS,
-                                                                OafMergeAndGet.Strategy strategy,
-                                                                Class<T> clazz) {
+    private static <T extends Oaf> Dataset<T> promoteActionSetForGraphTable(Dataset<T> tableDS,
+                                                                            Dataset<T> actionPayloadDS,
+                                                                            OafMergeAndGet.Strategy strategy,
+                                                                            Class<T> clazz) {
         SerializableSupplier<Function<T, String>> oafIdFn = () -> x -> {
             if (x instanceof Relation) {
                 Relation r = (Relation) x;
@@ -274,13 +263,6 @@ public class PromoteActionSetFromHDFSJob {
                 tableDS,
                 actionPayloadDS,
                 oafIdFn,
-                () -> (json, c) -> {
-                    try {
-                        return new ObjectMapper().disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES).readValue(json, clazz);
-                    } catch (IOException e) {
-                        return null;
-                    }
-                },
                 mergeAndGetFn,
                 clazz);
 
@@ -292,4 +274,10 @@
                 clazz
         );
     }
+
+    private static <T extends Oaf> void saveGraphTableAsParquet(Dataset<T> result, String outputGraphTablePath) {
+        result.write()
+                .format("parquet")
+                .save(outputGraphTablePath);
+    }
 }

diff --git a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJobTest.java b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJobTest.java
index 465102b96f..61c954af32 100644
--- a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJobTest.java
+++ b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/PromoteActionSetFromHDFSJobTest.java
@@ -109,67 +109,67 @@ public class PromoteActionSetFromHDFSJobTest {
 
     @Test
     public void shouldReadActionsFromHDFSAndPromoteThemForDatasetsUsingMergeFromStrategy() throws Exception {
-        readActionsFromHDFSAndPromoteThemFor("dataset",
+        readActionsFromHDFSAndPromoteThemFor(PromoteActionSetFromHDFSJob.GraphTableName.DATASET,
                 OafMergeAndGet.Strategy.MERGE_FROM_AND_GET,
                 eu.dnetlib.dhp.schema.oaf.Dataset.class);
     }
 
     @Test
     public void shouldReadActionsFromHDFSAndPromoteThemForDatasourcesUsingMergeFromStrategy() throws Exception {
-        readActionsFromHDFSAndPromoteThemFor("datasource",
+        readActionsFromHDFSAndPromoteThemFor(PromoteActionSetFromHDFSJob.GraphTableName.DATASOURCE,
                 OafMergeAndGet.Strategy.MERGE_FROM_AND_GET,
                 Datasource.class);
     }
 
     @Test
-    public void shouldReadActionsFromHDFSAndPromoteThemForOrganizationUsingMergeFromStrategy() throws Exception {
-        readActionsFromHDFSAndPromoteThemFor("organization",
+    public void shouldReadActionsFromHDFSAndPromoteThemForOrganizationsUsingMergeFromStrategy() throws Exception {
+        readActionsFromHDFSAndPromoteThemFor(PromoteActionSetFromHDFSJob.GraphTableName.ORGANIZATION,
                 OafMergeAndGet.Strategy.MERGE_FROM_AND_GET,
                 Organization.class);
     }
 
     @Test
-    public void shouldReadActionsFromHDFSAndPromoteThemForOtherResearchProductUsingMergeFromStrategy() throws Exception {
-        readActionsFromHDFSAndPromoteThemFor("otherresearchproduct",
+    public void shouldReadActionsFromHDFSAndPromoteThemForOtherResearchProductsUsingMergeFromStrategy() throws Exception {
+        readActionsFromHDFSAndPromoteThemFor(PromoteActionSetFromHDFSJob.GraphTableName.OTHERRESEARCHPRODUCT,
                 OafMergeAndGet.Strategy.MERGE_FROM_AND_GET,
                 OtherResearchProduct.class);
     }
 
     @Test
-    public void shouldReadActionsFromHDFSAndPromoteThemForProjectUsingMergeFromStrategy() throws Exception {
-        readActionsFromHDFSAndPromoteThemFor("project",
+    public void shouldReadActionsFromHDFSAndPromoteThemForProjectsUsingMergeFromStrategy() throws Exception {
+        readActionsFromHDFSAndPromoteThemFor(PromoteActionSetFromHDFSJob.GraphTableName.PROJECT,
                 OafMergeAndGet.Strategy.MERGE_FROM_AND_GET,
                 Project.class);
     }
 
     @Test
-    public void shouldReadActionsFromHDFSAndPromoteThemForPublicationUsingMergeFromStrategy() throws Exception {
-        readActionsFromHDFSAndPromoteThemFor("publication",
+    public void shouldReadActionsFromHDFSAndPromoteThemForPublicationsUsingMergeFromStrategy() throws Exception {
+        readActionsFromHDFSAndPromoteThemFor(PromoteActionSetFromHDFSJob.GraphTableName.PUBLICATION,
                 OafMergeAndGet.Strategy.MERGE_FROM_AND_GET,
                 Publication.class);
     }
 
     @Test
-    public void shouldReadActionsFromHDFSAndPromoteThemForRelationUsingMergeFromStrategy() throws Exception {
-        readActionsFromHDFSAndPromoteThemFor("relation",
+    public void shouldReadActionsFromHDFSAndPromoteThemForRelationsUsingMergeFromStrategy() throws Exception {
+        readActionsFromHDFSAndPromoteThemFor(PromoteActionSetFromHDFSJob.GraphTableName.RELATION,
                 OafMergeAndGet.Strategy.MERGE_FROM_AND_GET,
                 Relation.class);
     }
 
     @Test
-    public void shouldReadActionsFromHDFSAndPromoteThemForSoftwareUsingMergeFromStrategy() throws Exception {
-        readActionsFromHDFSAndPromoteThemFor("software",
+    public void shouldReadActionsFromHDFSAndPromoteThemForSoftwaresUsingMergeFromStrategy() throws Exception {
+        readActionsFromHDFSAndPromoteThemFor(PromoteActionSetFromHDFSJob.GraphTableName.SOFTWARE,
                 OafMergeAndGet.Strategy.MERGE_FROM_AND_GET,
                 Software.class);
     }
 
-    private <T extends Oaf> void readActionsFromHDFSAndPromoteThemFor(String graphTableName,
+    private <T extends Oaf> void readActionsFromHDFSAndPromoteThemFor(PromoteActionSetFromHDFSJob.GraphTableName graphTableName,
                                                                       OafMergeAndGet.Strategy strategy,
                                                                       Class<T> clazz) throws Exception {
         // given
         String inputGraphTableJsonDumpPath =
-                String.format("%s/%s.json", "eu/dnetlib/dhp/actionmanager/input/graph", graphTableName);
-        createGraphTableFor(inputGraphTableJsonDumpPath, graphTableName, clazz);
+                String.format("%s/%s.json", "eu/dnetlib/dhp/actionmanager/input/graph", graphTableName.name().toLowerCase());
+        createGraphTableFor(inputGraphTableJsonDumpPath, graphTableName.name().toLowerCase(), clazz);
         String inputActionSetPaths = createActionSets();
         Path outputGraphDir = outputDir.resolve("graph");
 
@@ -178,13 +178,13 @@
                 "-isSparkSessionManaged", Boolean.FALSE.toString(),
                 "-inputGraphPath", inputGraphDir.toString(),
                 "-inputActionSetPaths", inputActionSetPaths,
-                "-graphTableName", graphTableName,
+                "-graphTableName", graphTableName.name(),
                 "-outputGraphPath", outputGraphDir.toString(),
-                "mergeAndGetStrategy", strategy.name()
+                "-mergeAndGetStrategy", strategy.name()
         });
 
         // then
-        Path outputGraphTableDir = outputGraphDir.resolve(graphTableName);
+        Path outputGraphTableDir = outputGraphDir.resolve(graphTableName.name().toLowerCase());
         assertTrue(Files.exists(outputGraphDir));
 
         List<T> outputGraphTableRows = readGraphTableFromParquet(outputGraphTableDir.toString(), clazz).collectAsList();
@@ -193,7 +193,7 @@
         assertEquals(10, outputGraphTableRows.size());
 
         String expectedOutputGraphTableJsonDumpPath =
-                String.format("%s/%s/%s.json", "eu/dnetlib/dhp/actionmanager/output/graph", strategy.name().toLowerCase(), graphTableName);
+                String.format("%s/%s/%s.json", "eu/dnetlib/dhp/actionmanager/output/graph", strategy.name().toLowerCase(), graphTableName.name().toLowerCase());
         Path expectedOutputGraphTableJsonDumpFile = Paths
                 .get(Objects.requireNonNull(cl.getResource(expectedOutputGraphTableJsonDumpPath)).getFile());
         List<T> expectedOutputGraphTableRows = readGraphTableFromJSON(