diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java index 8d47bdd903..cdae25e85c 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java @@ -14,9 +14,7 @@ import org.apache.spark.sql.SparkSession; import org.apache.hadoop.io.Text; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.File; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; +import java.util.*; import eu.dnetlib.dhp.schema.oaf.*; import scala.Tuple2; @@ -50,7 +48,7 @@ public class SparkResultToOrganizationFromIstRepoJob { org.apache.spark.sql.Dataset relation = spark.createDataset(sc.textFile(inputPath + "/relation") .map(item -> new ObjectMapper().readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class)); - org.apache.spark.sql.Dataset organization = spark.createDataset(sc.textFile(inputPath + "/organziation") + org.apache.spark.sql.Dataset organization = spark.createDataset(sc.textFile(inputPath + "/organization") .map(item -> new ObjectMapper().readValue(item, Organization.class)).rdd(), Encoders.bean(Organization.class)); datasource.createOrReplaceTempView("datasource"); @@ -102,8 +100,66 @@ public class SparkResultToOrganizationFromIstRepoJob { writeUpdates(toupdateresultsoftware, toupdateresultdataset, toupdateresultother, toupdateresultpublication, outputPath); + + query = "Select source resultId, collect_set(target) org_list " + + "from relation " + + "where datainfo.deletedbyinference = false " + + "and relClass = '" + RELATION_RESULT_ORGANIZATION_REL_CLASS +"' " + + "group by source"; + + JavaRDD result_orglist = spark.sql(query).toJavaRDD(); + + JavaPairRDD> toupdateunion = toupdateresultdataset.mapToPair(d -> new Tuple2<>(d.getString(0), d.getList(1))) + .union(toupdateresultother.mapToPair(o -> new Tuple2<>(o.getString(0), o.getList(1)))) + .union(toupdateresultpublication.mapToPair(p -> new Tuple2<>(p.getString(0), p.getList(1)))) + .union(toupdateresultsoftware.mapToPair(s -> new Tuple2<>(s.getString(0), s.getList(1)))); + + JavaRDD new_rels = getNewRels(result_orglist.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1))), + toupdateunion); + + + + sc.textFile(inputPath + "/relation") + .map(item -> new ObjectMapper().readValue(item, Relation.class)) + .union(new_rels) + .map(r -> new ObjectMapper().writeValueAsString(r)) + .saveAsTextFile(outputPath + "/relation"); + + } + private static JavaRDD getNewRels(JavaPairRDD> relationOrgs, JavaPairRDD > newRels){ + return newRels + .leftOuterJoin(relationOrgs) + .flatMap(c -> { + List toAddOrgs = new ArrayList<>(); + toAddOrgs.addAll(c._2()._1()); + if (c._2()._2().isPresent()) { + Set originalOrgs = new HashSet<>(); + originalOrgs.addAll(c._2()._2().get()); + for (Object oId : originalOrgs) { + if (toAddOrgs.contains(oId)) { + toAddOrgs.remove(oId); + } + } + } + List relationList = new ArrayList<>(); + String resId = c._1(); + for (Object org : toAddOrgs) { + relationList.add(getRelation((String)org, resId, RELATION_ORGANIZATION_RESULT_REL_CLASS, + RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); + relationList.add(getRelation(resId, (String)org, RELATION_RESULT_ORGANIZATION_REL_CLASS, + RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); + + } + return relationList.iterator(); + }); + + } + + private static void writeUpdates(JavaRDD toupdateresultsoftware, JavaRDD toupdateresultdataset, JavaRDD toupdateresultother, JavaRDD toupdateresultpublication, String outputPath) { createUpdateForRelationWrite(toupdateresultsoftware, outputPath, "update_software"); createUpdateForRelationWrite(toupdateresultdataset, outputPath, "update_dataset"); @@ -160,3 +216,234 @@ public class SparkResultToOrganizationFromIstRepoJob { } } + +//package eu.dnetlib.dhp.resulttoorganizationfrominstrepo; +// +//import eu.dnetlib.dhp.TypedRow; +//import eu.dnetlib.dhp.application.ArgumentApplicationParser; +//import org.apache.commons.io.IOUtils; +//import org.apache.hadoop.fs.FileSystem; +//import org.apache.spark.SparkConf; +//import org.apache.spark.api.java.JavaPairRDD; +//import org.apache.spark.api.java.JavaRDD; +//import org.apache.spark.api.java.JavaSparkContext; +//import org.apache.spark.sql.Encoders; +//import org.apache.spark.sql.Row; +//import org.apache.spark.sql.SparkSession; +//import org.apache.hadoop.io.Text; +//import com.fasterxml.jackson.databind.ObjectMapper; +//import java.io.File; +//import java.util.ArrayList; +//import java.util.Arrays; +//import java.util.List; +//import java.util.Set; +// +//import eu.dnetlib.dhp.schema.oaf.*; +//import scala.Tuple2; +// +//import static eu.dnetlib.dhp.PropagationConstant.*; +// +//public class SparkResultToOrganizationFromIstRepoJob { +// public static void main(String[] args) throws Exception { +// +// final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkResultToOrganizationFromIstRepoJob.class.getResourceAsStream("/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/input_propagationresulaffiliationfrominstrepo_parameters.json"))); +// parser.parseArgument(args); +// SparkConf conf = new SparkConf(); +// conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); +// final SparkSession spark = SparkSession +// .builder() +// .appName(SparkResultToOrganizationFromIstRepoJob.class.getSimpleName()) +// .master(parser.get("master")) +// .config(conf) +// .enableHiveSupport() +// .getOrCreate(); +// +// final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); +// final String inputPath = parser.get("sourcePath"); +// final String outputPath = "/tmp/provision/propagation/resulttoorganizationfrominstitutionalrepositories"; +// +// createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration())); +// +// org.apache.spark.sql.Dataset datasource = spark.createDataset(sc.textFile(inputPath + "/datasource") +// .map(item -> new ObjectMapper().readValue(item, Datasource.class)) +// .rdd(), Encoders.bean(Datasource.class)); +// +// JavaRDD relation_rdd_all = sc.textFile(inputPath + "/relation") +// .map(item -> new ObjectMapper().readValue(item, Relation.class)); +// JavaRDD relation_rdd = relation_rdd_all.filter(r -> !r.getDataInfo().getDeletedbyinference()).cache(); +// +// org.apache.spark.sql.Dataset relation = spark.createDataset(relation_rdd.rdd(), Encoders.bean(Relation.class)); +// +// org.apache.spark.sql.Dataset organization = spark.createDataset(sc.textFile(inputPath + "/organziation") +// .map(item -> new ObjectMapper().readValue(item, Organization.class)).rdd(), Encoders.bean(Organization.class)); +// +// datasource.createOrReplaceTempView("datasource"); +// relation.createOrReplaceTempView("relation"); +// organization.createOrReplaceTempView("organization"); +// +// String query = "SELECT source ds, target org " + +// "FROM ( SELECT id " + +// "FROM datasource " + +// "WHERE datasourcetype.classid = 'pubsrepository::institutional' " + +// "AND datainfo.deletedbyinference = false ) d " + +// "JOIN ( SELECT source, target " + +// "FROM relation " + +// "WHERE relclass = 'provides' " + +// "AND datainfo.deletedbyinference = false ) rel " + +// "ON d.id = rel.source "; +// +// org.apache.spark.sql.Dataset rels = spark.sql(query); +// rels.createOrReplaceTempView("rels"); +// +// org.apache.spark.sql.Dataset dataset = spark.createDataset(sc.textFile(inputPath + "/dataset") +// .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Dataset.class)).rdd(), +// Encoders.bean(eu.dnetlib.dhp.schema.oaf.Dataset.class)); +// +// org.apache.spark.sql.Dataset other = spark.createDataset(sc.textFile(inputPath + "/otherresearchproduct") +// .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.OtherResearchProduct.class)).rdd(), +// Encoders.bean(eu.dnetlib.dhp.schema.oaf.OtherResearchProduct.class)); +// +// org.apache.spark.sql.Dataset software = spark.createDataset(sc.textFile(inputPath + "/software") +// .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Software.class)).rdd(), +// Encoders.bean(eu.dnetlib.dhp.schema.oaf.Software.class)); +// +// org.apache.spark.sql.Dataset publication = spark.createDataset(sc.textFile(inputPath + "/publication") +// .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Publication.class)).rdd(), +// Encoders.bean(eu.dnetlib.dhp.schema.oaf.Publication.class)); +// +// +// software.createOrReplaceTempView("software"); +// final JavaRDD toupdateresultsoftware = propagateOnResult(spark, "software"); +// +// dataset.createOrReplaceTempView("dataset"); +// final JavaRDD toupdateresultdataset = propagateOnResult(spark, "dataset"); +// +// other.createOrReplaceTempView("other"); +// final JavaRDD toupdateresultother = propagateOnResult(spark, "other"); +// +// publication.createOrReplaceTempView("publication"); +// final JavaRDD toupdateresultpublication = propagateOnResult(spark, "publication"); +// +// writeUpdates(toupdateresultsoftware, toupdateresultdataset, toupdateresultother, toupdateresultpublication, outputPath); +// +// JavaPairRDD relation_rdd_pair = relation_rdd +// .filter(r -> RELATION_RESULT_ORGANIZATION_REL_CLASS.equals(r.getRelClass())) +// .map(r -> { +// TypedRow tp = new TypedRow(); +// tp.setSourceId(r.getSource()); +// tp.add(r.getTarget()); +// return tp; +// }).mapToPair(toPair()) +// .reduceByKey((a, b) -> { +// if (a == null) { +// return b; +// } +// if (b == null) { +// return a; +// } +// +// a.addAll(b.getAccumulator()); +// return a; +// }).cache(); +// +// +// JavaRDD new_rels = getNewRels(relation_rdd_pair, +// toupdateresultother.mapToPair(s -> new Tuple2<>(s.getString(0), s.getList(1)))) +// .union(getNewRels(relation_rdd_pair, +// toupdateresultsoftware.mapToPair(s -> new Tuple2<>(s.getString(0), s.getList(1))))) +// .union(getNewRels(relation_rdd_pair, +// toupdateresultdataset.mapToPair(s -> new Tuple2<>(s.getString(0), s.getList(1))))) +// .union(getNewRels(relation_rdd_pair, +// toupdateresultpublication.mapToPair(s -> new Tuple2<>(s.getString(0), s.getList(1))))); +// +// +// relation_rdd_all.union(new_rels).map(r -> new ObjectMapper().writeValueAsString(r)) +// .saveAsTextFile(outputPath + "/relation"); +// +// } +// +// private static JavaRDD getNewRels(JavaPairRDD relation_rdd_pair, JavaPairRDD > newRels){ +// return newRels//.mapToPair(s -> new Tuple2<>(s.getString(0), s.getList(1))) +// .leftOuterJoin(relation_rdd_pair) +// .flatMap(c -> { +// List toAddOrgs = c._2()._1(); +// if (c._2()._2().isPresent()) { +// Set originalOrgs = c._2()._2().get().getAccumulator(); +// for (String oId : toAddOrgs) { +// if (originalOrgs.contains(oId)) { +// toAddOrgs.remove(oId); +// } +// } +// } +// List relationList = new ArrayList<>(); +// String resId = c._1(); +// for (String org : toAddOrgs) { +// relationList.add(getRelation(org, resId, RELATION_ORGANIZATION_RESULT_REL_CLASS, +// RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, +// PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); +// relationList.add(getRelation(resId, org, RELATION_RESULT_ORGANIZATION_REL_CLASS, +// RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, +// PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); +// +// } +// return relationList.iterator(); +// }); +// +// } +// +// private static void writeUpdates(JavaRDD toupdateresultsoftware, JavaRDD toupdateresultdataset, JavaRDD toupdateresultother, JavaRDD toupdateresultpublication, String outputPath) { +// createUpdateForRelationWrite(toupdateresultsoftware, outputPath, "update_software"); +// createUpdateForRelationWrite(toupdateresultdataset, outputPath, "update_dataset"); +// createUpdateForRelationWrite(toupdateresultother, outputPath, "update_other"); +// createUpdateForRelationWrite(toupdateresultpublication, outputPath, "update_publication"); +// +// } +// +// private static void createUpdateForRelationWrite(JavaRDD toupdaterelation, String outputPath, String update_type) { +// toupdaterelation.flatMap(s -> { +// List relationList = new ArrayList<>(); +// List orgs = s.getList(1); +// String resId = s.getString(0); +// for (String org : orgs) { +// relationList.add(getRelation(org, resId, RELATION_ORGANIZATION_RESULT_REL_CLASS, +// RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, +// PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); +// relationList.add(getRelation(resId, org, RELATION_RESULT_ORGANIZATION_REL_CLASS, +// RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, +// PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); +// +// } +// return relationList.iterator(); +// }).map(s -> new ObjectMapper().writeValueAsString(s)).saveAsTextFile(outputPath + "/" + update_type); +// } +// +// private static JavaRDD propagateOnResult(SparkSession spark, String table) { +// String query; +// query = "SELECT id, inst.collectedfrom.key cf , inst.hostedby.key hb " + +// "FROM ( SELECT id, instance " + +// "FROM " + table + +// " WHERE datainfo.deletedbyinference = false) ds " + +// "LATERAL VIEW EXPLODE(instance) i AS inst"; +// org.apache.spark.sql.Dataset cfhb = spark.sql(query); +// cfhb.createOrReplaceTempView("cfhb"); +// +// return organizationPropagationAssoc(spark, "cfhb").toJavaRDD(); +// +// } +// +// private static org.apache.spark.sql.Dataset organizationPropagationAssoc(SparkSession spark, String cfhbTable){ +// String query = "SELECT id, collect_set(org) org "+ +// "FROM ( SELECT id, org " + +// "FROM rels " + +// "JOIN " + cfhbTable + +// " ON cf = ds " + +// "UNION ALL " + +// "SELECT id , org " + +// "FROM rels " + +// "JOIN " + cfhbTable + +// " ON hb = ds ) tmp " + +// "GROUP BY id"; +// return spark.sql(query); +// } +// +//}