From 27f1d3ee8f78e53e9bcd02ce37dad5cb814678e1 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Wed, 15 Apr 2020 12:21:05 +0200
Subject: [PATCH] minor refactoring

---
 .../SparkCountryPropagationJob2.java          |  33 +-
 .../DatasourceOrganization.java               |   4 +
 .../PrepareResultInstRepoAssociation.java     |   4 +
 .../ResultOrganizationSet.java                |   4 +
 ...rkResultToOrganizationFromIstRepoJob2.java | 449 ++++++++++++++++++
 .../input_prepareresultorg_parameters.json    |  44 ++
 .../Result2OrganizationJobTest.java           |   4 +
 7 files changed, 523 insertions(+), 19 deletions(-)
 create mode 100644 dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/DatasourceOrganization.java
 create mode 100644 dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java
 create mode 100644 dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/ResultOrganizationSet.java
 create mode 100644 dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob2.java
 create mode 100644 dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/input_prepareresultorg_parameters.json
 create mode 100644 dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/Result2OrganizationJobTest.java

diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java
index dcdefaf10..029be645d 100644
--- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java
@@ -114,6 +114,7 @@ public class SparkCountryPropagationJob2 {
                                           Class<R> resultClazz,
                                           String outputPath) {
 
+        log.info("Reading Graph table from: {}", inputPath);
         Dataset<R> result = readPathEntity(spark, inputPath, resultClazz);
 
         Dataset<Tuple2<String, R>> result_pair = result
@@ -171,17 +172,17 @@ public class SparkCountryPropagationJob2 {
 
     }
 
-    private static void createCfHbforresult(SparkSession spark) {
-        String query;
-        query = "SELECT id, inst.collectedfrom.key cf , inst.hostedby.key hb " +
-                "FROM ( SELECT id, instance " +
-                "FROM result " +
-                " WHERE datainfo.deletedbyinference = false) ds " +
-                "LATERAL VIEW EXPLODE(instance) i AS inst";
-        Dataset<Row> cfhb = spark.sql(query);
-        cfhb.createOrReplaceTempView("cfhb");
-        log.info("cfhb_number : {}", cfhb.count());
-    }
+//    private static void createCfHbforresult(SparkSession spark) {
+//        String query;
+//        query = "SELECT id, inst.collectedfrom.key cf , inst.hostedby.key hb " +
+//                "FROM ( SELECT id, instance " +
+//                "FROM result " +
+//                " WHERE datainfo.deletedbyinference = false) ds " +
+//                "LATERAL VIEW EXPLODE(instance) i AS inst";
+//        Dataset<Row> cfhb = spark.sql(query);
+//        cfhb.createOrReplaceTempView("cfhb");
+//        //log.info("cfhb_number : {}", cfhb.count());
+//    }
 
     private static Dataset<Row> countryPropagationAssoc(SparkSession spark,
@@ -203,18 +204,11 @@ public class SparkCountryPropagationJob2 {
                 " ON hb = dataSourceId ) tmp " +
                 "GROUP BY id";
         Dataset<Row> potentialUpdates = spark.sql(query);
-        log.info("potential update number : {}", potentialUpdates.count());
+        //log.info("potential update number : {}", potentialUpdates.count());
         return potentialUpdates;
     }
 
-    private static <R extends Result> Dataset<R> readPathEntity(SparkSession spark, String inputEntityPath, Class<R> resultClazz) {
-        log.info("Reading Graph table from: {}", inputEntityPath);
-        return spark
-                .read()
-                .textFile(inputEntityPath)
-                .map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, resultClazz), Encoders.bean(resultClazz));
-    }
 
     private static Dataset<Row> readAssocDatasourceCountry(SparkSession spark, String relationPath) {
         return spark
@@ -227,6 +221,7 @@ public class SparkCountryPropagationJob2 {
         potentialUpdates
                 .toJSON()
                 .write()
+                .mode(SaveMode.Overwrite)
                 .option("compression", "gzip")
                 .text(outputPath);
 //                    map(u -> OBJECT_MAPPER.writeValueAsString(u))
diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/DatasourceOrganization.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/DatasourceOrganization.java
new file mode 100644
index 000000000..859d53f76
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/DatasourceOrganization.java
@@ -0,0 +1,4 @@
+package eu.dnetlib.dhp.resulttoorganizationfrominstrepo;
+
+public class DatasourceOrganization {
+}
diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java
new file mode 100644
index 000000000..5a5678e10
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java
@@ -0,0 +1,4 @@
+package eu.dnetlib.dhp.resulttoorganizationfrominstrepo;
+
+public class PrepareResultInstRepoAssociation {
+}
diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/ResultOrganizationSet.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/ResultOrganizationSet.java
new file mode 100644
index 000000000..9c51ceb02
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/ResultOrganizationSet.java
@@ -0,0 +1,4 @@
+package eu.dnetlib.dhp.resulttoorganizationfrominstrepo;
+
+public class ResultOrganizationSet {
+}
diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob2.java
new file mode 100644
index 000000000..cdae25e85
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob2.java
@@ -0,0 +1,449 @@
+package eu.dnetlib.dhp.resulttoorganizationfrominstrepo;
+
+import eu.dnetlib.dhp.TypedRow;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.hadoop.io.Text;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.File;
+import java.util.*;
+
+import eu.dnetlib.dhp.schema.oaf.*;
+import scala.Tuple2;
+
+import static eu.dnetlib.dhp.PropagationConstant.*;
+
+public class SparkResultToOrganizationFromIstRepoJob2 {
+    public static void main(String[] args) throws Exception {
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkResultToOrganizationFromIstRepoJob2.class.getResourceAsStream("/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/input_propagationresulaffiliationfrominstrepo_parameters.json")));
+        parser.parseArgument(args);
+        SparkConf conf = new SparkConf();
+        conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
+        final SparkSession spark = SparkSession
+                .builder()
+                .appName(SparkResultToOrganizationFromIstRepoJob2.class.getSimpleName())
+                .master(parser.get("master"))
+                .config(conf)
+                .enableHiveSupport()
+                .getOrCreate();
+
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+        final String inputPath = parser.get("sourcePath");
+        final String outputPath = "/tmp/provision/propagation/resulttoorganizationfrominstitutionalrepositories";
+
+        createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));
+
+        org.apache.spark.sql.Dataset<Datasource> datasource = spark.createDataset(sc.textFile(inputPath + "/datasource")
+                .map(item -> new ObjectMapper().readValue(item, Datasource.class)).rdd(), Encoders.bean(Datasource.class));
+
+        org.apache.spark.sql.Dataset<Relation> relation = spark.createDataset(sc.textFile(inputPath + "/relation")
+                .map(item -> new ObjectMapper().readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class));
+
+        org.apache.spark.sql.Dataset<Organization> organization = spark.createDataset(sc.textFile(inputPath + "/organization")
+                .map(item -> new ObjectMapper().readValue(item, Organization.class)).rdd(), Encoders.bean(Organization.class));
+
+        datasource.createOrReplaceTempView("datasource");
+        relation.createOrReplaceTempView("relation");
+        organization.createOrReplaceTempView("organization");
+
+        String query = "SELECT source ds, target org " +
+                "FROM ( SELECT id " +
+                "FROM datasource " +
+                "WHERE datasourcetype.classid = 'pubsrepository::institutional' " +
+                "AND datainfo.deletedbyinference = false ) d " +
+                "JOIN ( SELECT source, target " +
+                "FROM relation " +
+                "WHERE relclass = 'provides' " +
+                "AND datainfo.deletedbyinference = false ) rel " +
+                "ON d.id = rel.source ";
+
+        org.apache.spark.sql.Dataset<Row> rels = spark.sql(query);
+        rels.createOrReplaceTempView("rels");
+
+        org.apache.spark.sql.Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> dataset = spark.createDataset(sc.textFile(inputPath + "/dataset")
+                .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Dataset.class)).rdd(),
+                Encoders.bean(eu.dnetlib.dhp.schema.oaf.Dataset.class));
+
+        org.apache.spark.sql.Dataset<eu.dnetlib.dhp.schema.oaf.OtherResearchProduct> other = spark.createDataset(sc.textFile(inputPath + "/otherresearchproduct")
+                .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.OtherResearchProduct.class)).rdd(),
+                Encoders.bean(eu.dnetlib.dhp.schema.oaf.OtherResearchProduct.class));
+
+        org.apache.spark.sql.Dataset<eu.dnetlib.dhp.schema.oaf.Software> software = spark.createDataset(sc.textFile(inputPath + "/software")
+                .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Software.class)).rdd(),
+                Encoders.bean(eu.dnetlib.dhp.schema.oaf.Software.class));
+
+        org.apache.spark.sql.Dataset<eu.dnetlib.dhp.schema.oaf.Publication> publication = spark.createDataset(sc.textFile(inputPath + "/publication")
+                .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Publication.class)).rdd(),
+                Encoders.bean(eu.dnetlib.dhp.schema.oaf.Publication.class));
+
+
+        software.createOrReplaceTempView("software");
+        final JavaRDD<Row> toupdateresultsoftware = propagateOnResult(spark, "software");
+
+        dataset.createOrReplaceTempView("dataset");
+        final JavaRDD<Row> toupdateresultdataset = propagateOnResult(spark, "dataset");
+
+        other.createOrReplaceTempView("other");
+        final JavaRDD<Row> toupdateresultother = propagateOnResult(spark, "other");
+
+        publication.createOrReplaceTempView("publication");
+        final JavaRDD<Row> toupdateresultpublication = propagateOnResult(spark, "publication");
+
+        writeUpdates(toupdateresultsoftware, toupdateresultdataset, toupdateresultother, toupdateresultpublication, outputPath);
+
+
+        query = "Select source resultId, collect_set(target) org_list " +
+                "from relation " +
+                "where datainfo.deletedbyinference = false " +
+                "and relClass = '" + RELATION_RESULT_ORGANIZATION_REL_CLASS + "' " +
+                "group by source";
+
+        JavaRDD<Row> result_orglist = spark.sql(query).toJavaRDD();
+
+        JavaPairRDD<String, List<Object>> toupdateunion = toupdateresultdataset.mapToPair(d -> new Tuple2<>(d.getString(0), d.getList(1)))
+                .union(toupdateresultother.mapToPair(o -> new Tuple2<>(o.getString(0), o.getList(1))))
+                .union(toupdateresultpublication.mapToPair(p -> new Tuple2<>(p.getString(0), p.getList(1))))
+                .union(toupdateresultsoftware.mapToPair(s -> new Tuple2<>(s.getString(0), s.getList(1))));
+
+        JavaRDD<Relation> new_rels = getNewRels(result_orglist.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1))),
+                toupdateunion);
+
+
+
+        sc.textFile(inputPath + "/relation")
+                .map(item -> new ObjectMapper().readValue(item, Relation.class))
+                .union(new_rels)
+                .map(r -> new ObjectMapper().writeValueAsString(r))
+                .saveAsTextFile(outputPath + "/relation");
+
+
+    }
+
+    private static JavaRDD<Relation> getNewRels(JavaPairRDD<String, List<Object>> relationOrgs, JavaPairRDD<String, List<Object>> newRels) {
+        return newRels
+                .leftOuterJoin(relationOrgs)
+                .flatMap(c -> {
+                    List<Object> toAddOrgs = new ArrayList<>();
+                    toAddOrgs.addAll(c._2()._1());
+                    if (c._2()._2().isPresent()) {
+                        Set<Object> originalOrgs = new HashSet<>();
+                        originalOrgs.addAll(c._2()._2().get());
+                        for (Object oId : originalOrgs) {
+                            if (toAddOrgs.contains(oId)) {
+                                toAddOrgs.remove(oId);
+                            }
+                        }
+                    }
+                    List<Relation> relationList = new ArrayList<>();
+                    String resId = c._1();
+                    for (Object org : toAddOrgs) {
+                        relationList.add(getRelation((String) org, resId, RELATION_ORGANIZATION_RESULT_REL_CLASS,
+                                RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE,
+                                PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME));
+                        relationList.add(getRelation(resId, (String) org, RELATION_RESULT_ORGANIZATION_REL_CLASS,
+                                RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE,
+                                PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME));
+
+                    }
+                    return relationList.iterator();
+                });
+
+    }
+
+
+    private static void writeUpdates(JavaRDD<Row> toupdateresultsoftware, JavaRDD<Row> toupdateresultdataset, JavaRDD<Row> toupdateresultother, JavaRDD<Row> toupdateresultpublication, String outputPath) {
+        createUpdateForRelationWrite(toupdateresultsoftware, outputPath, "update_software");
+        createUpdateForRelationWrite(toupdateresultdataset, outputPath, "update_dataset");
+        createUpdateForRelationWrite(toupdateresultother, outputPath, "update_other");
+        createUpdateForRelationWrite(toupdateresultpublication, outputPath, "update_publication");
+
+    }
+
+    private static void createUpdateForRelationWrite(JavaRDD<Row> toupdaterelation, String outputPath, String update_type) {
+        toupdaterelation.flatMap(s -> {
+            List<Relation> relationList = new ArrayList<>();
+            List<String> orgs = s.getList(1);
+            String resId = s.getString(0);
+            for (String org : orgs) {
+                relationList.add(getRelation(org, resId, RELATION_ORGANIZATION_RESULT_REL_CLASS,
+                        RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE,
+                        PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME));
+                relationList.add(getRelation(resId, org, RELATION_RESULT_ORGANIZATION_REL_CLASS,
+                        RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE,
+                        PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME));
+
+            }
+            return relationList.iterator();
+        }).map(s -> new ObjectMapper().writeValueAsString(s)).saveAsTextFile(outputPath + "/" + update_type);
+    }
+
+    private static JavaRDD<Row> propagateOnResult(SparkSession spark, String table) {
+        String query;
+        query = "SELECT id, inst.collectedfrom.key cf , inst.hostedby.key hb " +
+                "FROM ( SELECT id, instance " +
+                "FROM " + table +
+                " WHERE datainfo.deletedbyinference = false) ds " +
+                "LATERAL VIEW EXPLODE(instance) i AS inst";
+        org.apache.spark.sql.Dataset<Row> cfhb = spark.sql(query);
+        cfhb.createOrReplaceTempView("cfhb");
+
+        return organizationPropagationAssoc(spark, "cfhb").toJavaRDD();
+
+    }
+
+    private static org.apache.spark.sql.Dataset<Row> organizationPropagationAssoc(SparkSession spark, String cfhbTable) {
+        String query = "SELECT id, collect_set(org) org " +
+                "FROM ( SELECT id, org " +
+                "FROM rels " +
+                "JOIN " + cfhbTable +
+                " ON cf = ds " +
+                "UNION ALL " +
+                "SELECT id , org " +
+                "FROM rels " +
+                "JOIN " + cfhbTable +
+                " ON hb = ds ) tmp " +
+                "GROUP BY id";
+        return spark.sql(query);
+    }
+
+}
+
+//package eu.dnetlib.dhp.resulttoorganizationfrominstrepo;
+//
+//import eu.dnetlib.dhp.TypedRow;
+//import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+//import org.apache.commons.io.IOUtils;
+//import org.apache.hadoop.fs.FileSystem;
+//import org.apache.spark.SparkConf;
+//import org.apache.spark.api.java.JavaPairRDD;
+//import org.apache.spark.api.java.JavaRDD;
+//import org.apache.spark.api.java.JavaSparkContext;
+//import org.apache.spark.sql.Encoders;
+//import org.apache.spark.sql.Row;
+//import org.apache.spark.sql.SparkSession;
+//import org.apache.hadoop.io.Text;
+//import com.fasterxml.jackson.databind.ObjectMapper;
+//import java.io.File;
+//import java.util.ArrayList;
+//import java.util.Arrays;
+//import java.util.List;
+//import java.util.Set;
+//
+//import eu.dnetlib.dhp.schema.oaf.*;
+//import scala.Tuple2;
+//
+//import static eu.dnetlib.dhp.PropagationConstant.*;
+//
+//public class SparkResultToOrganizationFromIstRepoJob {
+//    public static void main(String[] args) throws Exception {
+//
+//        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkResultToOrganizationFromIstRepoJob.class.getResourceAsStream("/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/input_propagationresulaffiliationfrominstrepo_parameters.json")));
+//        parser.parseArgument(args);
+//        SparkConf conf = new SparkConf();
+//        conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
+//        final SparkSession spark = SparkSession
+//                .builder()
+//                .appName(SparkResultToOrganizationFromIstRepoJob.class.getSimpleName())
+//                .master(parser.get("master"))
+//                .config(conf)
+//                .enableHiveSupport()
+//                .getOrCreate();
+//
+//        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+//        final String inputPath = parser.get("sourcePath");
+//        final String outputPath = "/tmp/provision/propagation/resulttoorganizationfrominstitutionalrepositories";
+//
+//        createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));
+//
+//        org.apache.spark.sql.Dataset<Datasource> datasource = spark.createDataset(sc.textFile(inputPath + "/datasource")
+//                .map(item -> new ObjectMapper().readValue(item, Datasource.class))
+//                .rdd(), Encoders.bean(Datasource.class));
+//
+//        JavaRDD<Relation> relation_rdd_all = sc.textFile(inputPath + "/relation")
+//                .map(item -> new ObjectMapper().readValue(item, Relation.class));
+//        JavaRDD<Relation> relation_rdd = relation_rdd_all.filter(r -> !r.getDataInfo().getDeletedbyinference()).cache();
+//
+//        org.apache.spark.sql.Dataset<Relation> relation = spark.createDataset(relation_rdd.rdd(), Encoders.bean(Relation.class));
+//
+//        org.apache.spark.sql.Dataset<Organization> organization = spark.createDataset(sc.textFile(inputPath + "/organization")
+//                .map(item -> new ObjectMapper().readValue(item, Organization.class)).rdd(), Encoders.bean(Organization.class));
+//
+//        datasource.createOrReplaceTempView("datasource");
+//        relation.createOrReplaceTempView("relation");
+//        organization.createOrReplaceTempView("organization");
+//
+//        String query = "SELECT source ds, target org " +
+//                "FROM ( SELECT id " +
+//                "FROM datasource " +
+//                "WHERE datasourcetype.classid = 'pubsrepository::institutional' " +
+//                "AND datainfo.deletedbyinference = false ) d " +
+//                "JOIN ( SELECT source, target " +
+//                "FROM relation " +
+//                "WHERE relclass = 'provides' " +
+//                "AND datainfo.deletedbyinference = false ) rel " +
+//                "ON d.id = rel.source ";
+//
+//        org.apache.spark.sql.Dataset<Row> rels = spark.sql(query);
+//        rels.createOrReplaceTempView("rels");
+//
+//        org.apache.spark.sql.Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> dataset = spark.createDataset(sc.textFile(inputPath + "/dataset")
+//                .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Dataset.class)).rdd(),
+//                Encoders.bean(eu.dnetlib.dhp.schema.oaf.Dataset.class));
+//
+//        org.apache.spark.sql.Dataset<eu.dnetlib.dhp.schema.oaf.OtherResearchProduct> other = spark.createDataset(sc.textFile(inputPath + "/otherresearchproduct")
+//                .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.OtherResearchProduct.class)).rdd(),
+//                Encoders.bean(eu.dnetlib.dhp.schema.oaf.OtherResearchProduct.class));
+//
+//        org.apache.spark.sql.Dataset<eu.dnetlib.dhp.schema.oaf.Software> software = spark.createDataset(sc.textFile(inputPath + "/software")
+//                .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Software.class)).rdd(),
+//                Encoders.bean(eu.dnetlib.dhp.schema.oaf.Software.class));
+//
+//        org.apache.spark.sql.Dataset<eu.dnetlib.dhp.schema.oaf.Publication> publication = spark.createDataset(sc.textFile(inputPath + "/publication")
+//                .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Publication.class)).rdd(),
+//                Encoders.bean(eu.dnetlib.dhp.schema.oaf.Publication.class));
+//
+//
+//        software.createOrReplaceTempView("software");
+//        final JavaRDD<Row> toupdateresultsoftware = propagateOnResult(spark, "software");
+//
+//        dataset.createOrReplaceTempView("dataset");
+//        final JavaRDD<Row> toupdateresultdataset = propagateOnResult(spark, "dataset");
+//
+//        other.createOrReplaceTempView("other");
+//        final JavaRDD<Row> toupdateresultother = propagateOnResult(spark, "other");
+//
+//        publication.createOrReplaceTempView("publication");
+//        final JavaRDD<Row> toupdateresultpublication = propagateOnResult(spark, "publication");
+//
+//        writeUpdates(toupdateresultsoftware, toupdateresultdataset, toupdateresultother, toupdateresultpublication, outputPath);
+//
+//        JavaPairRDD<String, TypedRow> relation_rdd_pair = relation_rdd
+//                .filter(r -> RELATION_RESULT_ORGANIZATION_REL_CLASS.equals(r.getRelClass()))
+//                .map(r -> {
+//                    TypedRow tp = new TypedRow();
+//                    tp.setSourceId(r.getSource());
+//                    tp.add(r.getTarget());
+//                    return tp;
+//                }).mapToPair(toPair())
+//                .reduceByKey((a, b) -> {
+//                    if (a == null) {
+//                        return b;
+//                    }
+//                    if (b == null) {
+//                        return a;
+//                    }
+//
+//                    a.addAll(b.getAccumulator());
+//                    return a;
+//                }).cache();
+//
+//
+//        JavaRDD<Relation> new_rels = getNewRels(relation_rdd_pair,
+//                toupdateresultother.mapToPair(s -> new Tuple2<>(s.getString(0), s.getList(1))))
+//                .union(getNewRels(relation_rdd_pair,
+//                        toupdateresultsoftware.mapToPair(s -> new Tuple2<>(s.getString(0), s.getList(1)))))
+//                .union(getNewRels(relation_rdd_pair,
+//                        toupdateresultdataset.mapToPair(s -> new Tuple2<>(s.getString(0), s.getList(1)))))
+//                .union(getNewRels(relation_rdd_pair,
+//                        toupdateresultpublication.mapToPair(s -> new Tuple2<>(s.getString(0), s.getList(1)))));
+//
+//
+//        relation_rdd_all.union(new_rels).map(r -> new ObjectMapper().writeValueAsString(r))
+//                .saveAsTextFile(outputPath + "/relation");
+//
+//    }
+//
+//    private static JavaRDD<Relation> getNewRels(JavaPairRDD<String, TypedRow> relation_rdd_pair, JavaPairRDD<String, List<String>> newRels) {
+//        return newRels//.mapToPair(s -> new Tuple2<>(s.getString(0), s.getList(1)))
+//                .leftOuterJoin(relation_rdd_pair)
+//                .flatMap(c -> {
+//                    List<String> toAddOrgs = c._2()._1();
+//                    if (c._2()._2().isPresent()) {
+//                        Set<String> originalOrgs = c._2()._2().get().getAccumulator();
+//                        for (String oId : toAddOrgs) {
+//                            if (originalOrgs.contains(oId)) {
+//                                toAddOrgs.remove(oId);
+//                            }
+//                        }
+//                    }
+//                    List<Relation> relationList = new ArrayList<>();
+//                    String resId = c._1();
+//                    for (String org : toAddOrgs) {
+//                        relationList.add(getRelation(org, resId, RELATION_ORGANIZATION_RESULT_REL_CLASS,
+//                                RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE,
+//                                PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME));
+//                        relationList.add(getRelation(resId, org, RELATION_RESULT_ORGANIZATION_REL_CLASS,
+//                                RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE,
+//                                PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME));
+//
+//                    }
+//                    return relationList.iterator();
+//                });
+//
+//    }
+//
+//    private static void writeUpdates(JavaRDD<Row> toupdateresultsoftware, JavaRDD<Row> toupdateresultdataset, JavaRDD<Row> toupdateresultother, JavaRDD<Row> toupdateresultpublication, String outputPath) {
+//        createUpdateForRelationWrite(toupdateresultsoftware, outputPath, "update_software");
+//        createUpdateForRelationWrite(toupdateresultdataset, outputPath, "update_dataset");
+//        createUpdateForRelationWrite(toupdateresultother, outputPath, "update_other");
+//        createUpdateForRelationWrite(toupdateresultpublication, outputPath, "update_publication");
+//
+//    }
+//
+//    private static void createUpdateForRelationWrite(JavaRDD<Row> toupdaterelation, String outputPath, String update_type) {
+//        toupdaterelation.flatMap(s -> {
+//            List<Relation> relationList = new ArrayList<>();
+//            List<String> orgs = s.getList(1);
+//            String resId = s.getString(0);
+//            for (String org : orgs) {
+//                relationList.add(getRelation(org, resId, RELATION_ORGANIZATION_RESULT_REL_CLASS,
+//                        RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE,
+//                        PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME));
+//                relationList.add(getRelation(resId, org, RELATION_RESULT_ORGANIZATION_REL_CLASS,
+//                        RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE,
+//                        PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME));
+//
+//            }
+//            return relationList.iterator();
+//        }).map(s -> new ObjectMapper().writeValueAsString(s)).saveAsTextFile(outputPath + "/" + update_type);
+//    }
+//
+//    private static JavaRDD<Row> propagateOnResult(SparkSession spark, String table) {
+//        String query;
+//        query = "SELECT id, inst.collectedfrom.key cf , inst.hostedby.key hb " +
+//                "FROM ( SELECT id, instance " +
+//                "FROM " + table +
+//                " WHERE datainfo.deletedbyinference = false) ds " +
+//                "LATERAL VIEW EXPLODE(instance) i AS inst";
+//        org.apache.spark.sql.Dataset<Row> cfhb = spark.sql(query);
+//        cfhb.createOrReplaceTempView("cfhb");
+//
+//        return organizationPropagationAssoc(spark, "cfhb").toJavaRDD();
+//
+//    }
+//
+//    private static org.apache.spark.sql.Dataset<Row> organizationPropagationAssoc(SparkSession spark, String cfhbTable) {
+//        String query = "SELECT id, collect_set(org) org " +
+//                "FROM ( SELECT id, org " +
+//                "FROM rels " +
+//                "JOIN " + cfhbTable +
+//                " ON cf = ds " +
+//                "UNION ALL " +
+//                "SELECT id , org " +
+//                "FROM rels " +
+//                "JOIN " + cfhbTable +
+//                " ON hb = ds ) tmp " +
+//                "GROUP BY id";
+//        return spark.sql(query);
+//    }
+//
+//}
diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/input_prepareresultorg_parameters.json b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/input_prepareresultorg_parameters.json
new file mode 100644
index 000000000..8d2133075
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/input_prepareresultorg_parameters.json
@@ -0,0 +1,44 @@
+[
+  {
+    "paramName":"s",
+    "paramLongName":"sourcePath",
+    "paramDescription": "the path of the sequential file to read",
+    "paramRequired": true
+  },
+  {
+    "paramName":"h",
+    "paramLongName":"hive_metastore_uris",
+    "paramDescription": "the hive metastore uris",
+    "paramRequired": true
+  },
+  {
+    "paramName":"wu",
+    "paramLongName":"writeUpdate",
+    "paramDescription": "true if the update must be written. No double check if the information is already present",
+    "paramRequired": true
+  },
+  {
+    "paramName":"sg",
+    "paramLongName":"saveGraph",
+    "paramDescription": "true if the new version of the graph must be saved",
+    "paramRequired": true
+  },
+  {
+    "paramName":"dop",
+    "paramLongName":"datasourceOrganizationPath",
+    "paramDescription": "path where to store/find the association between datasource and organization",
+    "paramRequired": true
+  },
+  {
+    "paramName":"alp",
+    "paramLongName":"alreadyLinkedPath",
+    "paramDescription": "path where to store/find already linked results and organizations",
+    "paramRequired": true
+  },
+  {
+    "paramName": "ssm",
+    "paramLongName": "isSparkSessionManaged",
+    "paramDescription": "true if the spark session is managed, false otherwise",
+    "paramRequired": false
+  }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/Result2OrganizationJobTest.java b/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/Result2OrganizationJobTest.java
new file mode 100644
index 000000000..bc2142314
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/Result2OrganizationJobTest.java
@@ -0,0 +1,4 @@
+package eu.dnetlib.dhp.resulttoorganizationfrominstrepo;
+
+public class Result2OrganizationJobTest {
+}
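
For context, a minimal sketch of how the parameter file added above would be consumed, mirroring the ArgumentApplicationParser usage already shown in SparkResultToOrganizationFromIstRepoJob2. The class name and all argument values below are hypothetical placeholders, not part of this commit:

    package eu.dnetlib.dhp.resulttoorganizationfrominstrepo;

    import eu.dnetlib.dhp.application.ArgumentApplicationParser;
    import org.apache.commons.io.IOUtils;

    public class PrepareResultOrgParametersSketch {
        public static void main(String[] args) throws Exception {
            // Parameter definitions are loaded from the JSON resource added by this patch
            final ArgumentApplicationParser parser = new ArgumentApplicationParser(
                    IOUtils.toString(PrepareResultOrgParametersSketch.class.getResourceAsStream(
                            "/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/input_prepareresultorg_parameters.json")));
            parser.parseArgument(args);
            // Values are retrieved by paramLongName; the short names (-s, -h, -wu, ...) map to the same entries
            final String sourcePath = parser.get("sourcePath");
            final boolean writeUpdate = Boolean.parseBoolean(parser.get("writeUpdate"));
            final boolean saveGraph = Boolean.parseBoolean(parser.get("saveGraph"));
            final String datasourceOrganizationPath = parser.get("datasourceOrganizationPath");
            final String alreadyLinkedPath = parser.get("alreadyLinkedPath");
        }
    }

Such an entry point would be invoked with arguments like --sourcePath /tmp/graph --hive_metastore_uris thrift://host:9083 --writeUpdate true --saveGraph true --datasourceOrganizationPath /tmp/prepared/datasourceOrganization --alreadyLinkedPath /tmp/prepared/alreadyLinked, where every value is a placeholder.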