diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java index 51c46e2be..095f476cf 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java @@ -1,5 +1,8 @@ package eu.dnetlib.dhp.resulttoorganizationfrominstrepo; +import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; + import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.Datasource; @@ -9,28 +12,26 @@ import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static eu.dnetlib.dhp.PropagationConstant.*; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; - public class PrepareResultInstRepoAssociation { - private static final Logger log = LoggerFactory.getLogger(PrepareResultInstRepoAssociation.class); + private static final Logger log = + LoggerFactory.getLogger(PrepareResultInstRepoAssociation.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static void main(String[] args) throws Exception{ + public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils.toString(PrepareResultInstRepoAssociation.class - .getResourceAsStream("/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/input_prepareresultorg_parameters.json")); + String jsonConfiguration = + IOUtils.toString( + PrepareResultInstRepoAssociation.class.getResourceAsStream( + "/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/input_prepareresultorg_parameters.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - jsonConfiguration); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); @@ -49,77 +50,89 @@ public class PrepareResultInstRepoAssociation { SparkConf conf = new SparkConf(); conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); - runWithSparkHiveSession(conf, isSparkSessionManaged, + runWithSparkHiveSession( + conf, + isSparkSessionManaged, spark -> { readNeededResources(spark, inputPath); - prepareDatasourceOrganizationAssociations(spark, datasourceOrganizationPath, alreadyLinkedPath); + prepareDatasourceOrganizationAssociations( + spark, datasourceOrganizationPath, alreadyLinkedPath); prepareAlreadyLinkedAssociation(spark, alreadyLinkedPath); }); } - private static void prepareAlreadyLinkedAssociation(SparkSession spark, String alreadyLinkedPath) { - String query = "Select source resultId, collect_set(target) organizationSet " + - "from relation " + - "where datainfo.deletedbyinference = false " + - "and relClass = '" + RELATION_RESULT_ORGANIZATION_REL_CLASS +"' " + - "group by source"; - + private static void prepareAlreadyLinkedAssociation( + SparkSession spark, String alreadyLinkedPath) { + String query = + "Select source resultId, collect_set(target) organizationSet " + + "from relation " + + "where datainfo.deletedbyinference = false " + + "and relClass = '" + + RELATION_RESULT_ORGANIZATION_REL_CLASS + + "' " + + "group by source"; spark.sql(query) .as(Encoders.bean(ResultOrganizationSet.class)) .toJavaRDD() .map(r -> OBJECT_MAPPER.writeValueAsString(r)) .saveAsTextFile(alreadyLinkedPath, GzipCodec.class); -// .as(Encoders.bean(ResultOrganizationSet.class)) -// .toJSON() -// .write() -// .mode(SaveMode.Overwrite) -// .option("compression","gzip") -// .text(alreadyLinkedPath); } private static void readNeededResources(SparkSession spark, String inputPath) { final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - org.apache.spark.sql.Dataset datasource = spark.createDataset(sc.textFile(inputPath + "/datasource") - .map(item -> new ObjectMapper().readValue(item, Datasource.class)).rdd(), Encoders.bean(Datasource.class)); + org.apache.spark.sql.Dataset datasource = + spark.createDataset( + sc.textFile(inputPath + "/datasource") + .map(item -> new ObjectMapper().readValue(item, Datasource.class)) + .rdd(), + Encoders.bean(Datasource.class)); - org.apache.spark.sql.Dataset relation = spark.createDataset(sc.textFile(inputPath + "/relation") - .map(item -> new ObjectMapper().readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class)); + org.apache.spark.sql.Dataset relation = + spark.createDataset( + sc.textFile(inputPath + "/relation") + .map(item -> new ObjectMapper().readValue(item, Relation.class)) + .rdd(), + Encoders.bean(Relation.class)); - org.apache.spark.sql.Dataset organization = spark.createDataset(sc.textFile(inputPath + "/organization") - .map(item -> new ObjectMapper().readValue(item, Organization.class)).rdd(), Encoders.bean(Organization.class)); + org.apache.spark.sql.Dataset organization = + spark.createDataset( + sc.textFile(inputPath + "/organization") + .map(item -> new ObjectMapper().readValue(item, Organization.class)) + .rdd(), + Encoders.bean(Organization.class)); datasource.createOrReplaceTempView("datasource"); relation.createOrReplaceTempView("relation"); organization.createOrReplaceTempView("organization"); } - private static void prepareDatasourceOrganizationAssociations(SparkSession spark, String datasourceOrganizationPath, - String alreadyLinkedPath){ + private static void prepareDatasourceOrganizationAssociations( + SparkSession spark, String datasourceOrganizationPath, String alreadyLinkedPath) { - - String query = "SELECT source datasourceId, target organizationId " + - "FROM ( SELECT id " + - "FROM datasource " + - "WHERE datasourcetype.classid = 'pubsrepository::institutional' " + - "AND datainfo.deletedbyinference = false ) d " + - "JOIN ( SELECT source, target " + - "FROM relation " + - "WHERE relclass = '" + RELATION_DATASOURCE_ORGANIZATION_REL_CLASS + "' " + - "AND datainfo.deletedbyinference = false ) rel " + - "ON d.id = rel.source "; + String query = + "SELECT source datasourceId, target organizationId " + + "FROM ( SELECT id " + + "FROM datasource " + + "WHERE datasourcetype.classid = '" + + INSTITUTIONAL_REPO_TYPE + + "' " + + "AND datainfo.deletedbyinference = false ) d " + + "JOIN ( SELECT source, target " + + "FROM relation " + + "WHERE relclass = '" + + RELATION_DATASOURCE_ORGANIZATION_REL_CLASS + + "' " + + "AND datainfo.deletedbyinference = false ) rel " + + "ON d.id = rel.source "; spark.sql(query) - .as(Encoders.bean(DatasourceOrganization.class)) - .toJSON() - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .text(datasourceOrganizationPath); - - - + .as(Encoders.bean(DatasourceOrganization.class)) + .toJSON() + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .text(datasourceOrganizationPath); } - } diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob2.java index 82c69e927..db8b99ac7 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob2.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob2.java @@ -1,17 +1,14 @@ package eu.dnetlib.dhp.resulttoorganizationfrominstrepo; +import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; + import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.countrypropagation.DatasourceCountry; -import eu.dnetlib.dhp.countrypropagation.ResultCountrySet; -import eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob2; import eu.dnetlib.dhp.schema.oaf.*; +import java.util.*; import org.apache.commons.io.IOUtils; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.broadcast.Broadcast; @@ -19,27 +16,23 @@ import org.apache.spark.sql.*; import org.apache.spark.sql.Dataset; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Option; import scala.Tuple2; -import java.util.*; - -import static eu.dnetlib.dhp.PropagationConstant.*; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; - public class SparkResultToOrganizationFromIstRepoJob2 { - private static final Logger log = LoggerFactory.getLogger(SparkResultToOrganizationFromIstRepoJob2.class); + private static final Logger log = + LoggerFactory.getLogger(SparkResultToOrganizationFromIstRepoJob2.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils.toString(SparkResultToOrganizationFromIstRepoJob2.class - .getResourceAsStream("/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/input_propagationresulaffiliationfrominstrepo_parameters.json")); + String jsonConfiguration = + IOUtils.toString( + SparkResultToOrganizationFromIstRepoJob2.class.getResourceAsStream( + "/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/input_propagationresulaffiliationfrominstrepo_parameters.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - jsonConfiguration); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); @@ -49,7 +42,6 @@ public class SparkResultToOrganizationFromIstRepoJob2 { String inputPath = parser.get("sourcePath"); log.info("inputPath: {}", inputPath); - final String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); @@ -62,119 +54,163 @@ public class SparkResultToOrganizationFromIstRepoJob2 { final String resultClassName = parser.get("resultTableName"); log.info("resultTableName: {}", resultClassName); - final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase(); + final String resultType = + resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase(); log.info("resultType: {}", resultType); - final Boolean writeUpdates = Optional - .ofNullable(parser.get("writeUpdate")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); + final Boolean writeUpdates = + Optional.ofNullable(parser.get("writeUpdate")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); log.info("writeUpdate: {}", writeUpdates); - final Boolean saveGraph = Optional - .ofNullable(parser.get("saveGraph")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); + final Boolean saveGraph = + Optional.ofNullable(parser.get("saveGraph")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); log.info("saveGraph: {}", saveGraph); - Class resultClazz = (Class) Class.forName(resultClassName); + Class resultClazz = + (Class) Class.forName(resultClassName); SparkConf conf = new SparkConf(); conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); - runWithSparkHiveSession(conf, isSparkSessionManaged, + runWithSparkHiveSession( + conf, + isSparkSessionManaged, spark -> { - if(isTest(parser)) { + if (isTest(parser)) { removeOutputDir(spark, outputPath); } - execPropagation(spark, datasourceorganization, alreadylinked, inputPath, outputPath, resultClazz, resultType, - writeUpdates, saveGraph); + execPropagation( + spark, + datasourceorganization, + alreadylinked, + inputPath, + outputPath, + resultClazz, + resultType, + writeUpdates, + saveGraph); }); - } - private static void execPropagation(SparkSession spark, String datasourceorganization, String alreadylinked, String inputPath, - String outputPath, Class resultClazz, String resultType, - Boolean writeUpdates, Boolean saveGraph) { + private static void execPropagation( + SparkSession spark, + String datasourceorganization, + String alreadylinked, + String inputPath, + String outputPath, + Class resultClazz, + String resultType, + Boolean writeUpdates, + Boolean saveGraph) { final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); org.apache.spark.sql.Dataset datasourceorganizationassoc = readAssocDatasourceOrganization(spark, datasourceorganization); - //broadcasting the result of the preparation step - Broadcast> broadcast_datasourceorganizationassoc = - sc.broadcast(datasourceorganizationassoc); + // broadcasting the result of the preparation step + Broadcast> + broadcast_datasourceorganizationassoc = sc.broadcast(datasourceorganizationassoc); - org.apache.spark.sql.Dataset potentialUpdates = getPotentialRelations(spark, inputPath, resultClazz, - broadcast_datasourceorganizationassoc).as(Encoders.bean(ResultOrganizationSet.class)); + org.apache.spark.sql.Dataset potentialUpdates = + getPotentialRelations( + spark, + inputPath, + resultClazz, + broadcast_datasourceorganizationassoc) + .as(Encoders.bean(ResultOrganizationSet.class)); - if(writeUpdates){ + if (writeUpdates) { createUpdateForRelationWrite(potentialUpdates, outputPath + "/" + resultType); } - if(saveGraph){ - getNewRelations(spark - .read() - .textFile(alreadylinked) - .map(value -> OBJECT_MAPPER.readValue(value, ResultOrganizationSet.class), - Encoders.bean(ResultOrganizationSet.class)), potentialUpdates) + if (saveGraph) { + getNewRelations( + spark.read() + .textFile(alreadylinked) + .map( + value -> + OBJECT_MAPPER.readValue( + value, ResultOrganizationSet.class), + Encoders.bean(ResultOrganizationSet.class)), + potentialUpdates) .toJSON() .write() .mode(SaveMode.Append) .option("compression", "gzip") .text(outputPath); -// .toJavaRDD() -// .map(r -> OBJECT_MAPPER.writeValueAsString(r)) -// .saveAsTextFile(outputPath , GzipCodec.class); } - - } - private static Dataset getNewRelations(Dataset alreadyLinked, Dataset potentialUpdates) { - - return potentialUpdates - .joinWith(alreadyLinked, potentialUpdates.col("resultId") - .equalTo(alreadyLinked.col("resultId")), "left_outer").flatMap((FlatMapFunction, Relation>) value -> { - List new_relations = new ArrayList<>(); - ResultOrganizationSet potential_update = value._1(); - Optional already_linked = Optional.ofNullable(value._2()); - List organization_list = potential_update.getOrganizationSet(); - if(already_linked.isPresent()){ - already_linked.get().getOrganizationSet() - .stream() - .forEach(rId -> { - if (organization_list.contains(rId)) { - organization_list.remove(rId); - } - }); - } - String resultId = potential_update.getResultId(); - organization_list - .stream() - .forEach(orgId -> { - new_relations.add(getRelation(orgId, resultId, RELATION_ORGANIZATION_RESULT_REL_CLASS, RELATION_RESULTORGANIZATION_REL_TYPE, - RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, - PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); - new_relations.add(getRelation(resultId, orgId, RELATION_RESULT_ORGANIZATION_REL_CLASS, RELATION_RESULTORGANIZATION_REL_TYPE, - RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, - PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); - } - ); - return new_relations.iterator(); - - } - ,Encoders.bean(Relation.class)); + private static Dataset getNewRelations( + Dataset alreadyLinked, + Dataset potentialUpdates) { + return potentialUpdates + .joinWith( + alreadyLinked, + potentialUpdates.col("resultId").equalTo(alreadyLinked.col("resultId")), + "left_outer") + .flatMap( + (FlatMapFunction< + Tuple2, + Relation>) + value -> { + List new_relations = new ArrayList<>(); + ResultOrganizationSet potential_update = value._1(); + Optional already_linked = + Optional.ofNullable(value._2()); + List organization_list = + potential_update.getOrganizationSet(); + if (already_linked.isPresent()) { + already_linked.get().getOrganizationSet().stream() + .forEach( + rId -> { + if (organization_list.contains(rId)) { + organization_list.remove(rId); + } + }); + } + String resultId = potential_update.getResultId(); + organization_list.stream() + .forEach( + orgId -> { + new_relations.add( + getRelation( + orgId, + resultId, + RELATION_ORGANIZATION_RESULT_REL_CLASS, + RELATION_RESULTORGANIZATION_REL_TYPE, + RELATION_RESULTORGANIZATION_SUBREL_TYPE, + PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, + PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); + new_relations.add( + getRelation( + resultId, + orgId, + RELATION_RESULT_ORGANIZATION_REL_CLASS, + RELATION_RESULTORGANIZATION_REL_TYPE, + RELATION_RESULTORGANIZATION_SUBREL_TYPE, + PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, + PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); + }); + return new_relations.iterator(); + }, + Encoders.bean(Relation.class)); } - - private static org.apache.spark.sql.Dataset getPotentialRelations(SparkSession spark, - String inputPath, - Class resultClazz, - Broadcast> broadcast_datasourceorganizationassoc) { + private static + org.apache.spark.sql.Dataset getPotentialRelations( + SparkSession spark, + String inputPath, + Class resultClazz, + Broadcast> + broadcast_datasourceorganizationassoc) { org.apache.spark.sql.Dataset result = readPathEntity(spark, inputPath, resultClazz); result.createOrReplaceTempView("result"); createCfHbforresult(spark); @@ -182,59 +218,75 @@ public class SparkResultToOrganizationFromIstRepoJob2 { return organizationPropagationAssoc(spark, broadcast_datasourceorganizationassoc); } - - - - private static org.apache.spark.sql.Dataset readAssocDatasourceOrganization(SparkSession spark, - String datasourcecountryorganization) { - return spark - .read() + private static org.apache.spark.sql.Dataset + readAssocDatasourceOrganization( + SparkSession spark, String datasourcecountryorganization) { + return spark.read() .textFile(datasourcecountryorganization) - .map(value -> OBJECT_MAPPER.readValue(value, DatasourceOrganization.class), Encoders.bean(DatasourceOrganization.class)); + .map( + value -> OBJECT_MAPPER.readValue(value, DatasourceOrganization.class), + Encoders.bean(DatasourceOrganization.class)); } - - - private static void createUpdateForRelationWrite(Dataset toupdaterelation, String outputPath) { - toupdaterelation.flatMap(s -> { - List relationList = new ArrayList<>(); - List orgs = s.getOrganizationSet(); - String resId = s.getResultId(); - for (String org : orgs) { - relationList.add(getRelation(org, resId, RELATION_ORGANIZATION_RESULT_REL_CLASS, - RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); - relationList.add(getRelation(resId, org, RELATION_RESULT_ORGANIZATION_REL_CLASS, - RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); - - } - return relationList.iterator(); - }, Encoders.bean(Relation.class)) + private static void createUpdateForRelationWrite( + Dataset toupdaterelation, String outputPath) { + toupdaterelation + .flatMap( + s -> { + List relationList = new ArrayList<>(); + List orgs = s.getOrganizationSet(); + String resId = s.getResultId(); + for (String org : orgs) { + relationList.add( + getRelation( + org, + resId, + RELATION_ORGANIZATION_RESULT_REL_CLASS, + RELATION_RESULTORGANIZATION_REL_TYPE, + RELATION_RESULTORGANIZATION_SUBREL_TYPE, + PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, + PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); + relationList.add( + getRelation( + resId, + org, + RELATION_RESULT_ORGANIZATION_REL_CLASS, + RELATION_RESULTORGANIZATION_REL_TYPE, + RELATION_RESULTORGANIZATION_SUBREL_TYPE, + PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, + PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); + } + return relationList.iterator(); + }, + Encoders.bean(Relation.class)) .toJSON() .write() .mode(SaveMode.Overwrite) - .option("compression","gzip") - .text(outputPath) - ; + .option("compression", "gzip") + .text(outputPath); } - - private static org.apache.spark.sql.Dataset organizationPropagationAssoc(SparkSession spark, Broadcast> broadcast_datasourceorganizationassoc){ - org.apache.spark.sql.Dataset datasourceorganization = broadcast_datasourceorganizationassoc.value(); + private static org.apache.spark.sql.Dataset organizationPropagationAssoc( + SparkSession spark, + Broadcast> + broadcast_datasourceorganizationassoc) { + org.apache.spark.sql.Dataset datasourceorganization = + broadcast_datasourceorganizationassoc.value(); datasourceorganization.createOrReplaceTempView("rels"); - String query = "SELECT id resultId, collect_set(organizationId) organizationSet "+ - "FROM ( SELECT id, organizationId " + - "FROM rels " + - "JOIN cfhb " + - " ON cf = datasourceId " + - "UNION ALL " + - "SELECT id , organizationId " + - "FROM rels " + - "JOIN cfhb " + - " ON hb = datasourceId ) tmp " + - "GROUP BY id"; + String query = + "SELECT id resultId, collect_set(organizationId) organizationSet " + + "FROM ( SELECT id, organizationId " + + "FROM rels " + + "JOIN cfhb " + + " ON cf = datasourceId " + + "UNION ALL " + + "SELECT id , organizationId " + + "FROM rels " + + "JOIN cfhb " + + " ON hb = datasourceId ) tmp " + + "GROUP BY id"; return spark.sql(query).as(Encoders.bean(ResultOrganizationSet.class)); } - }