diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob2.java index cdae25e85..7826f598b 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob2.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob2.java @@ -1,178 +1,202 @@ package eu.dnetlib.dhp.resulttoorganizationfrominstrepo; -import eu.dnetlib.dhp.TypedRow; +import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.countrypropagation.DatasourceCountry; +import eu.dnetlib.dhp.countrypropagation.ResultCountrySet; +import eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob2; +import eu.dnetlib.dhp.schema.oaf.*; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import org.apache.hadoop.io.Text; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.File; -import java.util.*; - -import eu.dnetlib.dhp.schema.oaf.*; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.*; +import org.apache.spark.sql.Dataset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; import scala.Tuple2; -import static eu.dnetlib.dhp.PropagationConstant.*; +import java.util.*; + +import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; + +public class SparkResultToOrganizationFromIstRepoJob2 { + + private static final Logger log = LoggerFactory.getLogger(SparkResultToOrganizationFromIstRepoJob2.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); -public class SparkResultToOrganizationFromIstRepoJob { public static void main(String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkResultToOrganizationFromIstRepoJob.class.getResourceAsStream("/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/input_propagationresulaffiliationfrominstrepo_parameters.json"))); + String jsonConfiguration = IOUtils.toString(SparkCountryPropagationJob2.class + .getResourceAsStream("/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/input_propagationresulaffiliationfrominstrepo_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + final String datasourceorganization = parser.get("datasourceOrganizationPath"); + log.info("datasourceOrganizationPath: {}", datasourceorganization); + + final String alreadylinked = parser.get("alreadyLinkedPath"); + log.info("alreadyLinkedPath: {}", alreadylinked); + + final String resultorganizationsetpath = parser.get("resultOrganizationsetPath"); + log.info("resultOrganizationsetPath: {}", resultorganizationsetpath); + + final String resultClassName = parser.get("resultTableName"); + log.info("resultTableName: {}", resultClassName); + + final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase(); + log.info("resultType: {}", resultType); + + final Boolean writeUpdates = Optional + .ofNullable(parser.get("writeUpdate")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("writeUpdate: {}", writeUpdates); + + final Boolean saveGraph = Optional + .ofNullable(parser.get("saveGraph")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("saveGraph: {}", saveGraph); + + Class resultClazz = (Class) Class.forName(resultClassName); + SparkConf conf = new SparkConf(); conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); - final SparkSession spark = SparkSession - .builder() - .appName(SparkResultToOrganizationFromIstRepoJob.class.getSimpleName()) - .master(parser.get("master")) - .config(conf) - .enableHiveSupport() - .getOrCreate(); - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - final String inputPath = parser.get("sourcePath"); - final String outputPath = "/tmp/provision/propagation/resulttoorganizationfrominstitutionalrepositories"; - - createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration())); - - org.apache.spark.sql.Dataset datasource = spark.createDataset(sc.textFile(inputPath + "/datasource") - .map(item -> new ObjectMapper().readValue(item, Datasource.class)).rdd(), Encoders.bean(Datasource.class)); - - org.apache.spark.sql.Dataset relation = spark.createDataset(sc.textFile(inputPath + "/relation") - .map(item -> new ObjectMapper().readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class)); - - org.apache.spark.sql.Dataset organization = spark.createDataset(sc.textFile(inputPath + "/organization") - .map(item -> new ObjectMapper().readValue(item, Organization.class)).rdd(), Encoders.bean(Organization.class)); - - datasource.createOrReplaceTempView("datasource"); - relation.createOrReplaceTempView("relation"); - organization.createOrReplaceTempView("organization"); - - String query = "SELECT source ds, target org " + - "FROM ( SELECT id " + - "FROM datasource " + - "WHERE datasourcetype.classid = 'pubsrepository::institutional' " + - "AND datainfo.deletedbyinference = false ) d " + - "JOIN ( SELECT source, target " + - "FROM relation " + - "WHERE relclass = 'provides' " + - "AND datainfo.deletedbyinference = false ) rel " + - "ON d.id = rel.source "; - - org.apache.spark.sql.Dataset rels = spark.sql(query); - rels.createOrReplaceTempView("rels"); - - org.apache.spark.sql.Dataset dataset = spark.createDataset(sc.textFile(inputPath + "/dataset") - .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Dataset.class)).rdd(), - Encoders.bean(eu.dnetlib.dhp.schema.oaf.Dataset.class)); - - org.apache.spark.sql.Dataset other = spark.createDataset(sc.textFile(inputPath + "/otherresearchproduct") - .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.OtherResearchProduct.class)).rdd(), - Encoders.bean(eu.dnetlib.dhp.schema.oaf.OtherResearchProduct.class)); - - org.apache.spark.sql.Dataset software = spark.createDataset(sc.textFile(inputPath + "/software") - .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Software.class)).rdd(), - Encoders.bean(eu.dnetlib.dhp.schema.oaf.Software.class)); - - org.apache.spark.sql.Dataset publication = spark.createDataset(sc.textFile(inputPath + "/publication") - .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Publication.class)).rdd(), - Encoders.bean(eu.dnetlib.dhp.schema.oaf.Publication.class)); - - - software.createOrReplaceTempView("software"); - final JavaRDD toupdateresultsoftware = propagateOnResult(spark, "software"); - - dataset.createOrReplaceTempView("dataset"); - final JavaRDD toupdateresultdataset = propagateOnResult(spark, "dataset"); - - other.createOrReplaceTempView("other"); - final JavaRDD toupdateresultother = propagateOnResult(spark, "other"); - - publication.createOrReplaceTempView("publication"); - final JavaRDD toupdateresultpublication = propagateOnResult(spark, "publication"); - - writeUpdates(toupdateresultsoftware, toupdateresultdataset, toupdateresultother, toupdateresultpublication, outputPath); - - - query = "Select source resultId, collect_set(target) org_list " + - "from relation " + - "where datainfo.deletedbyinference = false " + - "and relClass = '" + RELATION_RESULT_ORGANIZATION_REL_CLASS +"' " + - "group by source"; - - JavaRDD result_orglist = spark.sql(query).toJavaRDD(); - - JavaPairRDD> toupdateunion = toupdateresultdataset.mapToPair(d -> new Tuple2<>(d.getString(0), d.getList(1))) - .union(toupdateresultother.mapToPair(o -> new Tuple2<>(o.getString(0), o.getList(1)))) - .union(toupdateresultpublication.mapToPair(p -> new Tuple2<>(p.getString(0), p.getList(1)))) - .union(toupdateresultsoftware.mapToPair(s -> new Tuple2<>(s.getString(0), s.getList(1)))); - - JavaRDD new_rels = getNewRels(result_orglist.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1))), - toupdateunion); - - - - sc.textFile(inputPath + "/relation") - .map(item -> new ObjectMapper().readValue(item, Relation.class)) - .union(new_rels) - .map(r -> new ObjectMapper().writeValueAsString(r)) - .saveAsTextFile(outputPath + "/relation"); - - - } - - private static JavaRDD getNewRels(JavaPairRDD> relationOrgs, JavaPairRDD > newRels){ - return newRels - .leftOuterJoin(relationOrgs) - .flatMap(c -> { - List toAddOrgs = new ArrayList<>(); - toAddOrgs.addAll(c._2()._1()); - if (c._2()._2().isPresent()) { - Set originalOrgs = new HashSet<>(); - originalOrgs.addAll(c._2()._2().get()); - for (Object oId : originalOrgs) { - if (toAddOrgs.contains(oId)) { - toAddOrgs.remove(oId); - } - } + runWithSparkHiveSession(conf, isSparkSessionManaged, + spark -> { + if(isTest(parser)) { + removeOutputDir(spark, outputPath); } - List relationList = new ArrayList<>(); - String resId = c._1(); - for (Object org : toAddOrgs) { - relationList.add(getRelation((String)org, resId, RELATION_ORGANIZATION_RESULT_REL_CLASS, - RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); - relationList.add(getRelation(resId, (String)org, RELATION_RESULT_ORGANIZATION_REL_CLASS, - RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); - - } - return relationList.iterator(); + execPropagation(spark, datasourceorganization, alreadylinked, inputPath, outputPath, resultClazz, resultType, + writeUpdates, saveGraph); }); - } - private static void writeUpdates(JavaRDD toupdateresultsoftware, JavaRDD toupdateresultdataset, JavaRDD toupdateresultother, JavaRDD toupdateresultpublication, String outputPath) { - createUpdateForRelationWrite(toupdateresultsoftware, outputPath, "update_software"); - createUpdateForRelationWrite(toupdateresultdataset, outputPath, "update_dataset"); - createUpdateForRelationWrite(toupdateresultother, outputPath, "update_other"); - createUpdateForRelationWrite(toupdateresultpublication, outputPath, "update_publication"); } - private static void createUpdateForRelationWrite(JavaRDD toupdaterelation, String outputPath, String update_type) { + private static void execPropagation(SparkSession spark, String datasourceorganization, String alreadylinked, String inputPath, + String outputPath, Class resultClazz, String resultType, + Boolean writeUpdates, Boolean saveGraph) { + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + org.apache.spark.sql.Dataset datasourceorganizationassoc = + readAssocDatasourceOrganization(spark, datasourceorganization); + + //broadcasting the result of the preparation step + Broadcast> broadcast_datasourceorganizationassoc = + sc.broadcast(datasourceorganizationassoc); + + org.apache.spark.sql.Dataset potentialUpdates = getPotentialRelations(spark, inputPath, resultClazz, + broadcast_datasourceorganizationassoc).as(Encoders.bean(ResultOrganizationSet.class)); + + if(writeUpdates){ + createUpdateForRelationWrite(potentialUpdates, outputPath + "/" + resultType); + } + + if(saveGraph){ + getNewRelations(spark + .read() + .textFile(alreadylinked) + .map(value -> OBJECT_MAPPER.readValue(value, ResultOrganizationSet.class), + Encoders.bean(ResultOrganizationSet.class)), potentialUpdates); + } + + + } + + private static Dataset getNewRelations(Dataset alreadyLinked, Dataset potentialUpdates) { + return potentialUpdates + .joinWith(alreadyLinked, potentialUpdates.col("resultId") + .equalTo(alreadyLinked.col("resultId")), "left_outer") + .flatMap((FlatMapFunction, Relation>) value -> { + List new_relations = new ArrayList<>(); + ResultOrganizationSet potential_update = value._1(); + Optional already_linked = Optional.ofNullable(value._2()); + List organization_list = potential_update.getOrganizationSet(); + if(already_linked.isPresent()){ + already_linked.get().getOrganizationSet() + .stream() + .forEach(rId -> { + if (organization_list.contains(rId)) { + organization_list.remove(rId); + } + }); + } + String resultId = potential_update.getResultId(); + organization_list + .stream() + .forEach(orgId -> { + new_relations.add(getRelation(orgId, resultId, RELATION_ORGANIZATION_RESULT_REL_CLASS, RELATION_RESULTORGANIZATION_REL_TYPE, + RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, + PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); + new_relations.add(getRelation(resultId, orgId, RELATION_RESULT_ORGANIZATION_REL_CLASS, RELATION_RESULTORGANIZATION_REL_TYPE, + RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, + PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); + } + ); + return new_relations.iterator(); + + } + ,Encoders.bean(Relation.class)); + + } + + + private static org.apache.spark.sql.Dataset getPotentialRelations(SparkSession spark, + String inputPath, + Class resultClazz, + Broadcast> broadcast_datasourceorganizationassoc) { + org.apache.spark.sql.Dataset result = readPathEntity(spark, inputPath, resultClazz); + result.createOrReplaceTempView("result"); + createCfHbforresult(spark); + + return organizationPropagationAssoc(spark, broadcast_datasourceorganizationassoc); + } + + + + + private static org.apache.spark.sql.Dataset readAssocDatasourceOrganization(SparkSession spark, + String datasourcecountryorganization) { + return spark + .read() + .textFile(datasourcecountryorganization) + .map(value -> OBJECT_MAPPER.readValue(value, DatasourceOrganization.class), Encoders.bean(DatasourceOrganization.class)); + } + + + + private static void createUpdateForRelationWrite(Dataset toupdaterelation, String outputPath) { toupdaterelation.flatMap(s -> { List relationList = new ArrayList<>(); - List orgs = s.getList(1); - String resId = s.getString(0); + List orgs = s.getOrganizationSet(); + String resId = s.getResultId(); for (String org : orgs) { relationList.add(getRelation(org, resId, RELATION_ORGANIZATION_RESULT_REL_CLASS, RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, @@ -183,267 +207,31 @@ public class SparkResultToOrganizationFromIstRepoJob { } return relationList.iterator(); - }).map(s -> new ObjectMapper().writeValueAsString(s)).saveAsTextFile(outputPath + "/" + update_type); + }, Encoders.bean(Relation.class)) + .toJSON() + .write() + .mode(SaveMode.Overwrite) + .option("compression","gzip") + .text(outputPath) + ; } - private static JavaRDD propagateOnResult(SparkSession spark, String table) { - String query; - query = "SELECT id, inst.collectedfrom.key cf , inst.hostedby.key hb " + - "FROM ( SELECT id, instance " + - "FROM " + table + - " WHERE datainfo.deletedbyinference = false) ds " + - "LATERAL VIEW EXPLODE(instance) i AS inst"; - org.apache.spark.sql.Dataset cfhb = spark.sql(query); - cfhb.createOrReplaceTempView("cfhb"); - return organizationPropagationAssoc(spark, "cfhb").toJavaRDD(); - - } - - private static org.apache.spark.sql.Dataset organizationPropagationAssoc(SparkSession spark, String cfhbTable){ - String query = "SELECT id, collect_set(org) org "+ - "FROM ( SELECT id, org " + + private static org.apache.spark.sql.Dataset organizationPropagationAssoc(SparkSession spark, Broadcast> broadcast_datasourceorganizationassoc){ + org.apache.spark.sql.Dataset datasourceorganization = broadcast_datasourceorganizationassoc.value(); + datasourceorganization.createOrReplaceTempView("rels"); + String query = "SELECT id resultId, collect_set(organizationId) organizationSet "+ + "FROM ( SELECT id, organizationId " + "FROM rels " + - "JOIN " + cfhbTable + - " ON cf = ds " + + "JOIN cfhb " + + " ON cf = datasourceId " + "UNION ALL " + - "SELECT id , org " + + "SELECT id , organizationId " + "FROM rels " + - "JOIN " + cfhbTable + - " ON hb = ds ) tmp " + + "JOIN cfhb " + + " ON hb = datasourceId ) tmp " + "GROUP BY id"; - return spark.sql(query); + return spark.sql(query).as(Encoders.bean(ResultOrganizationSet.class)); } } - -//package eu.dnetlib.dhp.resulttoorganizationfrominstrepo; -// -//import eu.dnetlib.dhp.TypedRow; -//import eu.dnetlib.dhp.application.ArgumentApplicationParser; -//import org.apache.commons.io.IOUtils; -//import org.apache.hadoop.fs.FileSystem; -//import org.apache.spark.SparkConf; -//import org.apache.spark.api.java.JavaPairRDD; -//import org.apache.spark.api.java.JavaRDD; -//import org.apache.spark.api.java.JavaSparkContext; -//import org.apache.spark.sql.Encoders; -//import org.apache.spark.sql.Row; -//import org.apache.spark.sql.SparkSession; -//import org.apache.hadoop.io.Text; -//import com.fasterxml.jackson.databind.ObjectMapper; -//import java.io.File; -//import java.util.ArrayList; -//import java.util.Arrays; -//import java.util.List; -//import java.util.Set; -// -//import eu.dnetlib.dhp.schema.oaf.*; -//import scala.Tuple2; -// -//import static eu.dnetlib.dhp.PropagationConstant.*; -// -//public class SparkResultToOrganizationFromIstRepoJob { -// public static void main(String[] args) throws Exception { -// -// final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkResultToOrganizationFromIstRepoJob.class.getResourceAsStream("/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/input_propagationresulaffiliationfrominstrepo_parameters.json"))); -// parser.parseArgument(args); -// SparkConf conf = new SparkConf(); -// conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); -// final SparkSession spark = SparkSession -// .builder() -// .appName(SparkResultToOrganizationFromIstRepoJob.class.getSimpleName()) -// .master(parser.get("master")) -// .config(conf) -// .enableHiveSupport() -// .getOrCreate(); -// -// final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); -// final String inputPath = parser.get("sourcePath"); -// final String outputPath = "/tmp/provision/propagation/resulttoorganizationfrominstitutionalrepositories"; -// -// createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration())); -// -// org.apache.spark.sql.Dataset datasource = spark.createDataset(sc.textFile(inputPath + "/datasource") -// .map(item -> new ObjectMapper().readValue(item, Datasource.class)) -// .rdd(), Encoders.bean(Datasource.class)); -// -// JavaRDD relation_rdd_all = sc.textFile(inputPath + "/relation") -// .map(item -> new ObjectMapper().readValue(item, Relation.class)); -// JavaRDD relation_rdd = relation_rdd_all.filter(r -> !r.getDataInfo().getDeletedbyinference()).cache(); -// -// org.apache.spark.sql.Dataset relation = spark.createDataset(relation_rdd.rdd(), Encoders.bean(Relation.class)); -// -// org.apache.spark.sql.Dataset organization = spark.createDataset(sc.textFile(inputPath + "/organziation") -// .map(item -> new ObjectMapper().readValue(item, Organization.class)).rdd(), Encoders.bean(Organization.class)); -// -// datasource.createOrReplaceTempView("datasource"); -// relation.createOrReplaceTempView("relation"); -// organization.createOrReplaceTempView("organization"); -// -// String query = "SELECT source ds, target org " + -// "FROM ( SELECT id " + -// "FROM datasource " + -// "WHERE datasourcetype.classid = 'pubsrepository::institutional' " + -// "AND datainfo.deletedbyinference = false ) d " + -// "JOIN ( SELECT source, target " + -// "FROM relation " + -// "WHERE relclass = 'provides' " + -// "AND datainfo.deletedbyinference = false ) rel " + -// "ON d.id = rel.source "; -// -// org.apache.spark.sql.Dataset rels = spark.sql(query); -// rels.createOrReplaceTempView("rels"); -// -// org.apache.spark.sql.Dataset dataset = spark.createDataset(sc.textFile(inputPath + "/dataset") -// .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Dataset.class)).rdd(), -// Encoders.bean(eu.dnetlib.dhp.schema.oaf.Dataset.class)); -// -// org.apache.spark.sql.Dataset other = spark.createDataset(sc.textFile(inputPath + "/otherresearchproduct") -// .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.OtherResearchProduct.class)).rdd(), -// Encoders.bean(eu.dnetlib.dhp.schema.oaf.OtherResearchProduct.class)); -// -// org.apache.spark.sql.Dataset software = spark.createDataset(sc.textFile(inputPath + "/software") -// .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Software.class)).rdd(), -// Encoders.bean(eu.dnetlib.dhp.schema.oaf.Software.class)); -// -// org.apache.spark.sql.Dataset publication = spark.createDataset(sc.textFile(inputPath + "/publication") -// .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Publication.class)).rdd(), -// Encoders.bean(eu.dnetlib.dhp.schema.oaf.Publication.class)); -// -// -// software.createOrReplaceTempView("software"); -// final JavaRDD toupdateresultsoftware = propagateOnResult(spark, "software"); -// -// dataset.createOrReplaceTempView("dataset"); -// final JavaRDD toupdateresultdataset = propagateOnResult(spark, "dataset"); -// -// other.createOrReplaceTempView("other"); -// final JavaRDD toupdateresultother = propagateOnResult(spark, "other"); -// -// publication.createOrReplaceTempView("publication"); -// final JavaRDD toupdateresultpublication = propagateOnResult(spark, "publication"); -// -// writeUpdates(toupdateresultsoftware, toupdateresultdataset, toupdateresultother, toupdateresultpublication, outputPath); -// -// JavaPairRDD relation_rdd_pair = relation_rdd -// .filter(r -> RELATION_RESULT_ORGANIZATION_REL_CLASS.equals(r.getRelClass())) -// .map(r -> { -// TypedRow tp = new TypedRow(); -// tp.setSourceId(r.getSource()); -// tp.add(r.getTarget()); -// return tp; -// }).mapToPair(toPair()) -// .reduceByKey((a, b) -> { -// if (a == null) { -// return b; -// } -// if (b == null) { -// return a; -// } -// -// a.addAll(b.getAccumulator()); -// return a; -// }).cache(); -// -// -// JavaRDD new_rels = getNewRels(relation_rdd_pair, -// toupdateresultother.mapToPair(s -> new Tuple2<>(s.getString(0), s.getList(1)))) -// .union(getNewRels(relation_rdd_pair, -// toupdateresultsoftware.mapToPair(s -> new Tuple2<>(s.getString(0), s.getList(1))))) -// .union(getNewRels(relation_rdd_pair, -// toupdateresultdataset.mapToPair(s -> new Tuple2<>(s.getString(0), s.getList(1))))) -// .union(getNewRels(relation_rdd_pair, -// toupdateresultpublication.mapToPair(s -> new Tuple2<>(s.getString(0), s.getList(1))))); -// -// -// relation_rdd_all.union(new_rels).map(r -> new ObjectMapper().writeValueAsString(r)) -// .saveAsTextFile(outputPath + "/relation"); -// -// } -// -// private static JavaRDD getNewRels(JavaPairRDD relation_rdd_pair, JavaPairRDD > newRels){ -// return newRels//.mapToPair(s -> new Tuple2<>(s.getString(0), s.getList(1))) -// .leftOuterJoin(relation_rdd_pair) -// .flatMap(c -> { -// List toAddOrgs = c._2()._1(); -// if (c._2()._2().isPresent()) { -// Set originalOrgs = c._2()._2().get().getAccumulator(); -// for (String oId : toAddOrgs) { -// if (originalOrgs.contains(oId)) { -// toAddOrgs.remove(oId); -// } -// } -// } -// List relationList = new ArrayList<>(); -// String resId = c._1(); -// for (String org : toAddOrgs) { -// relationList.add(getRelation(org, resId, RELATION_ORGANIZATION_RESULT_REL_CLASS, -// RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, -// PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); -// relationList.add(getRelation(resId, org, RELATION_RESULT_ORGANIZATION_REL_CLASS, -// RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, -// PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); -// -// } -// return relationList.iterator(); -// }); -// -// } -// -// private static void writeUpdates(JavaRDD toupdateresultsoftware, JavaRDD toupdateresultdataset, JavaRDD toupdateresultother, JavaRDD toupdateresultpublication, String outputPath) { -// createUpdateForRelationWrite(toupdateresultsoftware, outputPath, "update_software"); -// createUpdateForRelationWrite(toupdateresultdataset, outputPath, "update_dataset"); -// createUpdateForRelationWrite(toupdateresultother, outputPath, "update_other"); -// createUpdateForRelationWrite(toupdateresultpublication, outputPath, "update_publication"); -// -// } -// -// private static void createUpdateForRelationWrite(JavaRDD toupdaterelation, String outputPath, String update_type) { -// toupdaterelation.flatMap(s -> { -// List relationList = new ArrayList<>(); -// List orgs = s.getList(1); -// String resId = s.getString(0); -// for (String org : orgs) { -// relationList.add(getRelation(org, resId, RELATION_ORGANIZATION_RESULT_REL_CLASS, -// RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, -// PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); -// relationList.add(getRelation(resId, org, RELATION_RESULT_ORGANIZATION_REL_CLASS, -// RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, -// PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); -// -// } -// return relationList.iterator(); -// }).map(s -> new ObjectMapper().writeValueAsString(s)).saveAsTextFile(outputPath + "/" + update_type); -// } -// -// private static JavaRDD propagateOnResult(SparkSession spark, String table) { -// String query; -// query = "SELECT id, inst.collectedfrom.key cf , inst.hostedby.key hb " + -// "FROM ( SELECT id, instance " + -// "FROM " + table + -// " WHERE datainfo.deletedbyinference = false) ds " + -// "LATERAL VIEW EXPLODE(instance) i AS inst"; -// org.apache.spark.sql.Dataset cfhb = spark.sql(query); -// cfhb.createOrReplaceTempView("cfhb"); -// -// return organizationPropagationAssoc(spark, "cfhb").toJavaRDD(); -// -// } -// -// private static org.apache.spark.sql.Dataset organizationPropagationAssoc(SparkSession spark, String cfhbTable){ -// String query = "SELECT id, collect_set(org) org "+ -// "FROM ( SELECT id, org " + -// "FROM rels " + -// "JOIN " + cfhbTable + -// " ON cf = ds " + -// "UNION ALL " + -// "SELECT id , org " + -// "FROM rels " + -// "JOIN " + cfhbTable + -// " ON hb = ds ) tmp " + -// "GROUP BY id"; -// return spark.sql(query); -// } -// -//}