From 47561f35973fabf995892611877ff7d24f8ed1da Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Mon, 23 Mar 2020 11:58:32 +0100
Subject: [PATCH] changed the implementation from rdd to dataset obtained from
 sql queries (on hive)

---
 .../eu/dnetlib/dhp/SerializableSupplier.java  |   4 +
 .../SparkCountryPropagationJob.java           | 430 ++++++++++++++++--
 2 files changed, 406 insertions(+), 28 deletions(-)
 create mode 100644 dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/SerializableSupplier.java

diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/SerializableSupplier.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/SerializableSupplier.java
new file mode 100644
index 000000000..b890c141d
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/SerializableSupplier.java
@@ -0,0 +1,4 @@
+package eu.dnetlib.dhp;
+
+public class SerializableSupplier {
+}

diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java
index 3fbba47ba..75fceab63 100644
--- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java
@@ -1,20 +1,27 @@
 package eu.dnetlib.dhp.countrypropagation;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.PropagationConstant;
 import eu.dnetlib.dhp.TypedRow;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.schema.oaf.*;
-import eu.dnetlib.dhp.schema.oaf.Dataset;
+import net.sf.saxon.expr.ContextMappingFunction;
+import net.sf.saxon.expr.flwor.Tuple;
+import net.sf.saxon.om.Item;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.*;
+import org.apache.spark.sql.Dataset;
 import scala.Tuple2;
 
+import javax.sql.DataSource;
+import java.beans.Encoder;
 import java.io.File;
 import java.io.IOException;
 import java.util.*;
@@ -22,42 +29,298 @@ import java.util.*;
 
 import static eu.dnetlib.dhp.PropagationConstant.*;
 
 public class SparkCountryPropagationJob {
+
     public static void main(String[] args) throws Exception {
 
         final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCountryPropagationJob.class.getResourceAsStream("/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json")));
         parser.parseArgument(args);
 
+        SparkConf conf = new SparkConf();
+        conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
         final SparkSession spark = SparkSession
                 .builder()
                 .appName(SparkCountryPropagationJob.class.getSimpleName())
                 .master(parser.get("master"))
+                .config(conf)
                 .enableHiveSupport()
                 .getOrCreate();
 
-        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
         final String inputPath = parser.get("sourcePath");
         final String outputPath = "/tmp/provision/propagation/countrytoresultfrominstitutionalrepositories";
 
         createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));
-
         List<String> whitelist = Arrays.asList(parser.get("whitelist").split(";"));
         List<String> allowedtypes = Arrays.asList(parser.get("allowedtypes").split(";"));
 
-        JavaPairRDD<String, TypedRow> organizations = sc.textFile(inputPath + "/organization")
+        datasource(spark, whitelist, outputPath, inputPath);
+
+        //rdd(spark, whitelist, allowedtypes, outputPath, inputPath);
+
+    }
+
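+    // Dataset-based implementation: build an OR filter from the datasource whitelist,
+    // select the institutional repositories from the Hive db, join them with their
+    // 'provides' relations and with the owning organizations, and register the
+    // resulting (datasource id, organization id, country) triples as temp view "rels".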
+    private static void datasource(SparkSession spark, List<String> whitelist, String outputPath, String inputPath) {
+        String whitelisted = "";
+        for (String i : whitelist) {
+            whitelisted += " OR id = '" + i + "'";
+        }
+
+        String query = "SELECT source ds, target org, country.classid country " +
+                "FROM ( SELECT id " +
+                "FROM openaire.datasource " +
+                "WHERE datasourcetype.classid = 'pubsrepository::institutional' " +
+                "AND (datainfo.deletedbyinference = false " + whitelisted + ") ) d " +
+                "JOIN ( SELECT source, target " +
+                "FROM openaire.relation " +
+                "WHERE relclass = 'provides' " +
+                "AND datainfo.deletedbyinference = false ) rel " +
+                "ON d.id = rel.source " +
+                "JOIN (SELECT id, country " +
+                "FROM openaire.organization " +
+                "WHERE datainfo.deletedbyinference = false ) o " +
+                "ON o.id = rel.target";
+
+        Dataset<Row> rels = spark.sql(query);
+        rels.createOrReplaceTempView("rels");
+
+        final JavaRDD<Row> toupdateresultsoftware = propagateOnResult(spark, "openaire.software");
+        createUpdateForSoftwareDataset(toupdateresultsoftware, inputPath, spark)
+                .map(s -> new ObjectMapper().writeValueAsString(s))
+                .saveAsTextFile(outputPath + "/software");
+        createUpdateForResultDatasetWrite(toupdateresultsoftware, outputPath, "update_software");
+
+        JavaRDD<Row> toupdateresultdataset = propagateOnResult(spark, "openaire.dataset");
+        createUpdateForDatasetDataset(toupdateresultdataset, inputPath, spark)
+                .map(d -> new ObjectMapper().writeValueAsString(d))
+                .saveAsTextFile(outputPath + "/dataset");
+        createUpdateForResultDatasetWrite(toupdateresultdataset, outputPath, "update_dataset");
+
+        JavaRDD<Row> toupdateresultother = propagateOnResult(spark, "openaire.otherresearchproduct");
+        createUpdateForOtherDataset(toupdateresultother, inputPath, spark)
+                .map(o -> new ObjectMapper().writeValueAsString(o))
+                .saveAsTextFile(outputPath + "/otherresearchproduct");
+        createUpdateForResultDatasetWrite(toupdateresultother, outputPath, "update_other");
+
+        createUpdateForPublicationDataset(propagateOnResult(spark, "openaire.publication"), inputPath, spark)
+                .map(p -> new ObjectMapper().writeValueAsString(p))
+                .saveAsTextFile(outputPath + "/publication");
+    }
+
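+    // The createUpdateFor*Dataset methods below share one pattern: reload the results
+    // serialized under inputPath, leftOuterJoin them by id with the propagated-country
+    // pairs, and append only the countries whose classid is not already present.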
+    private static JavaRDD<OtherResearchProduct> createUpdateForOtherDataset(JavaRDD<Row> toupdateresult, String inputPath, SparkSession spark) {
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+        return sc.textFile(inputPath + "/otherresearchproduct")
+                .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class))
+                .mapToPair(s -> new Tuple2<>(s.getId(), s)).leftOuterJoin(getStringResultJavaPairRDD(toupdateresult))
+                .map(c -> {
+                    OtherResearchProduct oaf = c._2()._1();
+                    List<Country> countryList = oaf.getCountry();
+                    if (c._2()._2().isPresent()) {
+                        HashSet<String> countries = new HashSet<>();
+                        for (Qualifier country : countryList) {
+                            countries.add(country.getClassid());
+                        }
+                        Result r = c._2()._2().get();
+                        for (Country country : r.getCountry()) {
+                            if (!countries.contains(country.getClassid())) {
+                                countryList.add(country);
+                            }
+                        }
+                        oaf.setCountry(countryList);
+                    }
+                    return oaf;
+                });
+    }
+
+    private static JavaRDD<Publication> createUpdateForPublicationDataset(JavaRDD<Row> toupdateresult, String inputPath, SparkSession spark) {
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+        return sc.textFile(inputPath + "/publication")
+                .map(item -> new ObjectMapper().readValue(item, Publication.class))
+                .mapToPair(s -> new Tuple2<>(s.getId(), s)).leftOuterJoin(getStringResultJavaPairRDD(toupdateresult))
+                .map(c -> {
+                    Publication oaf = c._2()._1();
+                    List<Country> countryList = oaf.getCountry();
+                    if (c._2()._2().isPresent()) {
+                        HashSet<String> countries = new HashSet<>();
+                        for (Qualifier country : countryList) {
+                            countries.add(country.getClassid());
+                        }
+                        Result r = c._2()._2().get();
+                        for (Country country : r.getCountry()) {
+                            if (!countries.contains(country.getClassid())) {
+                                countryList.add(country);
+                            }
+                        }
+                        oaf.setCountry(countryList);
+                    }
+                    return oaf;
+                });
+    }
+
+    private static JavaRDD<Software> createUpdateForSoftwareDataset(JavaRDD<Row> toupdateresult, String inputPath, SparkSession spark) {
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+        return sc.textFile(inputPath + "/software")
+                .map(item -> new ObjectMapper().readValue(item, Software.class))
+                .mapToPair(s -> new Tuple2<>(s.getId(), s)).leftOuterJoin(getStringResultJavaPairRDD(toupdateresult))
+                .map(c -> {
+                    Software oaf = c._2()._1();
+                    List<Country> countryList = oaf.getCountry();
+                    if (c._2()._2().isPresent()) {
+                        HashSet<String> countries = new HashSet<>();
+                        for (Qualifier country : countryList) {
+                            countries.add(country.getClassid());
+                        }
+                        Result r = c._2()._2().get();
+                        for (Country country : r.getCountry()) {
+                            if (!countries.contains(country.getClassid())) {
+                                countryList.add(country);
+                            }
+                        }
+                        oaf.setCountry(countryList);
+                    }
+                    return oaf;
+                });
+    }
+
+    private static JavaRDD<eu.dnetlib.dhp.schema.oaf.Dataset> createUpdateForDatasetDataset(JavaRDD<Row> toupdateresult, String inputPath, SparkSession spark) {
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+        return sc.textFile(inputPath + "/dataset")
+                .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Dataset.class))
+                .mapToPair(d -> new Tuple2<>(d.getId(), d)).leftOuterJoin(getStringResultJavaPairRDD(toupdateresult))
+                .map(c -> {
+                    eu.dnetlib.dhp.schema.oaf.Dataset oaf = c._2()._1();
+                    List<Country> countryList = oaf.getCountry();
+                    if (c._2()._2().isPresent()) {
+                        HashSet<String> countries = new HashSet<>();
+                        for (Qualifier country : countryList) {
+                            countries.add(country.getClassid());
+                        }
+                        Result r = c._2()._2().get();
+                        for (Country country : r.getCountry()) {
+                            if (!countries.contains(country.getClassid())) {
+                                countryList.add(country);
+                            }
+                        }
+                        oaf.setCountry(countryList);
+                    }
+                    return oaf;
+                });
+    }
+
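+    // Propagation step: explode each result's instances to expose the collectedfrom (cf)
+    // and hostedby (hb) datasource keys, then join them against "rels" on either key and
+    // collect_set() the matching countries per result id.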
+    private static JavaRDD<Row> propagateOnResult(SparkSession spark, String table) {
+        String query = "SELECT id, inst.collectedfrom.key cf, inst.hostedby.key hb " +
+                "FROM ( SELECT id, instance " +
+                "FROM " + table +
+                " WHERE datainfo.deletedbyinference = false) ds " +
+                "LATERAL VIEW EXPLODE(instance) i AS inst";
+        Dataset<Row> cfhb = spark.sql(query);
+        cfhb.createOrReplaceTempView("cfhb");
+
+        return countryPropagationAssoc(spark, "cfhb").toJavaRDD();
+    }
+
+    private static Dataset<Row> countryPropagationAssoc(SparkSession spark, String cfhbTable) {
+        String query = "SELECT id, collect_set(country) country " +
+                "FROM ( SELECT id, country " +
+                "FROM rels " +
+                "JOIN " + cfhbTable +
+                " ON cf = ds " +
+                "UNION ALL " +
+                "SELECT id, country " +
+                "FROM rels " +
+                "JOIN " + cfhbTable +
+                " ON hb = ds ) tmp " +
+                "GROUP BY id";
+        return spark.sql(query);
+    }
+
+    private static JavaPairRDD<String, Result> getStringResultJavaPairRDD(JavaRDD<Row> toupdateresult) {
+        return toupdateresult.map(c -> {
+            List<Country> countryList = new ArrayList<>();
+            List<String> tmp = c.getList(1);
+            for (String country : tmp) {
+                countryList.add(getCountry(country));
+            }
+            Result r = new Result();
+            r.setId(c.getString(0));
+            r.setCountry(countryList);
+            return r;
+        }).mapToPair(r -> new Tuple2<>(r.getId(), r));
+    }
+
+    private static void createUpdateForResultDatasetWrite(JavaRDD<Row> toupdateresult, String outputPath, String type) {
+        toupdateresult.map(c -> {
+            List<Country> countryList = new ArrayList<>();
+            List<String> tmp = c.getList(1);
+            for (String country : tmp) {
+                countryList.add(getCountry(country));
+            }
+            Result r = new Result();
+            r.setId(c.getString(0));
+            r.setCountry(countryList);
+            return r;
+        }).map(r -> new ObjectMapper().writeValueAsString(r))
+                .saveAsTextFile(outputPath + "/" + type);
+    }
+
+    private static Dataset<Row> countryPropagation(SparkSession spark, String cfhbTable) {
+        String query = "SELECT id, collect_set(named_struct('classid', country, 'classname', country, " +
+                "'datainfo', named_struct( 'deletedbyinference', false, " +
+                "'inferenceprovenance','" + PROPAGATION_DATA_INFO_TYPE + "'," +
+                "'inferred',true,'invisible',false, " +
+                "'provenanceaction', named_struct('classid','" + PROPAGATION_COUNTRY_INSTREPO_CLASS_ID + "'," +
+                "'classname','" + PROPAGATION_COUNTRY_INSTREPO_CLASS_NAME + "'," +
+                "'schemeid','" + DNET_SCHEMA_ID + "'," +
+                "'schemename','" + DNET_SCHEMA_NAME + "') , " +
+                "'trust','0.9') ,'schemeid','" + DNET_COUNTRY_SCHEMA + "','schemename','" + DNET_COUNTRY_SCHEMA + "')) country " +
+                "FROM ( SELECT id, country " +
+                "FROM rels " +
+                "JOIN " + cfhbTable +
+                " ON cf = ds " +
+                "UNION ALL " +
+                "SELECT id, country " +
+                "FROM rels " +
+                "JOIN " + cfhbTable +
+                " ON hb = ds ) tmp " +
+                "GROUP BY id";
+        return spark.sql(query);
+    }
+
+    private static void rdd(SparkSession spark, List<String> whitelist, List<String> allowedtypes, String outputPath, String inputPath) throws IOException {
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+        JavaPairRDD<String, TypedRow> organizations = sc.textFile(inputPath + "/organization")
                 .map(item -> new ObjectMapper().readValue(item, Organization.class))
                 .filter(org -> !org.getDataInfo().getDeletedbyinference())
-                .map(org -> new TypedRow().setSourceId(org.getId()).setValue(org.getCountry().getClassid()))
+                .map(org -> {
+                    TypedRow tr = new TypedRow();
+                    tr.setSourceId(org.getId());
+                    tr.setValue(org.getCountry().getClassid());
+                    return tr;
+                })
                 .mapToPair(toPair());
 
         JavaPairRDD<String, TypedRow> organization_datasource = sc.textFile(inputPath + "/relation")
-                .map(item -> new ObjectMapper().readValue(item, Relation.class))
-                .filter(r -> !r.getDataInfo().getDeletedbyinference())
-                .filter(r -> RELATION_DATASOURCEORGANIZATION_REL_TYPE.equals(r.getRelClass()) && RELATION_ORGANIZATION_DATASOURCE_REL_CLASS.equals(r.getRelType()))
-                .map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget()))
-                .mapToPair(toPair()); //id is the organization identifier
-
+                .map(item -> new ObjectMapper().readValue(item, Relation.class))
+                .filter(r -> !r.getDataInfo().getDeletedbyinference())
+                .filter(r -> RELATION_DATASOURCEORGANIZATION_REL_TYPE.equals(r.getRelType()) &&
+                        RELATION_ORGANIZATION_DATASOURCE_REL_CLASS.equals(r.getRelClass()))
+                .map(r -> {
+                    TypedRow tp = new TypedRow(); tp.setSourceId(r.getSource()); tp.setTargetId(r.getTarget());
+                    return tp;
+                })
+                .mapToPair(toPair()); //id is the organization identifier
 
         JavaPairRDD<String, TypedRow> datasources = sc.textFile(inputPath + "/datasource")
                 .map(item -> new ObjectMapper().readValue(item, Datasource.class))
@@ -65,6 +328,97 @@ public class SparkCountryPropagationJob {
                 .map(ds -> new TypedRow().setSourceId(ds.getId()))
                 .mapToPair(toPair());
 
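+        // Previous RDD flow: join organizations with the datasource->organization
+        // relations to map each datasource to its country, then keep only the
+        // whitelisted/allowed datasources computed above.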
+        JavaPairRDD<String, TypedRow> datasource_country = organizations.join(organization_datasource)
+                .map(x -> x._2()._1().setSourceId(x._2()._2().getTargetId())) // (OrganizationId, (TypedRow for Organization, TypedRow for Relation))
+                .mapToPair(toPair()); //(DatasourceId, TypedRow for Organization)
+
+
+        JavaPairRDD<String, TypedRow> alloweddatasources_country = datasources.join(datasource_country)
+                .mapToPair(ds -> new Tuple2<>(ds._1(), ds._2()._2())).cache();
+
+//        System.out.println("OUTPUT *** ORGANIZATION COUNT *** " + organizations.count());
+//        System.out.println("OUTPUT *** ORGANIZATION DATASOURCE RELATIONS COUNT *** " + organization_datasource.count());
+//        System.out.println("OUTPUT *** DATASOURCE COUNT *** " + datasources.count());
+//        System.out.println("OUTPUT *** ALLOWED_DATASOURCE-COUNTRY COUNT *** " + alloweddatasources_country.count());
+
+//        alloweddatasources_country.map(p -> new ObjectMapper().writeValueAsString(p))
+//                .saveAsTextFile(outputPath+"/datasource_country");
+
+        JavaRDD<Software> software = sc.textFile(inputPath + "/software")
+                .map(item -> new ObjectMapper().readValue(item, Software.class));
+
+        JavaPairRDD<String, TypedRow> datasource_software = software
+                .map(oaf -> getTypedRowsDatasourceResult(oaf))
+                .flatMapToPair(f -> {
+                    ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
+                    for (TypedRow t : f) {
+                        ret.add(new Tuple2<>(t.getSourceId(), t));
+                    }
+                    return ret.iterator();
+                });
+
+        datasource_software.map(p -> new ObjectMapper().writeValueAsString(p))
+                .saveAsTextFile(outputPath+"/datasource_software");
+
+        JavaPairRDD<String, Software> sfw = software.mapToPair(p -> new Tuple2<>(p.getId(), p));
+
+        JavaPairRDD<String, TypedRow> toupdateresult = alloweddatasources_country.join(datasource_software)
+                .map(u -> {
+                    TypedRow tp = u._2()._2();
+                    tp.setValue(u._2()._1().getValue());
+                    return tp;
+                })
+                .mapToPair(toPair())
+                .reduceByKey((a, p) -> {
+                    if (a == null) {
+                        return p;
+                    }
+                    if (p == null) {
+                        return a;
+                    }
+                    a.addAll(p.getAccumulator());
+                    return a;
+                });
+        toupdateresult.map(p -> new ObjectMapper().writeValueAsString(p))
+                .saveAsTextFile(outputPath+"/toupdateresult");
+        //updateResult(sfw, toupdateresult, outputPath, "software");
+        // createUpdateForResult(toupdateresult, outputPath, "software");
+
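+        // What follows is the rest of the previous RDD implementation (publications,
+        // datasets, other research products); it is kept commented out for reference.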
+        /*
+        JavaRDD<Publication> publications = sc.textFile(inputPath + "/publication")
+                .map(item -> new ObjectMapper().readValue(item, Publication.class));
+
+        JavaPairRDD<String, TypedRow> datasource_publication = publications
+                .map(oaf -> getTypedRowsDatasourceResult(oaf))
+                .flatMapToPair(f -> {
+                    ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
+                    for (TypedRow t : f) {
+                        ret.add(new Tuple2<>(t.getSourceId(), t));
+                    }
+                    return ret.iterator();
+                });
+
+        JavaPairRDD<String, TypedRow> toupdateresult = alloweddatasources_country.join(datasource_publication)
+                .map(u -> u._2()._2().setValue(u._2()._1().getValue()))
+                .mapToPair(toPair())
+                .reduceByKey((a, p) -> {
+                    if (a == null) {
+                        return p;
+                    }
+                    if (p == null) {
+                        return a;
+                    }
+                    a.addAll(p.getAccumulator());
+                    return a;
+                });
+
+
+
+
+
+
         JavaRDD<Publication> publications = sc.textFile(inputPath + "/publication")
                 .map(item -> new ObjectMapper().readValue(item, Publication.class));
 
@@ -75,6 +429,9 @@ public class SparkCountryPropagationJob {
         JavaRDD<OtherResearchProduct> other = sc.textFile(inputPath + "/otherresearchproduct")
                 .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class));
 
+
+
+
         JavaPairRDD<String, TypedRow> datasource_results = publications
                 .map(oaf -> getTypedRowsDatasourceResult(oaf))
                 .flatMapToPair(f -> {
                    ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
                    for (TypedRow t : f) {
                        ret.add(new Tuple2<>(t.getSourceId(), t));
                    }
                    return ret.iterator();
                }));
@@ -112,18 +469,8 @@ public class SparkCountryPropagationJob {
                    return ret.iterator();
                }));
 
-        JavaPairRDD<String, Publication> pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p));
-        JavaPairRDD<String, eu.dnetlib.dhp.schema.oaf.Dataset> dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p));
-        JavaPairRDD<String, Software> sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p));
-        JavaPairRDD<String, OtherResearchProduct> orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p));
-
-        JavaPairRDD<String, TypedRow> datasource_country = organizations.join(organization_datasource)
-                .map(x -> x._2()._1().setSourceId(x._2()._2().getTargetId())) // (OrganizationId, (TypedRow for Organization, TypedRow for Relation))
-                .mapToPair(toPair()); //(DatasourceId, TypedRow for Organization)
-        JavaPairRDD<String, TypedRow> alloweddatasources_country = datasources.join(datasource_country)
-                .mapToPair(ds -> new Tuple2<>(ds._1(), ds._2()._2()));
 
         JavaPairRDD<String, TypedRow> toupdateresult = alloweddatasources_country.join(datasource_results)
@@ -140,15 +487,48 @@ public class SparkCountryPropagationJob {
                    return a;
                });
 
+
+        JavaPairRDD<String, Publication> pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p));
+        JavaPairRDD<String, eu.dnetlib.dhp.schema.oaf.Dataset> dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p));
+        JavaPairRDD<String, Software> sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p));
+        JavaPairRDD<String, OtherResearchProduct> orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p));
+
         updateResult(pubs, toupdateresult, outputPath, "publication");
         updateResult(dss, toupdateresult, outputPath, "dataset");
         updateResult(sfw, toupdateresult, outputPath, "software");
         updateResult(orp, toupdateresult, outputPath, "otherresearchproduct");
         //we use leftOuterJoin because we want to rebuild the entire structure
+*/
+    }
+
+    private static void createUpdateForResult(JavaPairRDD<String, TypedRow> toupdateresult, String outputPath, String type) {
+        toupdateresult.map(c -> {
+            List<Country> countryList = new ArrayList<>();
+            for (String country : c._2.getAccumulator()) {
+                countryList.add(getCountry(country));
+            }
+            switch (type) {
+                case "software":
+                    Software s = new Software();
+                    s.setId(c._1());
+                    s.setCountry(countryList);
+                    return s;
+                case "publication":
+                    break;
+                case "dataset":
+                    break;
+                case "otherresearchproduct":
+                    break;
+            }
+            return null;
+        }).map(r -> new ObjectMapper().writeValueAsString(r))
+                .saveAsTextFile(outputPath + "/" + type);
+    }
 
     private static void updateResult(JavaPairRDD<String, Result> results, JavaPairRDD<String, TypedRow> toupdateresult, String outputPath, String type) {
         results.leftOuterJoin(toupdateresult)
@@ -176,13 +556,7 @@ public class SparkCountryPropagationJob {
                 .map(p -> new ObjectMapper().writeValueAsString(p))
                 .saveAsTextFile(outputPath+"/"+type);
     }
-
-
-
-
-
-
-
+
 }