diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareDatasourceCountryAssociation.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareDatasourceCountryAssociation.java index 887ddbc69..85d3e58c4 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareDatasourceCountryAssociation.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareDatasourceCountryAssociation.java @@ -5,6 +5,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.*; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; @@ -14,14 +15,14 @@ import org.apache.spark.sql.SaveMode; import java.util.Arrays; import java.util.List; +import java.util.Optional; -import static eu.dnetlib.dhp.PropagationConstant.createOutputDirs; -import static eu.dnetlib.dhp.PropagationConstant.getConstraintList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; /** * For the association of the country to the datasource * The association is computed only for datasource of specific type or having whitelisted ids @@ -30,48 +31,50 @@ import org.slf4j.LoggerFactory; */ public class PrepareDatasourceCountryAssociation { - private static final Logger log = LoggerFactory.getLogger(PrepareDatasourceCountryAssociation.class); + private static final Logger log = LoggerFactory.getLogger(PrepareDatasourceCountryAssociation.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils.toString(PrepareDatasourceCountryAssociation.class - .getResourceAsStream("/eu/dnetlib/dhp/countrypropagation/input_prepare_dc_assoc.json")); + .getResourceAsStream("/eu/dnetlib/dhp/countrypropagation/input_prepareassoc_parameters.json")); final ArgumentApplicationParser parser = new ArgumentApplicationParser( jsonConfiguration); parser.parseArgument(args); + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + String inputPath = parser.get("sourcePath"); log.info("inputPath: {}", inputPath); + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}: ", outputPath); + SparkConf conf = new SparkConf(); conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); - final SparkSession spark = SparkSession - .builder() - .appName(SparkCountryPropagationJob.class.getSimpleName()) - .master(parser.get("master")) - .config(conf) - .enableHiveSupport() - .getOrCreate(); - final String outputPath = "/tmp/provision/propagation/countrytoresultfrominstitutionalrepositories"; + runWithSparkHiveSession(conf, isSparkSessionManaged, + spark -> { + removeOutputDir(spark, outputPath); + prepareDatasourceCountryAssociation(spark, + Arrays.asList(parser.get("whitelist").split(";")), + Arrays.asList(parser.get("allowedtypes").split(";")), + inputPath, + outputPath); - - createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration())); - prepareDatasourceCountryAssociation(spark, - Arrays.asList(parser.get("whitelist").split(";")), - Arrays.asList(parser.get("allowedtypes").split(";")), - inputPath, - outputPath); + }); } + + private static void prepareDatasourceCountryAssociation(SparkSession spark, List whitelist, List allowedtypes, @@ -97,7 +100,7 @@ public class PrepareDatasourceCountryAssociation { relation.createOrReplaceTempView("relation"); organization.createOrReplaceTempView("organization"); - String query = "SELECT source ds, named_struct('classid', country.classid, 'classname', country.classname) country " + + String query = "SELECT source dataSourceId, named_struct('classid', country.classid, 'classname', country.classname) country " + "FROM ( SELECT id " + " FROM datasource " + " WHERE (datainfo.deletedbyinference = false " + whitelisted + ") " + @@ -115,9 +118,9 @@ public class PrepareDatasourceCountryAssociation { spark.sql(query) .as(Encoders.bean(DatasourceCountry.class)) - .write() - .mode(SaveMode.Overwrite) - .parquet(outputPath + "/prepared_datasource_country"); + .toJavaRDD() + .map(c -> OBJECT_MAPPER.writeValueAsString(c)) + .saveAsTextFile(outputPath, GzipCodec.class); }