From a440152b46556d4084c8e79d6507bfb1bb391baf Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 23 Mar 2020 14:30:56 +0100 Subject: [PATCH] refactoring --- .../SparkCountryPropagationJob.java | 309 +----------------- 1 file changed, 16 insertions(+), 293 deletions(-) diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java index 75fceab63..d2b50791d 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java @@ -18,6 +18,7 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.*; import org.apache.spark.sql.Dataset; +import org.codehaus.janino.Java; import scala.Tuple2; import javax.sql.DataSource; @@ -56,8 +57,6 @@ public class SparkCountryPropagationJob { datasource(spark, whitelist, outputPath, inputPath); - //rdd(spark,whitelist,allowedtypes, outputPath, inputPath); - } private static void datasource(SparkSession spark, List whitelist, String outputPath, String inputPath){ @@ -83,33 +82,40 @@ public class SparkCountryPropagationJob { Dataset rels = spark.sql(query); rels.createOrReplaceTempView("rels"); - - + final JavaRDD toupdateresultsoftware = propagateOnResult(spark, "openaire.software"); + final JavaRDD toupdateresultdataset = propagateOnResult(spark, "openaire.dataset"); + final JavaRDD toupdateresultother = propagateOnResult(spark, "openaire.otherresearchproduct"); + final JavaRDD toupdateresultpublication = propagateOnResult(spark, "openaire.publication"); + + writeUpdates(toupdateresultsoftware, toupdateresultdataset, toupdateresultother, toupdateresultpublication, outputPath); + createUpdateForSoftwareDataset(toupdateresultsoftware, inputPath, spark) .map(s -> new ObjectMapper().writeValueAsString(s)) .saveAsTextFile(outputPath + "/software"); - createUpdateForResultDatasetWrite(toupdateresultsoftware, outputPath, "update_software"); - JavaRDD toupdateresultdataset = propagateOnResult(spark, "openaire.dataset"); createUpdateForDatasetDataset(toupdateresultdataset,inputPath,spark) .map(d -> new ObjectMapper().writeValueAsString(d)) .saveAsTextFile(outputPath + "/dataset"); - createUpdateForResultDatasetWrite(toupdateresultdataset, outputPath, "update_dataset"); - JavaRDD toupdateresultother = propagateOnResult(spark, "openaire.otherresearchproduct"); createUpdateForOtherDataset(toupdateresultother, inputPath, spark) .map(o -> new ObjectMapper().writeValueAsString(o)) .saveAsTextFile(outputPath + "/otherresearchproduct"); - createUpdateForResultDatasetWrite(toupdateresultother, outputPath, "update_other"); - createUpdateForPublicationDataset(propagateOnResult(spark, "openaire.publication"), inputPath, spark) + createUpdateForPublicationDataset(toupdateresultpublication, inputPath, spark) .map(p -> new ObjectMapper().writeValueAsString(p)) .saveAsTextFile(outputPath + "/publication"); } + private static void writeUpdates(JavaRDD software, JavaRDD dataset, JavaRDD other , JavaRDD publication, String outputPath){ + createUpdateForResultDatasetWrite(software, outputPath, "update_software"); + createUpdateForResultDatasetWrite(dataset, outputPath, "update_dataset"); + createUpdateForResultDatasetWrite(other, outputPath, "update_other"); + createUpdateForResultDatasetWrite(publication, outputPath, "update_publication"); + } + private static JavaRDD createUpdateForOtherDataset(JavaRDD toupdateresult, String inputPath, SparkSession spark) { final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); @@ -273,290 +279,7 @@ public class SparkCountryPropagationJob { .saveAsTextFile(outputPath+"/"+type); } - private static Dataset countryPropagation(SparkSession spark, String cfhbTable){ - String query = "SELECT id, collect_set(named_struct('classid', country, 'classname', country, " + - "'datainfo', named_struct( 'deletedbyinference', false, " + - "'inferenceprovenance','" + PROPAGATION_DATA_INFO_TYPE +"'," + - "'inferred',true,'invisible',false, " + - "'provenanceaction', named_struct('classid','" + PROPAGATION_COUNTRY_INSTREPO_CLASS_ID + "'," + - "'classname','" + PROPAGATION_COUNTRY_INSTREPO_CLASS_NAME +"'," + - "'schemeid','" + DNET_SCHEMA_ID +"'," + - "'schemename','" + DNET_SCHEMA_NAME +"') , " + - "'trust','0.9') ,'schemeid','" + DNET_COUNTRY_SCHEMA +"','schemename','" + DNET_COUNTRY_SCHEMA + "')) country " + - "FROM ( SELECT id, country " + - "FROM rels " + - "JOIN " + cfhbTable + - " ON cf = ds " + - "UNION ALL " + - "SELECT id , country " + - "FROM rels " + - "JOIN " + cfhbTable + - " ON hb = ds ) tmp " + - "GROUP BY id"; - return spark.sql(query); - } - private static void rdd(SparkSession spark, List whitelist, List allowedtypes, String outputPath, String inputPath) throws IOException { - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - - JavaPairRDD organizations = sc.textFile(inputPath + "/organization") - .map(item -> new ObjectMapper().readValue(item, Organization.class)) - .filter(org -> !org.getDataInfo().getDeletedbyinference()) - .map(org -> { - TypedRow tr = new TypedRow(); - tr.setSourceId(org.getId()); - tr.setValue(org.getCountry().getClassid()); - return tr; - }) - .mapToPair(toPair()); - - JavaPairRDD organization_datasource = - sc.textFile(inputPath + "/relation") - .map(item -> new ObjectMapper().readValue(item, Relation.class)) - .filter(r -> !r.getDataInfo().getDeletedbyinference()) - .filter(r -> RELATION_DATASOURCEORGANIZATION_REL_TYPE.equals(r.getRelType()) && - RELATION_ORGANIZATION_DATASOURCE_REL_CLASS.equals(r.getRelClass())) - .map(r -> { - TypedRow tp = new TypedRow(); tp.setSourceId(r.getSource()); tp.setTargetId(r.getTarget()); - return tp; - }) - .mapToPair(toPair()); //id is the organization identifier - - JavaPairRDD datasources = sc.textFile(inputPath + "/datasource") - .map(item -> new ObjectMapper().readValue(item, Datasource.class)) - .filter(ds -> whitelist.contains(ds.getId()) || allowedtypes.contains(ds.getDatasourcetype().getClassid())) - .map(ds -> new TypedRow().setSourceId(ds.getId())) - .mapToPair(toPair()); - - JavaPairRDD datasource_country = organizations.join(organization_datasource) - .map(x -> x._2()._1().setSourceId(x._2()._2().getTargetId())) // (OrganizationId,(TypedRow for Organization, TypedRow for Relation) - .mapToPair(toPair()); //(DatasourceId, TypedRowforOrganziation) - - - JavaPairRDD alloweddatasources_country = datasources.join(datasource_country) - .mapToPair(ds -> new Tuple2<>(ds._1(), ds._2()._2())).cache(); - -// System.out.println("OUTPUT *** ORGANIZATION COUNT *** " + organizations.count()); -// System.out.println("OUTPUT *** ORGANIZATION DATASOURCE RELATIONS COUNT *** " + organization_datasource.count()); -// System.out.println("OUTPUT *** DATASOURCE COUNT *** " + datasources.count()); -// System.out.println("OUTPUT *** ALLOWED_DATASOURCE-COUNTRY COUNT *** " + alloweddatasources_country.count()); - -// alloweddatasources_country.map(p -> new ObjectMapper().writeValueAsString(p)) -// .saveAsTextFile(outputPath+"/datasource_country"); - - JavaRDD software = sc.textFile(inputPath + "/software") - .map(item -> new ObjectMapper().readValue(item, Software.class)); - - JavaPairRDD datasource_software = software - .map(oaf -> getTypedRowsDatasourceResult(oaf)) - .flatMapToPair(f -> { - ArrayList> ret = new ArrayList<>(); - for (TypedRow t : f) { - ret.add(new Tuple2<>(t.getSourceId(), t)); - } - return ret.iterator(); - }); - - datasource_software.map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath+"/datasource_software"); - - JavaPairRDD sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p)); - - JavaPairRDD toupdateresult = alloweddatasources_country.join(datasource_software) - .map(u -> { - TypedRow tp = u._2()._2(); - tp.setValue(u._2()._1().getValue()); - return tp; - }) - .mapToPair(toPair()) - .reduceByKey((a, p) -> { - if (a == null) { - return p; - } - if (p == null) { - return a; - } - a.addAll(p.getAccumulator()); - return a; - }); - toupdateresult.map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath+"/toupdateresult"); - //updateResult(sfw, toupdateresult, outputPath, "software"); - // createUpdateForResult(toupdateresult, outputPath, "software"); - - - - /* JavaRDD publications = sc.textFile(inputPath + "/publication") - .map(item -> new ObjectMapper().readValue(item, Publication.class)); - - JavaPairRDD datasource_publication = publications - .map(oaf -> getTypedRowsDatasourceResult(oaf)) - .flatMapToPair(f -> { - ArrayList> ret = new ArrayList<>(); - for (TypedRow t : f) { - ret.add(new Tuple2<>(t.getSourceId(), t)); - } - return ret.iterator(); - }); - - JavaPairRDD toupdateresult = alloweddatasources_country.join(datasource_publication) - .map(u -> u._2()._2().setValue(u._2()._1().getValue())) - .mapToPair(toPair()) - .reduceByKey((a, p) -> { - if (a == null) { - return p; - } - if (p == null) { - return a; - } - a.addAll(p.getAccumulator()); - return a; - }); - - - - - - - - - JavaRDD publications = sc.textFile(inputPath + "/publication") - .map(item -> new ObjectMapper().readValue(item, Publication.class)); - JavaRDD datasets = sc.textFile(inputPath + "/dataset") - .map(item -> new ObjectMapper().readValue(item, Dataset.class)); - JavaRDD software = sc.textFile(inputPath + "/software") - .map(item -> new ObjectMapper().readValue(item, Software.class)); - JavaRDD other = sc.textFile(inputPath + "/otherresearchproduct") - .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class)); - - - - - JavaPairRDD datasource_results = publications - .map(oaf -> getTypedRowsDatasourceResult(oaf)) - .flatMapToPair(f -> { - ArrayList> ret = new ArrayList<>(); - for (TypedRow t : f) { - ret.add(new Tuple2<>(t.getSourceId(), t)); - } - return ret.iterator(); - }) - .union(datasets - .map(oaf -> getTypedRowsDatasourceResult(oaf)) - .flatMapToPair(f -> { - ArrayList> ret = new ArrayList<>(); - for (TypedRow t : f) { - ret.add(new Tuple2<>(t.getSourceId(), t)); - } - return ret.iterator(); - })) - .union(software - .map(oaf -> getTypedRowsDatasourceResult(oaf)) - .flatMapToPair(f -> { - ArrayList> ret = new ArrayList<>(); - for (TypedRow t : f) { - ret.add(new Tuple2<>(t.getSourceId(), t)); - } - return ret.iterator(); - })) - .union(other - .map(oaf -> getTypedRowsDatasourceResult(oaf)) - .flatMapToPair(f -> { - ArrayList> ret = new ArrayList<>(); - for (TypedRow t : f) { - ret.add(new Tuple2<>(t.getSourceId(), t)); - } - return ret.iterator(); - })); - - - - - - JavaPairRDD toupdateresult = alloweddatasources_country.join(datasource_results) - .map(u -> u._2()._2().setValue(u._2()._1().getValue())) - .mapToPair(toPair()) - .reduceByKey((a, p) -> { - if (a == null) { - return p; - } - if (p == null) { - return a; - } - a.addAll(p.getAccumulator()); - return a; - }); - - - JavaPairRDD pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p)); - - updateResult(pubs, toupdateresult, outputPath, "publication"); - updateResult(dss, toupdateresult, outputPath, "dataset"); - updateResult(sfw, toupdateresult, outputPath, "software"); - updateResult(orp, toupdateresult, outputPath, "otherresearchproduct"); - //we use leftOuterJoin because we want to rebuild the entire structure - -*/ - - } - - - - private static void createUpdateForResult(JavaPairRDD toupdateresult, String outputPath, String type){ - toupdateresult.map(c -> { - List countryList = new ArrayList<>(); - for (String country : c._2.getAccumulator()) { - countryList.add(getCountry(country)); - } - switch(type ){ - case "software": - Software s = new Software(); - s.setId(c._1()); - s.setCountry(countryList); - return s; - case "publication": - break; - case "dataset": - break; - case "otherresearchproduct": - break; - - } - return null; - }).map(r ->new ObjectMapper().writeValueAsString(r)) - .saveAsTextFile(outputPath+"/"+type); - } - - private static void updateResult(JavaPairRDD results, JavaPairRDD toupdateresult, String outputPath, String type) { - results.leftOuterJoin(toupdateresult) - .map(c -> { - Result oaf = c._2()._1(); - List countryList = oaf.getCountry(); - if (c._2()._2().isPresent()) { - HashSet countries = new HashSet<>(); - for (Qualifier country : countryList) { - countries.add(country.getClassid()); - } - TypedRow t = c._2()._2().get(); - - for (String country : t.getAccumulator()) { - if (!countries.contains(country)) { - countryList.add(getCountry(country)); - } - - } - oaf.setCountry(countryList); - } - - return oaf; - }) - .map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath+"/"+type); - } - }