diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java index 7cb3bd839..ff0fb9603 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java @@ -59,35 +59,87 @@ public class SparkCountryPropagationJob { } + private static void datasource(SparkSession spark, List whitelist, String outputPath, String inputPath){ String whitelisted = ""; for (String i : whitelist){ whitelisted += " OR id = '" + i + "'"; } + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + + Dataset datasource = spark.createDataset(sc.textFile(inputPath + "/datasource") + .map(item -> new ObjectMapper().readValue(item, Datasource.class)).rdd(), Encoders.bean(Datasource.class)); + + Dataset relation = spark.createDataset(sc.textFile(inputPath + "/relation") + .map(item -> new ObjectMapper().readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class)); + + Dataset organization = spark.createDataset(sc.textFile(inputPath + "/organization") + .map(item -> new ObjectMapper().readValue(item, Organization.class)).rdd(), Encoders.bean(Organization.class)); + + Dataset dataset = spark.createDataset(sc.textFile(inputPath + "/dataset") + .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Dataset.class)).rdd(), + Encoders.bean(eu.dnetlib.dhp.schema.oaf.Dataset.class)); + + Dataset other = spark.createDataset(sc.textFile(inputPath + "/otherresearchproduct") + .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.OtherResearchProduct.class)).rdd(), + Encoders.bean(eu.dnetlib.dhp.schema.oaf.OtherResearchProduct.class)); + + Dataset software = spark.createDataset(sc.textFile(inputPath + "/software") + .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Software.class)).rdd(), + Encoders.bean(eu.dnetlib.dhp.schema.oaf.Software.class)); + + Dataset publication = spark.createDataset(sc.textFile(inputPath + "/publication") + .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Publication.class)).rdd(), + Encoders.bean(eu.dnetlib.dhp.schema.oaf.Publication.class)); + + datasource.createOrReplaceTempView("datasource"); + relation.createOrReplaceTempView("relation"); + organization.createOrReplaceTempView("organization"); +// String query = "SELECT source ds, target org, country.classid country " + +// "FROM ( SELECT id " + +// "FROM openaire.datasource " + +// "WHERE datasourcetype.classid = 'pubsrepository::institutional' " + +// "AND (datainfo.deletedbyinference = false " + whitelisted + ") ) d " + +// "JOIN ( SELECT source, target " + +// "FROM openaire.relation " + +// "WHERE relclass = 'provides' " + +// "AND datainfo.deletedbyinference = false ) rel " + +// "ON d.id = rel.source " + +// "JOIN (SELECT id, country " + +// "FROM openaire.organization " + +// "WHERE datainfo.deletedbyinference = false ) o " + +// "ON o.id = rel.target"; String query = "SELECT source ds, target org, country.classid country " + - "FROM ( SELECT id " + - "FROM openaire.datasource " + - "WHERE datasourcetype.classid = 'pubsrepository::institutional' " + - "AND (datainfo.deletedbyinference = false " + whitelisted + ") ) d " + - "JOIN ( SELECT source, target " + - "FROM openaire.relation " + - "WHERE relclass = 'provides' " + - "AND datainfo.deletedbyinference = false ) rel " + - "ON d.id = rel.source " + - "JOIN (SELECT id, country " + - "FROM openaire.organization " + - "WHERE datainfo.deletedbyinference = false ) o " + - "ON o.id = rel.target"; + "FROM ( SELECT id " + + "FROM datasource " + + "WHERE datasourcetype.classid = 'pubsrepository::institutional' " + + "AND (datainfo.deletedbyinference = false " + whitelisted + ") ) d " + + "JOIN ( SELECT source, target " + + "FROM relation " + + "WHERE relclass = 'provides' " + + "AND datainfo.deletedbyinference = false ) rel " + + "ON d.id = rel.source " + + "JOIN (SELECT id, country " + + "FROM organization " + + "WHERE datainfo.deletedbyinference = false ) o " + + "ON o.id = rel.target"; Dataset rels = spark.sql(query); rels.createOrReplaceTempView("rels"); + software.createOrReplaceTempView("software"); + final JavaRDD toupdateresultsoftware = propagateOnResult(spark, "software"); - final JavaRDD toupdateresultsoftware = propagateOnResult(spark, "openaire.software"); - final JavaRDD toupdateresultdataset = propagateOnResult(spark, "openaire.dataset"); - final JavaRDD toupdateresultother = propagateOnResult(spark, "openaire.otherresearchproduct"); - final JavaRDD toupdateresultpublication = propagateOnResult(spark, "openaire.publication"); + dataset.createOrReplaceTempView("dataset"); + final JavaRDD toupdateresultdataset = propagateOnResult(spark, "dataset"); + + other.createOrReplaceTempView("other"); + final JavaRDD toupdateresultother = propagateOnResult(spark, "other"); + + publication.createOrReplaceTempView("publication"); + final JavaRDD toupdateresultpublication = propagateOnResult(spark, "publication"); writeUpdates(toupdateresultsoftware, toupdateresultdataset, toupdateresultother, toupdateresultpublication, outputPath);