From a5f23d8a4c42206c8953c16ccab000dfb5a3fc03 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Fri, 7 Aug 2020 15:33:07 +0200
Subject: [PATCH] WIP: materialize graph as Hive DB

---
 .../main/java/eu/dnetlib/dhp/common/GraphSupport.java | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/GraphSupport.java b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/GraphSupport.java
index c2952677c..475bd21f0 100644
--- a/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/GraphSupport.java
+++ b/dhp-workflows/dhp-workflows-common/src/main/java/eu/dnetlib/dhp/common/GraphSupport.java
@@ -65,6 +65,8 @@ public class GraphSupport {
 
 		log.info("reading graph {}, format {}, class {}", graph, format, clazz);
 
+		Encoder encoder = Encoders.bean(clazz);
+
 		switch (format) {
 		case JSON:
 			String path = graph + "/" + clazz.getSimpleName().toLowerCase();
@@ -74,21 +76,18 @@ public class GraphSupport {
 					.textFile(path)
 					.map(
 						(MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz),
-						Encoders.bean(clazz))
+						encoder)
 					.filter((FilterFunction) value -> Objects.nonNull(ModelSupport.idFn().apply(value)));
 		case HIVE:
 			String table = ModelSupport.tableIdentifier(graph, clazz);
 			log.info("reading table {}", table);
-			return spark.read().table(table).as(Encoders.bean(clazz));
+
+			return spark.table(table).as(encoder);
 		default:
 			throw new IllegalStateException(String.format("format not managed: '%s'", format));
 		}
 	}
 
-	public static Dataset readGraphPARQUET(SparkSession spark, String graph, Class clazz) {
-		return readGraph(spark, graph, clazz, GraphFormat.HIVE);
-	}
-
 	public static Dataset readGraphJSON(SparkSession spark, String graph, Class clazz) {
 		return readGraph(spark, graph, clazz, GraphFormat.JSON);
 	}