From ff30f99c65c04bad95af6dc887797781f6d322df Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 15 Apr 2020 16:16:20 +0200 Subject: [PATCH] using newline delimited json files for the raw graph materialization. Introduced contentPath parameter --- .../raw/DispatchEntitiesApplication.java | 6 ++-- .../oa/graph/raw/MergeClaimsApplication.java | 10 +++++- .../oa/graph/raw_all/oozie_app/workflow.xml | 32 +++++++++++-------- 3 files changed, 30 insertions(+), 18 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/DispatchEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/DispatchEntitiesApplication.java index 0b47db588b..4812f1c304 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/DispatchEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/DispatchEntitiesApplication.java @@ -64,6 +64,7 @@ public class DispatchEntitiesApplication { log.info(String.format("Processing entities (%s) in file: %s", type, sourcePath)); + /* spark.read() .textFile(sourcePath) .filter((FilterFunction) value -> isEntityType(value, type)) @@ -73,14 +74,13 @@ public class DispatchEntitiesApplication { .mode(SaveMode.Overwrite) .parquet(targetPath + "/" + type); - /* + */ + JavaSparkContext.fromSparkContext(spark.sparkContext()) .textFile(sourcePath) .filter(l -> isEntityType(l, type)) .map(l -> StringUtils.substringAfter(l, "|")) .saveAsTextFile(targetPath + "/" + type, GzipCodec.class); // use repartition(XXX) ??? - - */ } private static boolean isEntityType(final String line, final String type) { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MergeClaimsApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MergeClaimsApplication.java index 85e4f3663b..4b209c68ab 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MergeClaimsApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MergeClaimsApplication.java @@ -100,16 +100,24 @@ public class MergeClaimsApplication { return opRaw.isPresent() ? opRaw.get()._2() : opClaim.isPresent() ? opClaim.get()._2() : null; }, Encoders.bean(clazz)) .filter(Objects::nonNull) + .map((MapFunction) value -> OBJECT_MAPPER.writeValueAsString(value), Encoders.STRING()) .write() .mode(SaveMode.Overwrite) - .parquet(outPath); + .option("compression", "gzip") + .text(outPath); } private static Dataset readFromPath(SparkSession spark, String path, Class clazz) { + return spark.read() + .textFile(path) + .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz)) + .filter((FilterFunction) value -> Objects.nonNull(idFn().apply(value))); + /* return spark.read() .load(path) .as(Encoders.bean(clazz)) .filter((FilterFunction) value -> Objects.nonNull(idFn().apply(value))); + */ } private static void removeOutputDir(SparkSession spark, String path) { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml index d33bb0211c..9f91380abb 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml @@ -10,6 +10,10 @@ false should import content from the aggregator or reuse a previous version + + contentPath + path location to store (or reuse) content from the aggregator + postgresURL the postgres URL to access to the database @@ -108,10 +112,10 @@ - + eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication - -p${workingDir}/db_claims + -p${contentPath}/db_claims -pgurl${postgresURL} -pguser${postgresUser} -pgpasswd${postgresPassword} @@ -124,10 +128,10 @@ - + eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication - -p${workingDir}/odf_claims + -p${contentPath}/odf_claims -mongourl${mongoURL} -mongodb${mongoDb} -fODF @@ -141,10 +145,10 @@ - + eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication - -p${workingDir}/oaf_claims + -p${contentPath}/oaf_claims -mongourl${mongoURL} -mongodb${mongoDb} -fOAF @@ -158,10 +162,10 @@ - + eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication - -p${workingDir}/db_records + -p${contentPath}/db_records -pgurl${postgresURL} -pguser${postgresUser} -pgpasswd${postgresPassword} @@ -173,10 +177,10 @@ - + eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication - -p${workingDir}/odf_records + -p${contentPath}/odf_records -mongourl${mongoURL} -mongodb${mongoDb} -fODF @@ -190,10 +194,10 @@ - + eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication - -p${workingDir}/oaf_records + -p${contentPath}/oaf_records -mongourl${mongoURL} -mongodb${mongoDb} -fOAF @@ -227,7 +231,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - -s${workingDir}/db_claims,${workingDir}/oaf_claims,${workingDir}/odf_claims + -s${contentPath}/db_claims,${contentPath}/oaf_claims,${contentPath}/odf_claims -t${workingDir}/entities_claim -pgurl${postgresURL} -pguser${postgresUser} @@ -276,7 +280,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - -s${workingDir}/db_records,${workingDir}/oaf_records,${workingDir}/odf_records + -s${contentPath}/db_records,${contentPath}/oaf_records,${contentPath}/odf_records -t${workingDir}/entities -pgurl${postgresURL} -pguser${postgresUser}