WIP action payload mapping into OAF type moved, (local) graph table name enum created, tests fixed

This commit is contained in:
Przemysław Jacewicz 2020-03-13 10:01:39 +01:00 committed by przemek
parent 5cc560c7e5
commit 8d9b3c5de2
2 changed files with 122 additions and 134 deletions

View File

@ -9,6 +9,7 @@ import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.*; import org.apache.spark.sql.*;
@ -25,10 +26,22 @@ import static org.apache.spark.sql.functions.*;
public class PromoteActionSetFromHDFSJob { public class PromoteActionSetFromHDFSJob {
/**
 * Graph tables this job can promote action sets for.
 *
 * <p>The job parses its {@code graphTableName} argument into one of these constants
 * (after upper-casing it) and uses the lower-cased constant name as the table's
 * sub-directory under the input and output graph paths.
 */
// TODO replace with project's common implementation
public enum GraphTableName {
    DATASET,
    DATASOURCE,
    ORGANIZATION,
    OTHERRESEARCHPRODUCT,
    PROJECT,
    PUBLICATION,
    RELATION,
    SOFTWARE
}
/**
 * Two-column schema ({@code key}, {@code value}) used when turning pairs of text
 * records into a DataFrame; both columns are non-nullable strings with empty metadata.
 */
private static final StructType KV_SCHEMA = DataTypes.createStructType(
        Arrays.asList(
                DataTypes.createStructField("key", DataTypes.StringType, false),
                DataTypes.createStructField("value", DataTypes.StringType, false)));
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString( String jsonConfiguration = IOUtils.toString(
PromoteActionSetFromHDFSJob.class PromoteActionSetFromHDFSJob.class
.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/actionmanager_input_parameters.json"))); .getResourceAsStream("/eu/dnetlib/dhp/actionmanager/actionmanager_input_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args); parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional Boolean isSparkSessionManaged = Optional
@ -37,9 +50,9 @@ public class PromoteActionSetFromHDFSJob {
.orElse(Boolean.TRUE); .orElse(Boolean.TRUE);
String inputGraphPath = parser.get("inputGraphPath"); String inputGraphPath = parser.get("inputGraphPath");
String inputActionSetPaths = parser.get("inputActionSetPaths"); String inputActionSetPaths = parser.get("inputActionSetPaths");
String graphTableName = parser.get("graphTableName"); GraphTableName graphTableName = GraphTableName.valueOf(parser.get("graphTableName").toUpperCase());
String outputGraphPath = parser.get("outputGraphPath"); String outputGraphPath = parser.get("outputGraphPath");
OafMergeAndGet.Strategy strategy = OafMergeAndGet.Strategy.valueOf(parser.get("mergeAndGetStrategy")); OafMergeAndGet.Strategy strategy = OafMergeAndGet.Strategy.valueOf(parser.get("mergeAndGetStrategy").toUpperCase());
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
@ -75,78 +88,71 @@ public class PromoteActionSetFromHDFSJob {
SparkSession spark = null; SparkSession spark = null;
try { try {
spark = SparkSession.builder().config(conf).getOrCreate(); spark = SparkSession.builder().config(conf).getOrCreate();
String inputGraphTablePath = String.format("%s/%s", inputGraphPath, graphTableName.name().toLowerCase());
String outputGraphTablePath = String.format("%s/%s", outputGraphPath, graphTableName.name().toLowerCase());
//TODO make graph table generic using enums
switch (graphTableName) { switch (graphTableName) {
case "dataset": case DATASET:
processWith(spark, processGraphTable(spark,
String.format("%s/%s", inputGraphPath, graphTableName), inputGraphTablePath,
inputActionSetPaths, inputActionSetPaths,
outputGraphPath, outputGraphTablePath,
graphTableName,
strategy, strategy,
eu.dnetlib.dhp.schema.oaf.Dataset.class); eu.dnetlib.dhp.schema.oaf.Dataset.class);
break; break;
case "datasource": case DATASOURCE:
processWith(spark, processGraphTable(spark,
String.format("%s/%s", inputGraphPath, graphTableName), inputGraphTablePath,
inputActionSetPaths, inputActionSetPaths,
outputGraphPath, outputGraphTablePath,
graphTableName,
strategy, strategy,
Datasource.class); Datasource.class);
break; break;
case "organization": case ORGANIZATION:
processWith(spark, processGraphTable(spark,
String.format("%s/%s", inputGraphPath, graphTableName), inputGraphTablePath,
inputActionSetPaths, inputActionSetPaths,
outputGraphPath, outputGraphTablePath,
graphTableName,
strategy, strategy,
Organization.class); Organization.class);
break; break;
case "otherresearchproduct": case OTHERRESEARCHPRODUCT:
processWith(spark, processGraphTable(spark,
String.format("%s/%s", inputGraphPath, graphTableName), inputGraphTablePath,
inputActionSetPaths, inputActionSetPaths,
outputGraphPath, outputGraphTablePath,
graphTableName,
strategy, strategy,
OtherResearchProduct.class); OtherResearchProduct.class);
break; break;
case "project": case PROJECT:
processWith(spark, processGraphTable(spark,
String.format("%s/%s", inputGraphPath, graphTableName), inputGraphTablePath,
inputActionSetPaths, inputActionSetPaths,
outputGraphPath, outputGraphTablePath,
graphTableName,
strategy, strategy,
Project.class); Project.class);
break; break;
case "publication": case PUBLICATION:
processWith(spark, processGraphTable(spark,
String.format("%s/%s", inputGraphPath, graphTableName), inputGraphTablePath,
inputActionSetPaths, inputActionSetPaths,
outputGraphPath, outputGraphTablePath,
graphTableName,
strategy, strategy,
Publication.class); Publication.class);
break; break;
case "relation": case RELATION:
processWith(spark, processGraphTable(spark,
String.format("%s/%s", inputGraphPath, graphTableName), inputGraphTablePath,
inputActionSetPaths, inputActionSetPaths,
outputGraphPath, outputGraphTablePath,
graphTableName,
strategy, strategy,
Relation.class); Relation.class);
break; break;
case "software": case SOFTWARE:
processWith(spark, processGraphTable(spark,
String.format("%s/%s", inputGraphPath, graphTableName), inputGraphTablePath,
inputActionSetPaths, inputActionSetPaths,
outputGraphPath, outputGraphTablePath,
graphTableName,
strategy, strategy,
Software.class); Software.class);
break; break;
@ -160,91 +166,74 @@ public class PromoteActionSetFromHDFSJob {
} }
} }
private static final StructType KV_SCHEMA = StructType$.MODULE$.apply( private static <T extends Oaf> void processGraphTable(SparkSession spark,
Arrays.asList( String inputGraphTablePath,
StructField$.MODULE$.apply("key", DataTypes.StringType, false, Metadata.empty()), String inputActionSetPaths,
StructField$.MODULE$.apply("value", DataTypes.StringType, false, Metadata.empty()) String outputGraphTablePath,
)); OafMergeAndGet.Strategy strategy,
Class<T> clazz) {
Dataset<T> tableDS = readGraphTable(spark, inputGraphTablePath, clazz)
.cache();
Dataset<T> actionPayloadDS = readActionSetPayloads(spark, inputActionSetPaths, clazz)
.cache();
private static <T> Dataset<T> readGraphTable(SparkSession spark, String path, Class<T> clazz) { Dataset<T> result = promoteActionSetForGraphTable(tableDS, actionPayloadDS, strategy, clazz)
.map((MapFunction<T, T>) value -> value, Encoders.bean(clazz));
saveGraphTableAsParquet(result, outputGraphTablePath);
}
private static <T extends Oaf> Dataset<T> readGraphTable(SparkSession spark,
String inputGraphTablePath,
Class<T> clazz) {
JavaRDD<Row> rows = JavaSparkContext JavaRDD<Row> rows = JavaSparkContext
.fromSparkContext(spark.sparkContext()) .fromSparkContext(spark.sparkContext())
.sequenceFile(path, Text.class, Text.class) .sequenceFile(inputGraphTablePath, Text.class, Text.class)
.map(x -> RowFactory.create(x._1().toString(), x._2().toString())); .map(x -> RowFactory.create(x._1().toString(), x._2().toString()));
return spark.createDataFrame(rows, KV_SCHEMA) return spark.createDataFrame(rows, KV_SCHEMA)
.map((MapFunction<Row, T>) row -> new ObjectMapper().readValue(row.<String>getAs("value"), clazz), .map((MapFunction<Row, T>) row -> new ObjectMapper().readValue(row.<String>getAs("value"), clazz),
Encoders.kryo(clazz)); Encoders.bean(clazz));
} }
private static Dataset<String> readActionSetPayloads(SparkSession spark, String inputActionSetPaths) { private static <T extends Oaf> Dataset<T> readActionSetPayloads(SparkSession spark,
String inputActionSetPaths,
Class<T> clazz) {
return Arrays return Arrays
.stream(inputActionSetPaths.split(",")) .stream(inputActionSetPaths.split(","))
.map(inputActionSetPath -> readActionSetPayload(spark, inputActionSetPath)) .map(inputActionSetPath -> readActionSetPayload(spark, inputActionSetPath, clazz))
.reduce(Dataset::union) .reduce(Dataset::union)
.get(); .orElseThrow(() -> new RuntimeException("error reading action sets: " + inputActionSetPaths));
} }
private static Dataset<String> readActionSetPayload(SparkSession spark, String inputActionSetPath) { private static <T extends Oaf> Dataset<T> readActionSetPayload(SparkSession spark,
String inputActionSetPath,
Class<T> clazz) {
JavaRDD<Row> actionsRDD = JavaSparkContext JavaRDD<Row> actionsRDD = JavaSparkContext
.fromSparkContext(spark.sparkContext()) .fromSparkContext(spark.sparkContext())
.sequenceFile(inputActionSetPath, Text.class, Text.class) .sequenceFile(inputActionSetPath, Text.class, Text.class)
.map(x -> RowFactory.create(x._1().toString(), x._2().toString())); .map(x -> RowFactory.create(x._1().toString(), x._2().toString()));
SerializableSupplier<BiFunction<String, Class<T>, T>> actionPayloadToOafFn = () -> (json, c) -> {
try {
return new ObjectMapper().disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES).readValue(json, c);
} catch (IOException e) {
return null;
}
};
return spark.createDataFrame(actionsRDD, KV_SCHEMA) return spark.createDataFrame(actionsRDD, KV_SCHEMA)
.select(unbase64(get_json_object(col("value"), "$.TargetValue")) .select(unbase64(get_json_object(col("value"), "$.TargetValue"))
.cast(DataTypes.StringType).as("target_value_json")) .cast(DataTypes.StringType).as("target_value_json"))
.as(Encoders.STRING()); .as(Encoders.STRING())
.map((MapFunction<String, T>) value -> actionPayloadToOafFn.get().apply(value, clazz), Encoders.bean(clazz))
.filter((FilterFunction<T>) Objects::nonNull);
} }
private static <T extends Oaf> void processWith(SparkSession spark, private static <T extends Oaf> Dataset<T> promoteActionSetForGraphTable(Dataset<T> tableDS,
String inputGraphTablePath, Dataset<T> actionPayloadDS,
String inputActionSetPaths, OafMergeAndGet.Strategy strategy,
String outputGraphPath, Class<T> clazz) {
String graphTableName,
OafMergeAndGet.Strategy strategy,
Class<T> clazz) {
// System.out.println("===== tableDS =====");
Dataset<T> tableDS = readGraphTable(spark, inputGraphTablePath, clazz)
.cache();
// tableDS.printSchema();
// tableDS.show();
// tableDS.explain();
// System.out.println("DEBUG: tableDS.partitions=" + tableDS.rdd().getNumPartitions());
// System.out.println("===== actionPayloadDS =====");
Dataset<String> actionPayloadDS = readActionSetPayloads(spark, inputActionSetPaths)
.cache();
// actionPayloadDS.printSchema();
// actionPayloadDS.show();
// actionPayloadDS.explain();
// System.out.println("DEBUG: actionPayloadDS.partitions=" + actionPayloadDS.rdd().getNumPartitions());
// System.out.println("===== processed =====");
Dataset<T> processed = processGraphTable(tableDS, actionPayloadDS, strategy, clazz);
// processed.printSchema();
// processed.show();
// processed.explain();
// System.out.println("DEBUG: processed.partitions=" + processed.rdd().getNumPartitions());
// System.out.println("===== result =====");
Dataset<T> result = processed
.map((MapFunction<T, T>) value -> value, Encoders.bean(clazz));
// result.printSchema();
// result.show();
// result.explain();
// System.out.println("DEBUG: result.partitions=" + result.rdd().getNumPartitions());
String outputGraphTablePath = String.format("%s/%s", outputGraphPath, graphTableName);
result.write()
.format("parquet")
.save(outputGraphTablePath);
}
private static <T extends Oaf> Dataset<T> processGraphTable(Dataset<T> tableDS,
Dataset<String> actionPayloadDS,
OafMergeAndGet.Strategy strategy,
Class<T> clazz) {
SerializableSupplier<Function<T, String>> oafIdFn = () -> x -> { SerializableSupplier<Function<T, String>> oafIdFn = () -> x -> {
if (x instanceof Relation) { if (x instanceof Relation) {
Relation r = (Relation) x; Relation r = (Relation) x;
@ -274,13 +263,6 @@ public class PromoteActionSetFromHDFSJob {
tableDS, tableDS,
actionPayloadDS, actionPayloadDS,
oafIdFn, oafIdFn,
() -> (json, c) -> {
try {
return new ObjectMapper().disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES).readValue(json, clazz);
} catch (IOException e) {
return null;
}
},
mergeAndGetFn, mergeAndGetFn,
clazz); clazz);
@ -292,4 +274,10 @@ public class PromoteActionSetFromHDFSJob {
clazz clazz
); );
} }
/**
 * Writes the given graph table to {@code outputGraphTablePath} in parquet format.
 *
 * @param result               the graph table rows to persist
 * @param outputGraphTablePath destination path of the parquet output
 * @param <T>                  graph table row type
 */
private static <T extends Oaf> void saveGraphTableAsParquet(Dataset<T> result, String outputGraphTablePath) {
    // parquet(path) is the writer's shorthand for format("parquet").save(path)
    result.write().parquet(outputGraphTablePath);
}
} }

View File

@ -109,67 +109,67 @@ public class PromoteActionSetFromHDFSJobTest {
@Test @Test
public void shouldReadActionsFromHDFSAndPromoteThemForDatasetsUsingMergeFromStrategy() throws Exception { public void shouldReadActionsFromHDFSAndPromoteThemForDatasetsUsingMergeFromStrategy() throws Exception {
readActionsFromHDFSAndPromoteThemFor("dataset", readActionsFromHDFSAndPromoteThemFor(PromoteActionSetFromHDFSJob.GraphTableName.DATASET,
OafMergeAndGet.Strategy.MERGE_FROM_AND_GET, OafMergeAndGet.Strategy.MERGE_FROM_AND_GET,
eu.dnetlib.dhp.schema.oaf.Dataset.class); eu.dnetlib.dhp.schema.oaf.Dataset.class);
} }
@Test @Test
public void shouldReadActionsFromHDFSAndPromoteThemForDatasourcesUsingMergeFromStrategy() throws Exception { public void shouldReadActionsFromHDFSAndPromoteThemForDatasourcesUsingMergeFromStrategy() throws Exception {
readActionsFromHDFSAndPromoteThemFor("datasource", readActionsFromHDFSAndPromoteThemFor(PromoteActionSetFromHDFSJob.GraphTableName.DATASOURCE,
OafMergeAndGet.Strategy.MERGE_FROM_AND_GET, OafMergeAndGet.Strategy.MERGE_FROM_AND_GET,
Datasource.class); Datasource.class);
} }
@Test @Test
public void shouldReadActionsFromHDFSAndPromoteThemForOrganizationUsingMergeFromStrategy() throws Exception { public void shouldReadActionsFromHDFSAndPromoteThemForOrganizationsUsingMergeFromStrategy() throws Exception {
readActionsFromHDFSAndPromoteThemFor("organization", readActionsFromHDFSAndPromoteThemFor(PromoteActionSetFromHDFSJob.GraphTableName.ORGANIZATION,
OafMergeAndGet.Strategy.MERGE_FROM_AND_GET, OafMergeAndGet.Strategy.MERGE_FROM_AND_GET,
Organization.class); Organization.class);
} }
@Test @Test
public void shouldReadActionsFromHDFSAndPromoteThemForOtherResearchProductUsingMergeFromStrategy() throws Exception { public void shouldReadActionsFromHDFSAndPromoteThemForOtherResearchProductsUsingMergeFromStrategy() throws Exception {
readActionsFromHDFSAndPromoteThemFor("otherresearchproduct", readActionsFromHDFSAndPromoteThemFor(PromoteActionSetFromHDFSJob.GraphTableName.OTHERRESEARCHPRODUCT,
OafMergeAndGet.Strategy.MERGE_FROM_AND_GET, OafMergeAndGet.Strategy.MERGE_FROM_AND_GET,
OtherResearchProduct.class); OtherResearchProduct.class);
} }
@Test @Test
public void shouldReadActionsFromHDFSAndPromoteThemForProjectUsingMergeFromStrategy() throws Exception { public void shouldReadActionsFromHDFSAndPromoteThemForProjectsUsingMergeFromStrategy() throws Exception {
readActionsFromHDFSAndPromoteThemFor("project", readActionsFromHDFSAndPromoteThemFor(PromoteActionSetFromHDFSJob.GraphTableName.PROJECT,
OafMergeAndGet.Strategy.MERGE_FROM_AND_GET, OafMergeAndGet.Strategy.MERGE_FROM_AND_GET,
Project.class); Project.class);
} }
@Test @Test
public void shouldReadActionsFromHDFSAndPromoteThemForPublicationUsingMergeFromStrategy() throws Exception { public void shouldReadActionsFromHDFSAndPromoteThemForPublicationsUsingMergeFromStrategy() throws Exception {
readActionsFromHDFSAndPromoteThemFor("publication", readActionsFromHDFSAndPromoteThemFor(PromoteActionSetFromHDFSJob.GraphTableName.PUBLICATION,
OafMergeAndGet.Strategy.MERGE_FROM_AND_GET, OafMergeAndGet.Strategy.MERGE_FROM_AND_GET,
Publication.class); Publication.class);
} }
@Test @Test
public void shouldReadActionsFromHDFSAndPromoteThemForRelationUsingMergeFromStrategy() throws Exception { public void shouldReadActionsFromHDFSAndPromoteThemForRelationsUsingMergeFromStrategy() throws Exception {
readActionsFromHDFSAndPromoteThemFor("relation", readActionsFromHDFSAndPromoteThemFor(PromoteActionSetFromHDFSJob.GraphTableName.RELATION,
OafMergeAndGet.Strategy.MERGE_FROM_AND_GET, OafMergeAndGet.Strategy.MERGE_FROM_AND_GET,
Relation.class); Relation.class);
} }
@Test @Test
public void shouldReadActionsFromHDFSAndPromoteThemForSoftwareUsingMergeFromStrategy() throws Exception { public void shouldReadActionsFromHDFSAndPromoteThemForSoftwaresUsingMergeFromStrategy() throws Exception {
readActionsFromHDFSAndPromoteThemFor("software", readActionsFromHDFSAndPromoteThemFor(PromoteActionSetFromHDFSJob.GraphTableName.SOFTWARE,
OafMergeAndGet.Strategy.MERGE_FROM_AND_GET, OafMergeAndGet.Strategy.MERGE_FROM_AND_GET,
Software.class); Software.class);
} }
private <T extends Oaf> void readActionsFromHDFSAndPromoteThemFor(String graphTableName, private <T extends Oaf> void readActionsFromHDFSAndPromoteThemFor(PromoteActionSetFromHDFSJob.GraphTableName graphTableName,
OafMergeAndGet.Strategy strategy, OafMergeAndGet.Strategy strategy,
Class<T> clazz) throws Exception { Class<T> clazz) throws Exception {
// given // given
String inputGraphTableJsonDumpPath = String inputGraphTableJsonDumpPath =
String.format("%s/%s.json", "eu/dnetlib/dhp/actionmanager/input/graph", graphTableName); String.format("%s/%s.json", "eu/dnetlib/dhp/actionmanager/input/graph", graphTableName.name().toLowerCase());
createGraphTableFor(inputGraphTableJsonDumpPath, graphTableName, clazz); createGraphTableFor(inputGraphTableJsonDumpPath, graphTableName.name().toLowerCase(), clazz);
String inputActionSetPaths = createActionSets(); String inputActionSetPaths = createActionSets();
Path outputGraphDir = outputDir.resolve("graph"); Path outputGraphDir = outputDir.resolve("graph");
@ -178,13 +178,13 @@ public class PromoteActionSetFromHDFSJobTest {
"-isSparkSessionManaged", Boolean.FALSE.toString(), "-isSparkSessionManaged", Boolean.FALSE.toString(),
"-inputGraphPath", inputGraphDir.toString(), "-inputGraphPath", inputGraphDir.toString(),
"-inputActionSetPaths", inputActionSetPaths, "-inputActionSetPaths", inputActionSetPaths,
"-graphTableName", graphTableName, "-graphTableName", graphTableName.name(),
"-outputGraphPath", outputGraphDir.toString(), "-outputGraphPath", outputGraphDir.toString(),
"mergeAndGetStrategy", strategy.name() "-mergeAndGetStrategy", strategy.name()
}); });
// then // then
Path outputGraphTableDir = outputGraphDir.resolve(graphTableName); Path outputGraphTableDir = outputGraphDir.resolve(graphTableName.name().toLowerCase());
assertTrue(Files.exists(outputGraphDir)); assertTrue(Files.exists(outputGraphDir));
List<T> outputGraphTableRows = readGraphTableFromParquet(outputGraphTableDir.toString(), clazz).collectAsList(); List<T> outputGraphTableRows = readGraphTableFromParquet(outputGraphTableDir.toString(), clazz).collectAsList();
@ -193,7 +193,7 @@ public class PromoteActionSetFromHDFSJobTest {
assertEquals(10, outputGraphTableRows.size()); assertEquals(10, outputGraphTableRows.size());
String expectedOutputGraphTableJsonDumpPath = String expectedOutputGraphTableJsonDumpPath =
String.format("%s/%s/%s.json", "eu/dnetlib/dhp/actionmanager/output/graph", strategy.name().toLowerCase(), graphTableName); String.format("%s/%s/%s.json", "eu/dnetlib/dhp/actionmanager/output/graph", strategy.name().toLowerCase(), graphTableName.name().toLowerCase());
Path expectedOutputGraphTableJsonDumpFile = Paths Path expectedOutputGraphTableJsonDumpFile = Paths
.get(Objects.requireNonNull(cl.getResource(expectedOutputGraphTableJsonDumpPath)).getFile()); .get(Objects.requireNonNull(cl.getResource(expectedOutputGraphTableJsonDumpPath)).getFile());
List<T> expectedOutputGraphTableRows = readGraphTableFromJSON( List<T> expectedOutputGraphTableRows = readGraphTableFromJSON(