diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java index 37d4a52715..cd37e79eb3 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java @@ -50,16 +50,6 @@ public class SparkCountryPropagationJob2 { final String resultClassName = parser.get("resultTableName"); log.info("resultTableName: {}", resultClassName); - final String resultType = - resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase(); - log.info("resultType: {}", resultType); - - final Boolean writeUpdates = - Optional.ofNullable(parser.get("writeUpdate")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("writeUpdate: {}", writeUpdates); - final Boolean saveGraph = Optional.ofNullable(parser.get("saveGraph")) .map(Boolean::valueOf) @@ -76,17 +66,12 @@ public class SparkCountryPropagationJob2 { conf, isSparkSessionManaged, spark -> { - // createOutputDirs(outputPath, - // FileSystem.get(spark.sparkContext().hadoopConfiguration())); - removeOutputDir(spark, outputPath); execPropagation( spark, datasourcecountrypath, inputPath, outputPath, resultClazz, - resultType, - writeUpdates, saveGraph); }); } @@ -97,12 +82,10 @@ public class SparkCountryPropagationJob2 { String inputPath, String outputPath, Class resultClazz, - String resultType, - boolean writeUpdates, boolean saveGraph) { final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - // Load parque file with preprocessed association datasource - country + // Load file with preprocessed association datasource - country Dataset datasourcecountryassoc = readAssocDatasourceCountry(spark, datasourcecountrypath); // broadcasting the result of the preparation step @@ -114,10 +97,6 @@ public class SparkCountryPropagationJob2 { spark, inputPath, resultClazz, broadcast_datasourcecountryassoc) .as(Encoders.bean(ResultCountrySet.class)); - if (writeUpdates) { - writeUpdates(potentialUpdates, outputPath + "/update_" + resultType); - } - if (saveGraph) { updateResultTable(spark, potentialUpdates, inputPath, resultClazz, outputPath); } @@ -138,11 +117,6 @@ public class SparkCountryPropagationJob2 { r -> new Tuple2<>(r.getId(), r), Encoders.tuple(Encoders.STRING(), Encoders.bean(resultClazz))); - // Dataset> potential_update_pair = - // potentialUpdates.map(pu -> new Tuple2<>(pu.getResultId(), - // pu), - // Encoders.tuple(Encoders.STRING(), Encoders.bean(ResultCountrySet.class))); - Dataset new_table = result_pair .joinWith( @@ -184,10 +158,6 @@ public class SparkCountryPropagationJob2 { log.info("Saving graph table to path: {}", outputPath); // log.info("number of saved recordsa: {}", new_table.count()); new_table.toJSON().write().option("compression", "gzip").text(outputPath); - // .toJavaRDD() - // .map(r -> OBJECT_MAPPER.writeValueAsString(r)) - // .saveAsTextFile(outputPath , GzipCodec.class); - } private static Dataset getPotentialResultToUpdate( @@ -203,18 +173,6 @@ public class SparkCountryPropagationJob2 { return countryPropagationAssoc(spark, broadcast_datasourcecountryassoc); } - // private static void createCfHbforresult(SparkSession spark) { - // String query; - // query = "SELECT id, inst.collectedfrom.key cf , inst.hostedby.key hb " + - // "FROM ( SELECT id, instance " + - // "FROM result " + - // " WHERE datainfo.deletedbyinference = false) ds " + - // "LATERAL VIEW EXPLODE(instance) i AS inst"; - // Dataset cfhb = spark.sql(query); - // cfhb.createOrReplaceTempView("cfhb"); - // //log.info("cfhb_number : {}", cfhb.count()); - // } - private static Dataset countryPropagationAssoc( SparkSession spark, Broadcast> broadcast_datasourcecountryassoc) { @@ -248,16 +206,4 @@ public class SparkCountryPropagationJob2 { value -> OBJECT_MAPPER.readValue(value, DatasourceCountry.class), Encoders.bean(DatasourceCountry.class)); } - - private static void writeUpdates( - Dataset potentialUpdates, String outputPath) { - potentialUpdates - .toJSON() - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .text(outputPath); - // map(u -> OBJECT_MAPPER.writeValueAsString(u)) - // .saveAsTextFile(outputPath, GzipCodec.class); - } }