From 2afe9718166aaf22da100c042c1342d4479ce1f4 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 8 Apr 2020 10:49:09 +0200 Subject: [PATCH] new implementation for country propagation --- .../SparkCountryPropagationJob.java | 77 +++++++++---------- 1 file changed, 36 insertions(+), 41 deletions(-) diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java index ff0fb9603..2ebd818cb 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java @@ -53,14 +53,18 @@ public class SparkCountryPropagationJob { List whitelist = Arrays.asList(parser.get("whitelist").split(";")); List allowedtypes = Arrays.asList(parser.get("allowedtypes").split(";")); + boolean writeUpdates = TRUE.equals(parser.get("writeUpdate")); + boolean saveGraph = TRUE.equals(parser.get("saveGraph")); +// datasource(spark, whitelist, outputPath, inputPath, "true".equals(parser.get("writeUpdate")), +// "true".equals(parser.get("saveGraph")), allowedtypes); +// +// } +// +// +// private static void datasource(SparkSession spark, List whitelist, String outputPath, String inputPath, +// boolean writeUpdates, boolean saveGraph, List allowedtypes){ - datasource(spark, whitelist, outputPath, inputPath); - - } - - - private static void datasource(SparkSession spark, List whitelist, String outputPath, String inputPath){ String whitelisted = ""; for (String i : whitelist){ whitelisted += " OR id = '" + i + "'"; @@ -78,7 +82,7 @@ public class SparkCountryPropagationJob { .map(item -> new ObjectMapper().readValue(item, Organization.class)).rdd(), Encoders.bean(Organization.class)); Dataset dataset = spark.createDataset(sc.textFile(inputPath + "/dataset") - 
.map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Dataset.class)).rdd(), + .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Dataset.class)).rdd(), Encoders.bean(eu.dnetlib.dhp.schema.oaf.Dataset.class)); Dataset other = spark.createDataset(sc.textFile(inputPath + "/otherresearchproduct") @@ -96,26 +100,14 @@ public class SparkCountryPropagationJob { datasource.createOrReplaceTempView("datasource"); relation.createOrReplaceTempView("relation"); organization.createOrReplaceTempView("organization"); -// String query = "SELECT source ds, target org, country.classid country " + -// "FROM ( SELECT id " + -// "FROM openaire.datasource " + -// "WHERE datasourcetype.classid = 'pubsrepository::institutional' " + -// "AND (datainfo.deletedbyinference = false " + whitelisted + ") ) d " + -// "JOIN ( SELECT source, target " + -// "FROM openaire.relation " + -// "WHERE relclass = 'provides' " + -// "AND datainfo.deletedbyinference = false ) rel " + -// "ON d.id = rel.source " + -// "JOIN (SELECT id, country " + -// "FROM openaire.organization " + -// "WHERE datainfo.deletedbyinference = false ) o " + -// "ON o.id = rel.target"; String query = "SELECT source ds, target org, country.classid country " + "FROM ( SELECT id " + "FROM datasource " + - "WHERE datasourcetype.classid = 'pubsrepository::institutional' " + - "AND (datainfo.deletedbyinference = false " + whitelisted + ") ) d " + + "WHERE (datainfo.deletedbyinference = false " + whitelisted + ") " + + getConstraintList("datasourcetype.classid = '", allowedtypes) + + // "datasourcetype.classid = 'pubsrepository::institutional' " + + // "AND (datainfo.deletedbyinference = false " + whitelisted + ") ) d " + "JOIN ( SELECT source, target " + "FROM relation " + "WHERE relclass = 'provides' " + @@ -141,23 +133,27 @@ public class SparkCountryPropagationJob { publication.createOrReplaceTempView("publication"); final JavaRDD toupdateresultpublication = propagateOnResult(spark, 
"publication"); - writeUpdates(toupdateresultsoftware, toupdateresultdataset, toupdateresultother, toupdateresultpublication, outputPath); + if(writeUpdates){ + writeUpdates(toupdateresultsoftware, toupdateresultdataset, toupdateresultother, toupdateresultpublication, outputPath); + } - createUpdateForSoftwareDataset(toupdateresultsoftware, inputPath, spark) - .map(s -> new ObjectMapper().writeValueAsString(s)) - .saveAsTextFile(outputPath + "/software"); + if(saveGraph){ + createUpdateForSoftwareDataset(toupdateresultsoftware, inputPath, spark) + .map(s -> new ObjectMapper().writeValueAsString(s)) + .saveAsTextFile(outputPath + "/software"); - createUpdateForDatasetDataset(toupdateresultdataset,inputPath,spark) - .map(d -> new ObjectMapper().writeValueAsString(d)) - .saveAsTextFile(outputPath + "/dataset"); + createUpdateForDatasetDataset(toupdateresultdataset,inputPath,spark) + .map(d -> new ObjectMapper().writeValueAsString(d)) + .saveAsTextFile(outputPath + "/dataset"); - createUpdateForOtherDataset(toupdateresultother, inputPath, spark) - .map(o -> new ObjectMapper().writeValueAsString(o)) - .saveAsTextFile(outputPath + "/otherresearchproduct"); + createUpdateForOtherDataset(toupdateresultother, inputPath, spark) + .map(o -> new ObjectMapper().writeValueAsString(o)) + .saveAsTextFile(outputPath + "/otherresearchproduct"); - createUpdateForPublicationDataset(toupdateresultpublication, inputPath, spark) - .map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath + "/publication"); + createUpdateForPublicationDataset(toupdateresultpublication, inputPath, spark) + .map(p -> new ObjectMapper().writeValueAsString(p)) + .saveAsTextFile(outputPath + "/publication"); + } } @@ -276,9 +272,9 @@ public class SparkCountryPropagationJob { String query; query = "SELECT id, inst.collectedfrom.key cf , inst.hostedby.key hb " + "FROM ( SELECT id, instance " + - "FROM " + table + - " WHERE datainfo.deletedbyinference = false) ds " + - "LATERAL VIEW 
EXPLODE(instance) i AS inst"; + "FROM " + table + + " WHERE datainfo.deletedbyinference = false) ds " + + "LATERAL VIEW EXPLODE(instance) i AS inst"; Dataset cfhb = spark.sql(query); cfhb.createOrReplaceTempView("cfhb"); @@ -333,5 +329,4 @@ public class SparkCountryPropagationJob { -} - +} \ No newline at end of file