diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java index aa2861623..bb4dfe429 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java @@ -6,8 +6,10 @@ import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.dhp.schema.oaf.Organization; import eu.dnetlib.dhp.schema.oaf.Relation; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; @@ -50,12 +52,34 @@ public class PrepareResultInstRepoAssociation { runWithSparkHiveSession(conf, isSparkSessionManaged, spark -> { - prepareAssociations(spark, inputPath, datasourceOrganizationPath, alreadyLinkedPath); + readNeededResources(spark, inputPath); + prepareDatasourceOrganizationAssociations(spark, datasourceOrganizationPath, alreadyLinkedPath); + prepareAlreadyLinkedAssociation(spark, alreadyLinkedPath); }); } - private static void prepareAssociations(SparkSession spark, String inputPath, String datasourceOrganizationPath, - String alreadyLinkedPath){ + private static void prepareAlreadyLinkedAssociation(SparkSession spark, String alreadyLinkedPath) { + String query = "Select source resultId, collect_set(target) organizationSet " + + "from relation " + + "where datainfo.deletedbyinference = false " + + "and relClass = '" + RELATION_RESULT_ORGANIZATION_REL_CLASS +"' " + + "group by source"; + + + spark.sql(query) + .as(Encoders.bean(ResultOrganizationSet.class)) + .toJavaRDD() + .map(r -> OBJECT_MAPPER.writeValueAsString(r)) + .saveAsTextFile(alreadyLinkedPath, GzipCodec.class); +// .as(Encoders.bean(ResultOrganizationSet.class)) +// .toJSON() +// .write() +// .mode(SaveMode.Overwrite) +// .option("compression","gzip") +// .text(alreadyLinkedPath); + } + + private static void readNeededResources(SparkSession spark, String inputPath) { final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); org.apache.spark.sql.Dataset datasource = spark.createDataset(sc.textFile(inputPath + "/datasource") @@ -70,6 +94,11 @@ public class PrepareResultInstRepoAssociation { datasource.createOrReplaceTempView("datasource"); relation.createOrReplaceTempView("relation"); organization.createOrReplaceTempView("organization"); + } + + private static void prepareDatasourceOrganizationAssociations(SparkSession spark, String datasourceOrganizationPath, + String alreadyLinkedPath){ + String query = "SELECT source datasourceId, target organizationId " + "FROM ( SELECT id " + @@ -90,19 +119,6 @@ public class PrepareResultInstRepoAssociation { .option("compression","gzip") .text(datasourceOrganizationPath); - query = "Select source, collect_set(target) organizationSet " + - "from relation " + - "where datainfo.deletedbyinference = false " + - "and relClass = '" + RELATION_RESULT_ORGANIZATION_REL_CLASS +"' " + - "group by source"; - - spark.sql(query) - .as(Encoders.bean(ResultOrganizationSet.class)) - .toJSON() - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .text(alreadyLinkedPath); }