From 540da4ab6124fc4e6c1aad2433550f24401f3b84 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Wed, 8 Apr 2020 13:04:04 +0200
Subject: [PATCH] new business logic with prepared info before actual job run

---
 .../countrypropagation/DatasourceCountry.java |  24 ++
 .../PrepareResultCountryAssociation.java      | 132 ++++++++
 .../SparkCountryPropagationJob2.java          | 287 ++++++++++++++++++
 3 files changed, 443 insertions(+)
 create mode 100644 dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/DatasourceCountry.java
 create mode 100644 dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareResultCountryAssociation.java
 create mode 100644 dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java

diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/DatasourceCountry.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/DatasourceCountry.java
new file mode 100644
index 000000000..460764e16
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/DatasourceCountry.java
@@ -0,0 +1,24 @@
+package eu.dnetlib.dhp.countrypropagation;
+
+import java.io.Serializable;
+
+public class DatasourceCountry implements Serializable {
+    private String dataSourceId;
+    private String country;
+
+    public String getDataSourceId() {
+        return dataSourceId;
+    }
+
+    public void setDataSourceId(String dataSourceId) {
+        this.dataSourceId = dataSourceId;
+    }
+
+    public String getCountry() {
+        return country;
+    }
+
+    public void setCountry(String country) {
+        this.country = country;
+    }
+}
diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareResultCountryAssociation.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareResultCountryAssociation.java
new file mode 100644
index 000000000..9572159ce
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareResultCountryAssociation.java
@@ -0,0 +1,132 @@
+package eu.dnetlib.dhp.countrypropagation;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.*;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static eu.dnetlib.dhp.PropagationConstant.createOutputDirs;
+import static eu.dnetlib.dhp.PropagationConstant.getConstraintList;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Associates a country to each datasource.
+ * The association is computed only for datasources of an allowed type or with a whitelisted id.
+ * The country is registered in the Organization associated to the Datasource, so the
+ * 'provides' relation between Datasource and Organization is exploited to get the country
+ * of the datasource.
+ */
+public class PrepareResultCountryAssociation {
+    private static final Logger log = LoggerFactory.getLogger(PrepareResultCountryAssociation.class);
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public static void main(String[] args) throws Exception {
+
+        String jsonConfiguration = IOUtils.toString(PrepareResultCountryAssociation.class
+                .getResourceAsStream("/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json"));
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = Optional
+                .ofNullable(parser.get("isSparkSessionManaged"))
+                .map(Boolean::valueOf)
+                .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        String inputPath = parser.get("sourcePath");
+        log.info("inputPath: {}", inputPath);
+
+        //todo add link to working dir
+        final String outputPath = "/tmp/provision/propagation/countrytoresultfrominstitutionalrepositories";
+
+        List<String> whitelist = Arrays.asList(parser.get("whitelist").split(";"));
+        List<String> allowedtypes = Arrays.asList(parser.get("allowedtypes").split(";"));
+
+        SparkConf conf = new SparkConf();
+        conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
+
+        runWithSparkSession(conf, isSparkSessionManaged,
+                spark -> {
+                    createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));
+                    prepareDatasourceCountryAssociation(spark, whitelist, allowedtypes, inputPath, outputPath);
+                });
+    }
+
+    private static void prepareDatasourceCountryAssociation(SparkSession spark,
+                                                            List<String> whitelist,
+                                                            List<String> allowedtypes,
+                                                            String inputPath,
+                                                            String outputPath) {
+        String whitelisted = "";
+        for (String i : whitelist) {
+            whitelisted += " OR id = '" + i + "'";
+        }
+
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+        Dataset<Datasource> datasource = spark.createDataset(sc.textFile(inputPath + "/datasource")
+                .map(item -> OBJECT_MAPPER.readValue(item, Datasource.class)).rdd(), Encoders.bean(Datasource.class));
+
+        Dataset<Relation> relation = spark.createDataset(sc.textFile(inputPath + "/relation")
+                .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class));
+
+        Dataset<Organization> organization = spark.createDataset(sc.textFile(inputPath + "/organization")
+                .map(item -> OBJECT_MAPPER.readValue(item, Organization.class)).rdd(), Encoders.bean(Organization.class));
+
+        datasource.createOrReplaceTempView("datasource");
+        relation.createOrReplaceTempView("relation");
+        organization.createOrReplaceTempView("organization");
+
+        // the column aliases must match the property names of DatasourceCountry,
+        // otherwise Encoders.bean cannot map the rows to the bean
+        String query = "SELECT source dataSourceId, country.classid country " +
+                "FROM ( SELECT id " +
+                "FROM datasource " +
+                "WHERE (datainfo.deletedbyinference = false " + whitelisted + ") " +
+                getConstraintList("datasourcetype.classid = '", allowedtypes) + ") d " +
+                "JOIN ( SELECT source, target " +
+                "FROM relation " +
+                "WHERE relclass = 'provides' " +
+                "AND datainfo.deletedbyinference = false ) rel " +
+                "ON d.id = rel.source " +
+                "JOIN (SELECT id, country " +
+                "FROM organization " +
+                "WHERE datainfo.deletedbyinference = false " +
+                "AND length(country.classid) > 0) o " +
+                "ON o.id = rel.target";
+
+        spark.sql(query)
+                .as(Encoders.bean(DatasourceCountry.class))
+                .write()
+                .mode(SaveMode.Overwrite)
+                .parquet(outputPath + "/prepared_datasource_country");
+    }
+}
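Note (annotation, not part of the patch): Encoders.bean matches columns to bean properties by name, which is why the query above aliases the selected fields as dataSourceId and country. A minimal sketch of the mapping, assuming a local Spark session and the DatasourceCountry bean above; the literal values are made up:

    SparkSession spark = SparkSession.builder()
            .appName("bean-encoder-check")
            .master("local[*]")
            .getOrCreate();
    // the aliases mirror the DatasourceCountry property names
    Dataset<DatasourceCountry> dsc = spark
            .sql("SELECT 'ds1' dataSourceId, 'NL' country")
            .as(Encoders.bean(DatasourceCountry.class));
    dsc.show(); // one row, both fields populated
    spark.stop();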
diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java
new file mode 100644
index 000000000..23595bada
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java
@@ -0,0 +1,287 @@
+package eu.dnetlib.dhp.countrypropagation;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.*;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
+import static eu.dnetlib.dhp.PropagationConstant.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SparkCountryPropagationJob2 {
+
+    private static final Logger log = LoggerFactory.getLogger(SparkCountryPropagationJob2.class);
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public static void main(String[] args) throws Exception {
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+                IOUtils.toString(SparkCountryPropagationJob2.class
+                        .getResourceAsStream("/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json")));
+        parser.parseArgument(args);
+
+        SparkConf conf = new SparkConf();
+        conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
+
+        final SparkSession spark = SparkSession
+                .builder()
+                .appName(SparkCountryPropagationJob2.class.getSimpleName())
+                .master(parser.get("master"))
+                .config(conf)
+                .enableHiveSupport()
+                .getOrCreate();
+
+        final String inputPath = parser.get("sourcePath");
+        final String outputPath = "/tmp/provision/propagation/countrytoresultfrominstitutionalrepositories";
+
+        // createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));
+
+        boolean writeUpdates = "true".equals(parser.get("writeUpdate"));
+        boolean saveGraph = "true".equals(parser.get("saveGraph"));
+
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+        // load the datasource-country association prepared by PrepareResultCountryAssociation
+        // and register it as the 'rels' view joined in countryPropagationAssoc
+        spark.read()
+                .parquet(outputPath + "/prepared_datasource_country")
+                .as(Encoders.bean(DatasourceCountry.class))
+                .createOrReplaceTempView("rels");
+
+        Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> dataset = spark.createDataset(sc.textFile(inputPath + "/dataset")
+                .map(item -> OBJECT_MAPPER.readValue(item, eu.dnetlib.dhp.schema.oaf.Dataset.class)).rdd(),
+                Encoders.bean(eu.dnetlib.dhp.schema.oaf.Dataset.class));
+
+        Dataset<OtherResearchProduct> other = spark.createDataset(sc.textFile(inputPath + "/otherresearchproduct")
+                .map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class)).rdd(),
+                Encoders.bean(OtherResearchProduct.class));
+
+        Dataset<Software> software = spark.createDataset(sc.textFile(inputPath + "/software")
+                .map(item -> OBJECT_MAPPER.readValue(item, Software.class)).rdd(),
+                Encoders.bean(Software.class));
+
+        Dataset<Publication> publication = spark.createDataset(sc.textFile(inputPath + "/publication")
+                .map(item -> OBJECT_MAPPER.readValue(item, Publication.class)).rdd(),
+                Encoders.bean(Publication.class));
+
+        //todo broadcast
+
+        software.createOrReplaceTempView("software");
+        final JavaRDD<Row> toupdateresultsoftware = propagateOnResult(spark, "software");
+
+        dataset.createOrReplaceTempView("dataset");
+        final JavaRDD<Row> toupdateresultdataset = propagateOnResult(spark, "dataset");
+
+        other.createOrReplaceTempView("other");
+        final JavaRDD<Row> toupdateresultother = propagateOnResult(spark, "other");
+
+        publication.createOrReplaceTempView("publication");
+        final JavaRDD<Row> toupdateresultpublication = propagateOnResult(spark, "publication");
+
+        if (writeUpdates) {
+            writeUpdates(toupdateresultsoftware, toupdateresultdataset, toupdateresultother, toupdateresultpublication, outputPath);
+        }
+
+        if (saveGraph) {
+            createUpdateForSoftwareDataset(toupdateresultsoftware, inputPath, spark)
+                    .map(s -> OBJECT_MAPPER.writeValueAsString(s))
+                    .saveAsTextFile(outputPath + "/software");
+
+            createUpdateForDatasetDataset(toupdateresultdataset, inputPath, spark)
+                    .map(d -> OBJECT_MAPPER.writeValueAsString(d))
+                    .saveAsTextFile(outputPath + "/dataset");
+
+            createUpdateForOtherDataset(toupdateresultother, inputPath, spark)
+                    .map(o -> OBJECT_MAPPER.writeValueAsString(o))
+                    .saveAsTextFile(outputPath + "/otherresearchproduct");
+
+            createUpdateForPublicationDataset(toupdateresultpublication, inputPath, spark)
+                    .map(p -> OBJECT_MAPPER.writeValueAsString(p))
+                    .saveAsTextFile(outputPath + "/publication");
+        }
+    }
+
+    private static void writeUpdates(JavaRDD<Row> software, JavaRDD<Row> dataset, JavaRDD<Row> other, JavaRDD<Row> publication, String outputPath) {
+        createUpdateForResultDatasetWrite(software, outputPath, "update_software");
+        createUpdateForResultDatasetWrite(dataset, outputPath, "update_dataset");
+        createUpdateForResultDatasetWrite(other, outputPath, "update_other");
+        createUpdateForResultDatasetWrite(publication, outputPath, "update_publication");
+    }
+
+    private static JavaRDD<OtherResearchProduct> createUpdateForOtherDataset(JavaRDD<Row> toupdateresult, String inputPath, SparkSession spark) {
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+        return sc.textFile(inputPath + "/otherresearchproduct")
+                .map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class))
+                .mapToPair(s -> new Tuple2<>(s.getId(), s)).leftOuterJoin(getStringResultJavaPairRDD(toupdateresult))
+                .map(c -> {
+                    OtherResearchProduct oaf = c._2()._1();
+                    List<Country> countryList = oaf.getCountry();
+                    if (c._2()._2().isPresent()) {
+                        HashSet<String> countries = new HashSet<>();
+                        for (Country country : countryList) {
+                            countries.add(country.getClassid());
+                        }
+                        Result r = c._2()._2().get();
+                        // add only the propagated countries not already associated to the record
+                        for (Country country : r.getCountry()) {
+                            if (!countries.contains(country.getClassid())) {
+                                countryList.add(country);
+                            }
+                        }
+                        oaf.setCountry(countryList);
+                    }
+                    return oaf;
+                });
+    }
+
+    private static JavaRDD<Publication> createUpdateForPublicationDataset(JavaRDD<Row> toupdateresult, String inputPath, SparkSession spark) {
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+        return sc.textFile(inputPath + "/publication")
+                .map(item -> OBJECT_MAPPER.readValue(item, Publication.class))
+                .mapToPair(s -> new Tuple2<>(s.getId(), s)).leftOuterJoin(getStringResultJavaPairRDD(toupdateresult))
+                .map(c -> {
+                    Publication oaf = c._2()._1();
+                    List<Country> countryList = oaf.getCountry();
+                    if (c._2()._2().isPresent()) {
+                        HashSet<String> countries = new HashSet<>();
+                        for (Country country : countryList) {
+                            countries.add(country.getClassid());
+                        }
+                        Result r = c._2()._2().get();
+                        for (Country country : r.getCountry()) {
+                            if (!countries.contains(country.getClassid())) {
+                                countryList.add(country);
+                            }
+                        }
+                        oaf.setCountry(countryList);
+                    }
+                    return oaf;
+                });
+    }
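+
+    /* The two methods below mirror createUpdateForOtherDataset and
+     * createUpdateForPublicationDataset: left-outer-join the graph records with the
+     * propagated (id, countries) pairs and append only the countries not already
+     * present on the record. */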
+
+    private static JavaRDD<Software> createUpdateForSoftwareDataset(JavaRDD<Row> toupdateresult, String inputPath, SparkSession spark) {
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+        return sc.textFile(inputPath + "/software")
+                .map(item -> OBJECT_MAPPER.readValue(item, Software.class))
+                .mapToPair(s -> new Tuple2<>(s.getId(), s)).leftOuterJoin(getStringResultJavaPairRDD(toupdateresult))
+                .map(c -> {
+                    Software oaf = c._2()._1();
+                    List<Country> countryList = oaf.getCountry();
+                    if (c._2()._2().isPresent()) {
+                        HashSet<String> countries = new HashSet<>();
+                        for (Country country : countryList) {
+                            countries.add(country.getClassid());
+                        }
+                        Result r = c._2()._2().get();
+                        for (Country country : r.getCountry()) {
+                            if (!countries.contains(country.getClassid())) {
+                                countryList.add(country);
+                            }
+                        }
+                        oaf.setCountry(countryList);
+                    }
+                    return oaf;
+                });
+    }
+
+    private static JavaRDD<eu.dnetlib.dhp.schema.oaf.Dataset> createUpdateForDatasetDataset(JavaRDD<Row> toupdateresult, String inputPath, SparkSession spark) {
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+        return sc.textFile(inputPath + "/dataset")
+                .map(item -> OBJECT_MAPPER.readValue(item, eu.dnetlib.dhp.schema.oaf.Dataset.class))
+                .mapToPair(d -> new Tuple2<>(d.getId(), d)).leftOuterJoin(getStringResultJavaPairRDD(toupdateresult))
+                .map(c -> {
+                    eu.dnetlib.dhp.schema.oaf.Dataset oaf = c._2()._1();
+                    List<Country> countryList = oaf.getCountry();
+                    if (c._2()._2().isPresent()) {
+                        HashSet<String> countries = new HashSet<>();
+                        for (Country country : countryList) {
+                            countries.add(country.getClassid());
+                        }
+                        Result r = c._2()._2().get();
+                        for (Country country : r.getCountry()) {
+                            if (!countries.contains(country.getClassid())) {
+                                countryList.add(country);
+                            }
+                        }
+                        oaf.setCountry(countryList);
+                    }
+                    return oaf;
+                });
+    }
+
+    private static JavaRDD<Row> propagateOnResult(SparkSession spark, String resultType) {
+        // explode the instances to get, for each result, the collectedfrom and hostedby datasource keys
+        String query = "SELECT id, inst.collectedfrom.key cf, inst.hostedby.key hb " +
+                "FROM ( SELECT id, instance " +
+                "FROM " + resultType +
+                " WHERE datainfo.deletedbyinference = false) ds " +
+                "LATERAL VIEW EXPLODE(instance) i AS inst";
+        Dataset<Row> cfhb = spark.sql(query);
+        cfhb.createOrReplaceTempView("cfhb");
+
+        return countryPropagationAssoc(spark, "cfhb").toJavaRDD();
+    }
+
+    private static Dataset<Row> countryPropagationAssoc(SparkSession spark, String cfhbTable) {
+        // a result inherits the countries of the datasources it was collected from or is hosted by
+        String query = "SELECT id, collect_set(country) country " +
+                "FROM ( SELECT id, country " +
+                "FROM rels " +
+                "JOIN " + cfhbTable +
+                " ON cf = dataSourceId " +
+                "UNION ALL " +
+                "SELECT id, country " +
+                "FROM rels " +
+                "JOIN " + cfhbTable +
+                " ON hb = dataSourceId ) tmp " +
+                "GROUP BY id";
+        return spark.sql(query);
+    }
+
+    private static JavaPairRDD<String, Result> getStringResultJavaPairRDD(JavaRDD<Row> toupdateresult) {
+        return toupdateresult.map(c -> {
+            List<Country> countryList = new ArrayList<>();
+            List<String> tmp = c.getList(1);
+            for (String country : tmp) {
+                countryList.add(getCountry(country));
+            }
+            Result r = new Result();
+            r.setId(c.getString(0));
+            r.setCountry(countryList);
+            return r;
+        }).mapToPair(r -> new Tuple2<>(r.getId(), r));
+    }
+
+    private static void createUpdateForResultDatasetWrite(JavaRDD<Row> toupdateresult, String outputPath, String type) {
+        toupdateresult.map(c -> {
+            List<Country> countryList = new ArrayList<>();
+            List<String> tmp = c.getList(1);
+            for (String country : tmp) {
+                countryList.add(getCountry(country));
+            }
+            Result r = new Result();
+            r.setId(c.getString(0));
+            r.setCountry(countryList);
+            return r;
+        }).map(r -> OBJECT_MAPPER.writeValueAsString(r))
+                .saveAsTextFile(outputPath + "/" + type);
+    }
+}
\ No newline at end of file
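
A self-contained sketch (annotation, not part of the patch; the ids and countries are made up) of the join-and-collect step performed by countryPropagationAssoc, runnable against a local Spark session:

    SparkSession spark = SparkSession.builder()
            .appName("country-propagation-sketch").master("local[*]").getOrCreate();

    // stand-ins for the prepared datasource-country pairs and the exploded instances
    spark.sql("SELECT 'ds1' dataSourceId, 'NL' country UNION ALL SELECT 'ds2', 'IT'")
            .createOrReplaceTempView("rels");
    spark.sql("SELECT 'r1' id, 'ds1' cf, 'ds2' hb")
            .createOrReplaceTempView("cfhb");

    // same shape as the query in countryPropagationAssoc
    spark.sql("SELECT id, collect_set(country) country "
            + "FROM ( SELECT id, country FROM rels JOIN cfhb ON cf = dataSourceId "
            + "UNION ALL "
            + "SELECT id, country FROM rels JOIN cfhb ON hb = dataSourceId ) tmp "
            + "GROUP BY id")
            .show(false); // r1 -> [NL, IT]: the collecting and the hosting datasource both contribute
    spark.stop();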