diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareDatasourceCountryAssociation.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareDatasourceCountryAssociation.java index 2de536222..0604bb019 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareDatasourceCountryAssociation.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareDatasourceCountryAssociation.java @@ -1,47 +1,43 @@ package eu.dnetlib.dhp.countrypropagation; +import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; + import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.*; +import java.util.Arrays; +import java.util.List; import org.apache.commons.io.IOUtils; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.SaveMode; - -import java.util.Arrays; -import java.util.List; -import java.util.Optional; - - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static eu.dnetlib.dhp.PropagationConstant.*; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; /** - * For the association of the country to the datasource - * The association is computed only for datasource of specific type or having whitelisted ids - * The country is registered in the Organization associated to the Datasource, so the - * relation provides between Datasource and Organization is exploited to get the country for the datasource + * For the association of the country to the datasource The association is computed only for + * datasource of specific type or having whitelisted ids The country is registered in the + * Organization associated to the Datasource, so the relation provides between Datasource and + * Organization is exploited to get the country for the datasource */ - public class PrepareDatasourceCountryAssociation { - private static final Logger log = LoggerFactory.getLogger(PrepareDatasourceCountryAssociation.class); + private static final Logger log = + LoggerFactory.getLogger(PrepareDatasourceCountryAssociation.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils.toString(PrepareDatasourceCountryAssociation.class - .getResourceAsStream("/eu/dnetlib/dhp/countrypropagation/input_prepareassoc_parameters.json")); + String jsonConfiguration = + IOUtils.toString( + PrepareDatasourceCountryAssociation.class.getResourceAsStream( + "/eu/dnetlib/dhp/countrypropagation/input_prepareassoc_parameters.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - jsonConfiguration); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); @@ -57,73 +53,83 @@ public class PrepareDatasourceCountryAssociation { SparkConf conf = new SparkConf(); conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); - - - runWithSparkHiveSession(conf, isSparkSessionManaged, + runWithSparkHiveSession( + conf, + isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - prepareDatasourceCountryAssociation(spark, + prepareDatasourceCountryAssociation( + spark, Arrays.asList(parser.get("whitelist").split(";")), Arrays.asList(parser.get("allowedtypes").split(";")), inputPath, outputPath); - }); - - } - - - private static void prepareDatasourceCountryAssociation(SparkSession spark, - List whitelist, - List allowedtypes, - String inputPath, - String outputPath) { + private static void prepareDatasourceCountryAssociation( + SparkSession spark, + List whitelist, + List allowedtypes, + String inputPath, + String outputPath) { String whitelisted = ""; - for (String i : whitelist){ + for (String i : whitelist) { whitelisted += " OR id = '" + i + "'"; } final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + Dataset datasource = + spark.createDataset( + sc.textFile(inputPath + "/datasource") + .map(item -> OBJECT_MAPPER.readValue(item, Datasource.class)) + .rdd(), + Encoders.bean(Datasource.class)); - Dataset datasource = spark.createDataset(sc.textFile(inputPath + "/datasource") - .map(item -> OBJECT_MAPPER.readValue(item, Datasource.class)).rdd(), Encoders.bean(Datasource.class)); + Dataset relation = + spark.createDataset( + sc.textFile(inputPath + "/relation") + .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)) + .rdd(), + Encoders.bean(Relation.class)); - Dataset relation = spark.createDataset(sc.textFile(inputPath + "/relation") - .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class)); - - Dataset organization = spark.createDataset(sc.textFile(inputPath + "/organization") - .map(item -> OBJECT_MAPPER.readValue(item, Organization.class)).rdd(), Encoders.bean(Organization.class)); + Dataset organization = + spark.createDataset( + sc.textFile(inputPath + "/organization") + .map(item -> OBJECT_MAPPER.readValue(item, Organization.class)) + .rdd(), + Encoders.bean(Organization.class)); datasource.createOrReplaceTempView("datasource"); relation.createOrReplaceTempView("relation"); organization.createOrReplaceTempView("organization"); - String query = "SELECT source dataSourceId, named_struct('classid', country.classid, 'classname', country.classname) country " + - "FROM ( SELECT id " + - " FROM datasource " + - " WHERE (datainfo.deletedbyinference = false " + whitelisted + ") " + - getConstraintList("datasourcetype.classid = '", allowedtypes) + ") d " + - "JOIN ( SELECT source, target " + - " FROM relation " + - " WHERE relclass = '" + RELATION_DATASOURCE_ORGANIZATION_REL_CLASS + "' " + - " AND datainfo.deletedbyinference = false ) rel " + - "ON d.id = rel.source " + - "JOIN (SELECT id, country " + - " FROM organization " + - " WHERE datainfo.deletedbyinference = false " + - " AND length(country.classid)>0) o " + - "ON o.id = rel.target"; + String query = + "SELECT source dataSourceId, named_struct('classid', country.classid, 'classname', country.classname) country " + + "FROM ( SELECT id " + + " FROM datasource " + + " WHERE (datainfo.deletedbyinference = false " + + whitelisted + + ") " + + getConstraintList("datasourcetype.classid = '", allowedtypes) + + ") d " + + "JOIN ( SELECT source, target " + + " FROM relation " + + " WHERE relclass = '" + + RELATION_DATASOURCE_ORGANIZATION_REL_CLASS + + "' " + + " AND datainfo.deletedbyinference = false ) rel " + + "ON d.id = rel.source " + + "JOIN (SELECT id, country " + + " FROM organization " + + " WHERE datainfo.deletedbyinference = false " + + " AND length(country.classid)>0) o " + + "ON o.id = rel.target"; spark.sql(query) .as(Encoders.bean(DatasourceCountry.class)) .toJavaRDD() .map(c -> OBJECT_MAPPER.writeValueAsString(c)) .saveAsTextFile(outputPath, GzipCodec.class); - - } - - } diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java index 643373a1c..37d4a5271 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java @@ -1,28 +1,22 @@ package eu.dnetlib.dhp.countrypropagation; +import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; + import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.*; +import java.util.*; import org.apache.commons.io.IOUtils; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.sql.*; import org.apache.spark.sql.Dataset; -import scala.Tuple2; - -import java.util.*; - -import static eu.dnetlib.dhp.PropagationConstant.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; +import scala.Tuple2; public class SparkCountryPropagationJob2 { @@ -30,14 +24,14 @@ public class SparkCountryPropagationJob2 { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils.toString(SparkCountryPropagationJob2.class - .getResourceAsStream("/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json")); + String jsonConfiguration = + IOUtils.toString( + SparkCountryPropagationJob2.class.getResourceAsStream( + "/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - jsonConfiguration); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); @@ -56,182 +50,214 @@ public class SparkCountryPropagationJob2 { final String resultClassName = parser.get("resultTableName"); log.info("resultTableName: {}", resultClassName); - final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase(); + final String resultType = + resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase(); log.info("resultType: {}", resultType); - final Boolean writeUpdates = Optional - .ofNullable(parser.get("writeUpdate")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); + final Boolean writeUpdates = + Optional.ofNullable(parser.get("writeUpdate")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); log.info("writeUpdate: {}", writeUpdates); - final Boolean saveGraph = Optional - .ofNullable(parser.get("saveGraph")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); + final Boolean saveGraph = + Optional.ofNullable(parser.get("saveGraph")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); log.info("saveGraph: {}", saveGraph); - Class resultClazz = (Class) Class.forName(resultClassName); + Class resultClazz = + (Class) Class.forName(resultClassName); SparkConf conf = new SparkConf(); conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); - runWithSparkHiveSession(conf, isSparkSessionManaged, + runWithSparkHiveSession( + conf, + isSparkSessionManaged, spark -> { - //createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration())); + // createOutputDirs(outputPath, + // FileSystem.get(spark.sparkContext().hadoopConfiguration())); removeOutputDir(spark, outputPath); - execPropagation(spark, datasourcecountrypath, inputPath, outputPath, resultClazz, resultType, - writeUpdates, saveGraph); + execPropagation( + spark, + datasourcecountrypath, + inputPath, + outputPath, + resultClazz, + resultType, + writeUpdates, + saveGraph); }); - } - private static void execPropagation(SparkSession spark, String datasourcecountrypath, - String inputPath, String outputPath, Class resultClazz, String resultType, - boolean writeUpdates, boolean saveGraph){ + private static void execPropagation( + SparkSession spark, + String datasourcecountrypath, + String inputPath, + String outputPath, + Class resultClazz, + String resultType, + boolean writeUpdates, + boolean saveGraph) { final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - //Load parque file with preprocessed association datasource - country - Dataset datasourcecountryassoc = readAssocDatasourceCountry(spark, datasourcecountrypath); - //broadcasting the result of the preparation step - Broadcast> broadcast_datasourcecountryassoc = sc.broadcast(datasourcecountryassoc); + // Load parque file with preprocessed association datasource - country + Dataset datasourcecountryassoc = + readAssocDatasourceCountry(spark, datasourcecountrypath); + // broadcasting the result of the preparation step + Broadcast> broadcast_datasourcecountryassoc = + sc.broadcast(datasourcecountryassoc); - Dataset potentialUpdates = getPotentialResultToUpdate(spark, inputPath, resultClazz, - broadcast_datasourcecountryassoc).as(Encoders.bean(ResultCountrySet.class)); + Dataset potentialUpdates = + getPotentialResultToUpdate( + spark, inputPath, resultClazz, broadcast_datasourcecountryassoc) + .as(Encoders.bean(ResultCountrySet.class)); - if(writeUpdates){ + if (writeUpdates) { writeUpdates(potentialUpdates, outputPath + "/update_" + resultType); } - if(saveGraph){ + if (saveGraph) { updateResultTable(spark, potentialUpdates, inputPath, resultClazz, outputPath); - } } - private static void updateResultTable(SparkSession spark, Dataset potentialUpdates, - String inputPath, - Class resultClazz, - String outputPath) { + private static void updateResultTable( + SparkSession spark, + Dataset potentialUpdates, + String inputPath, + Class resultClazz, + String outputPath) { log.info("Reading Graph table from: {}", inputPath); Dataset result = readPathEntity(spark, inputPath, resultClazz); - Dataset> result_pair = result - .map(r -> new Tuple2<>(r.getId(), r), - Encoders.tuple(Encoders.STRING(), Encoders.bean(resultClazz))); + Dataset> result_pair = + result.map( + r -> new Tuple2<>(r.getId(), r), + Encoders.tuple(Encoders.STRING(), Encoders.bean(resultClazz))); -// Dataset> potential_update_pair = potentialUpdates.map(pu -> new Tuple2<>(pu.getResultId(), -// pu), -// Encoders.tuple(Encoders.STRING(), Encoders.bean(ResultCountrySet.class))); + // Dataset> potential_update_pair = + // potentialUpdates.map(pu -> new Tuple2<>(pu.getResultId(), + // pu), + // Encoders.tuple(Encoders.STRING(), Encoders.bean(ResultCountrySet.class))); - Dataset new_table = result_pair - .joinWith(potentialUpdates, result_pair.col("_1").equalTo(potentialUpdates.col("resultId")), "left_outer") - .map((MapFunction, ResultCountrySet>, R>) value -> { - R r = value._1()._2(); - Optional potentialNewCountries = Optional.ofNullable(value._2()); - if (potentialNewCountries.isPresent()) { - HashSet countries = new HashSet<>(); - for (Qualifier country : r.getCountry()) { - countries.add(country.getClassid()); - } - Result res = new Result(); - res.setId(r.getId()); - List countryList = new ArrayList<>(); - for (CountrySbs country : potentialNewCountries.get().getCountrySet()) { - if (!countries.contains(country.getClassid())) { - countryList.add(getCountry(country.getClassid(), country.getClassname())); - } - } - res.setCountry(countryList); - r.mergeFrom(res); - } - return r; - }, Encoders.bean(resultClazz)); + Dataset new_table = + result_pair + .joinWith( + potentialUpdates, + result_pair.col("_1").equalTo(potentialUpdates.col("resultId")), + "left_outer") + .map( + (MapFunction, ResultCountrySet>, R>) + value -> { + R r = value._1()._2(); + Optional potentialNewCountries = + Optional.ofNullable(value._2()); + if (potentialNewCountries.isPresent()) { + HashSet countries = new HashSet<>(); + for (Qualifier country : r.getCountry()) { + countries.add(country.getClassid()); + } + Result res = new Result(); + res.setId(r.getId()); + List countryList = new ArrayList<>(); + for (CountrySbs country : + potentialNewCountries + .get() + .getCountrySet()) { + if (!countries.contains(country.getClassid())) { + countryList.add( + getCountry( + country.getClassid(), + country.getClassname())); + } + } + res.setCountry(countryList); + r.mergeFrom(res); + } + return r; + }, + Encoders.bean(resultClazz)); - log.info("Saving graph table to path: {}", outputPath); - //log.info("number of saved recordsa: {}", new_table.count()); - new_table - .toJSON() - .write() - .option("compression", "gzip") - .text(outputPath); -// .toJavaRDD() -// .map(r -> OBJECT_MAPPER.writeValueAsString(r)) -// .saveAsTextFile(outputPath , GzipCodec.class); + log.info("Saving graph table to path: {}", outputPath); + // log.info("number of saved recordsa: {}", new_table.count()); + new_table.toJSON().write().option("compression", "gzip").text(outputPath); + // .toJavaRDD() + // .map(r -> OBJECT_MAPPER.writeValueAsString(r)) + // .saveAsTextFile(outputPath , GzipCodec.class); - } + } - - - private static Dataset getPotentialResultToUpdate(SparkSession spark, String inputPath, - Class resultClazz, - Broadcast> broadcast_datasourcecountryassoc) { + private static Dataset getPotentialResultToUpdate( + SparkSession spark, + String inputPath, + Class resultClazz, + Broadcast> broadcast_datasourcecountryassoc) { Dataset result = readPathEntity(spark, inputPath, resultClazz); result.createOrReplaceTempView("result"); - //log.info("number of results: {}", result.count()); + // log.info("number of results: {}", result.count()); createCfHbforresult(spark); return countryPropagationAssoc(spark, broadcast_datasourcecountryassoc); } + // private static void createCfHbforresult(SparkSession spark) { + // String query; + // query = "SELECT id, inst.collectedfrom.key cf , inst.hostedby.key hb " + + // "FROM ( SELECT id, instance " + + // "FROM result " + + // " WHERE datainfo.deletedbyinference = false) ds " + + // "LATERAL VIEW EXPLODE(instance) i AS inst"; + // Dataset cfhb = spark.sql(query); + // cfhb.createOrReplaceTempView("cfhb"); + // //log.info("cfhb_number : {}", cfhb.count()); + // } -// private static void createCfHbforresult(SparkSession spark) { -// String query; -// query = "SELECT id, inst.collectedfrom.key cf , inst.hostedby.key hb " + -// "FROM ( SELECT id, instance " + -// "FROM result " + -// " WHERE datainfo.deletedbyinference = false) ds " + -// "LATERAL VIEW EXPLODE(instance) i AS inst"; -// Dataset cfhb = spark.sql(query); -// cfhb.createOrReplaceTempView("cfhb"); -// //log.info("cfhb_number : {}", cfhb.count()); -// } - - - private static Dataset countryPropagationAssoc(SparkSession spark, - Broadcast> broadcast_datasourcecountryassoc){ + private static Dataset countryPropagationAssoc( + SparkSession spark, + Broadcast> broadcast_datasourcecountryassoc) { Dataset datasource_country = broadcast_datasourcecountryassoc.value(); datasource_country.createOrReplaceTempView("datasource_country"); - log.info("datasource_country number : {}",datasource_country.count()); + log.info("datasource_country number : {}", datasource_country.count()); - String query = "SELECT id resultId, collect_set(country) countrySet "+ - "FROM ( SELECT id, country " + - "FROM datasource_country " + - "JOIN cfhb " + - " ON cf = dataSourceId " + - "UNION ALL " + - "SELECT id , country " + - "FROM datasource_country " + - "JOIN cfhb " + - " ON hb = dataSourceId ) tmp " + - "GROUP BY id"; + String query = + "SELECT id resultId, collect_set(country) countrySet " + + "FROM ( SELECT id, country " + + "FROM datasource_country " + + "JOIN cfhb " + + " ON cf = dataSourceId " + + "UNION ALL " + + "SELECT id , country " + + "FROM datasource_country " + + "JOIN cfhb " + + " ON hb = dataSourceId ) tmp " + + "GROUP BY id"; Dataset potentialUpdates = spark.sql(query); - //log.info("potential update number : {}", potentialUpdates.count()); + // log.info("potential update number : {}", potentialUpdates.count()); return potentialUpdates; } - - - private static Dataset readAssocDatasourceCountry(SparkSession spark, String relationPath) { - return spark - .read() - .textFile(relationPath) - .map(value -> OBJECT_MAPPER.readValue(value, DatasourceCountry.class), Encoders.bean(DatasourceCountry.class)); + private static Dataset readAssocDatasourceCountry( + SparkSession spark, String relationPath) { + return spark.read() + .textFile(relationPath) + .map( + value -> OBJECT_MAPPER.readValue(value, DatasourceCountry.class), + Encoders.bean(DatasourceCountry.class)); } - private static void writeUpdates(Dataset potentialUpdates, String outputPath){ + private static void writeUpdates( + Dataset potentialUpdates, String outputPath) { potentialUpdates .toJSON() .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") .text(outputPath); -// map(u -> OBJECT_MAPPER.writeValueAsString(u)) -// .saveAsTextFile(outputPath, GzipCodec.class); + // map(u -> OBJECT_MAPPER.writeValueAsString(u)) + // .saveAsTextFile(outputPath, GzipCodec.class); } - - - -} \ No newline at end of file +}