From d147295c2f9b531b8da7973436694c0354c79ba9 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 21 Oct 2021 14:15:57 +0200 Subject: [PATCH] avoiding java.io.NotSerializableException: java.util.HashMap --- .../dhp/oa/graph/raw/CopyHdfsOafApplication.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java index 792264e18..31ebcbc6e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java @@ -31,6 +31,7 @@ import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import scala.Tuple2; public class CopyHdfsOafApplication extends AbstractMigrationApplication { @@ -73,10 +74,13 @@ public class CopyHdfsOafApplication extends AbstractMigrationApplication { conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.registerKryoClasses(ModelSupport.getOafModelClasses()); - runWithSparkSession(conf, isSparkSessionManaged, spark -> processPaths(spark, hdfsPath, paths)); + final List oafTypes = Lists.newArrayList(ModelSupport.oafTypes.keySet()); + + runWithSparkSession(conf, isSparkSessionManaged, spark -> processPaths(spark, oafTypes, hdfsPath, paths)); } public static void processPaths(final SparkSession spark, + final List oafTypes, final String outputPath, final Set paths) { @@ -99,16 +103,16 @@ public class CopyHdfsOafApplication extends AbstractMigrationApplication { .as(Encoders.kryo(Oaf.class)); // dispatch each entity type individually in the respective graph subdirectory in append mode - for (Map.Entry e : ModelSupport.oafTypes.entrySet()) { + for (String type : oafTypes) { oaf - .filter((FilterFunction) o -> o.getClass().getSimpleName().toLowerCase().equals(e.getKey())) + .filter((FilterFunction) o -> o.getClass().getSimpleName().toLowerCase().equals(type)) .map((MapFunction) OBJECT_MAPPER::writeValueAsString, Encoders.STRING()) .write() .option("compression", "gzip") .mode(SaveMode.Append) - .text(outputPath + "/" + e.getKey()); + .text(outputPath + "/" + type); } } } - + }