From 0e5399bf74e717984c2bc12d5d3d2ef4b4517a23 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Thu, 16 Apr 2020 16:08:51 +0200
Subject: [PATCH] second phase of data preparation. Groups all the possible
 updates by id

---
 .../PrepareResultOrcidAssociationStep2.java        | 114 +++++++-----------
 1 file changed, 43 insertions(+), 71 deletions(-)

diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java
index 4af652ef0d..660d0b87a9 100644
--- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java
@@ -1,34 +1,27 @@
 package eu.dnetlib.dhp.orcidtoresultfromsemrel;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.gson.Gson;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.schema.oaf.Result;
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.List;
-
+import scala.Tuple2;
+import java.util.HashSet;
+import java.util.Set;
 import static eu.dnetlib.dhp.PropagationConstant.*;
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
-public class PrepareResultOrcidAssociationStep1 {
-    private static final Logger log = LoggerFactory.getLogger(PrepareResultOrcidAssociationStep1.class);
+public class PrepareResultOrcidAssociationStep2 {
+    private static final Logger log = LoggerFactory.getLogger(PrepareResultOrcidAssociationStep2.class);
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     public static void main(String[] args) throws Exception {
         String jsonConfiguration = IOUtils.toString(SparkOrcidToResultFromSemRelJob3.class
-                .getResourceAsStream("/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult_parameters.json"));
+                .getResourceAsStream("/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult2_parameters.json"));
 
         final ArgumentApplicationParser parser = new ArgumentApplicationParser(
                 jsonConfiguration);
@@ -44,76 +37,55 @@ public class PrepareResultOrcidAssociationStep1 {
         final String outputPath = parser.get("outputPath");
         log.info("outputPath: {}", outputPath);
 
-        final String resultClassName = parser.get("resultTableName");
-        log.info("resultTableName: {}", resultClassName);
-
-        final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrel").split(";"));
-        log.info("allowedSemRel: {}", new Gson().toJson(allowedsemrel));
-
-        final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase();
-        log.info("resultType: {}", resultType);
-
-
-        Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
-
         SparkConf conf = new SparkConf();
-        conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
-
-        runWithSparkHiveSession(conf, isSparkSessionManaged,
+        runWithSparkSession(conf, isSparkSessionManaged,
                 spark -> {
                     if (isTest(parser)) {
                         removeOutputDir(spark, outputPath);
                     }
-                    prepareInfo(spark, inputPath, outputPath, resultClazz, resultType, allowedsemrel);
+                    mergeInfo(spark, inputPath, outputPath);
                 });
     }
 
-    private static <R extends Result> void prepareInfo(SparkSession spark, String inputPath,
-                                                       String outputPath, Class<R> resultClazz,
-                                                       String resultType,
-                                                       List<String> allowedsemrel) {
+    private static void mergeInfo(SparkSession spark, String inputPath, String outputPath) {
 
-        //read the relation table and the table related to the result it is using
-        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
-        org.apache.spark.sql.Dataset<Relation> relation = spark.createDataset(sc.textFile(inputPath + "/relation")
-                .map(item -> new ObjectMapper().readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class));
-        relation.createOrReplaceTempView("relation");
+        Dataset<ResultOrcidList> resultOrcidAssoc = readAssocResultOrcidList(spark, inputPath);
 
-        log.info("Reading Graph table from: {}", inputPath + "/" + resultType);
-        Dataset<R> result = readPathEntity(spark, inputPath + "/" + resultType, resultClazz);
-
-        result.createOrReplaceTempView("result");
-
-        getPossibleResultOrcidAssociation(spark, allowedsemrel, outputPath);
+        resultOrcidAssoc
+                .toJavaRDD()
+                .mapToPair(r -> new Tuple2<>(r.getResultId(), r))
+                .reduceByKey((a, b) -> {
+                    if (a == null) {
+                        return b;
+                    }
+                    if (b == null) {
+                        return a;
+                    }
+                    Set<String> orcid_set = new HashSet<>();
+                    a.getAuthorList().stream().forEach(aa -> orcid_set.add(aa.getOrcid()));
+                    b.getAuthorList().stream().forEach(aa -> {
+                        if (!orcid_set.contains(aa.getOrcid())) {
+                            a.getAuthorList().add(aa);
+                            orcid_set.add(aa.getOrcid());
+                        }
+                    });
+                    return a;
+                })
+                .map(c -> c._2())
+                .map(r -> OBJECT_MAPPER.writeValueAsString(r))
+                .saveAsTextFile(outputPath, GzipCodec.class);
     }
 
-    private static void getPossibleResultOrcidAssociation(SparkSession spark, List<String> allowedsemrel, String outputPath){
-        String query = " select target resultId, author authorList" +
-                "  from (select id, collect_set(named_struct('name', name, 'surname', surname, 'fullname', fullname, 'orcid', orcid)) author " +
-                "        from ( " +
-                "       select id, MyT.fullname, MyT.name, MyT.surname, MyP.value orcid " +
-                "       from result " +
-                "       lateral view explode (author) a as MyT " +
-                "       lateral view explode (MyT.pid) p as MyP " +
-                "       where MyP.qualifier.classid = 'ORCID') tmp " +
-                "       group by id) r_t " +
-                "  join (" +
-                "       select source, target " +
-                "       from relation " +
-                "       where datainfo.deletedbyinference = false " + getConstraintList(" relclass = '" ,allowedsemrel) + ") rel_rel " +
-                "  on source = id";
-
-        spark.sql(query)
-                .as(Encoders.bean(ResultOrcidList.class))
-                .toJSON()
-                .write()
-                .mode(SaveMode.Append)
-                .option("compression","gzip")
-                .text(outputPath)
-                ;
+    private static Dataset<ResultOrcidList> readAssocResultOrcidList(SparkSession spark, String relationPath) {
+        return spark
+                .read()
+                .textFile(relationPath)
+                .map(value -> OBJECT_MAPPER.readValue(value, ResultOrcidList.class), Encoders.bean(ResultOrcidList.class));
     }
+
+
 }
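
Note on the reducer added above: mergeInfo groups every ResultOrcidList record by resultId and folds duplicates into a single record whose author list is deduplicated by ORCID. The standalone sketch below mirrors that merge logic outside Spark; MiniAuthor and MiniAssoc are hypothetical, simplified stand-ins for the project's author and ResultOrcidList beans, so the field names here are illustrative rather than the real schema.

// Standalone sketch of the merge performed by the reduceByKey function in
// the patch. MiniAuthor and MiniAssoc are hypothetical, simplified stand-ins
// for the project's author and ResultOrcidList beans.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MergeByIdSketch {

    static class MiniAuthor {
        final String orcid;
        final String fullname;

        MiniAuthor(String orcid, String fullname) {
            this.orcid = orcid;
            this.fullname = fullname;
        }
    }

    static class MiniAssoc {
        final String resultId;
        final List<MiniAuthor> authorList;

        MiniAssoc(String resultId, List<MiniAuthor> authorList) {
            this.resultId = resultId;
            this.authorList = authorList;
        }
    }

    // Same logic as the (a, b) -> { ... } reducer: keep a's authors and
    // append only those authors of b whose ORCID has not been seen yet.
    static MiniAssoc merge(MiniAssoc a, MiniAssoc b) {
        Set<String> seen = new HashSet<>();
        for (MiniAuthor author : a.authorList) {
            seen.add(author.orcid);
        }
        for (MiniAuthor author : b.authorList) {
            if (seen.add(author.orcid)) { // add() returns false for duplicates
                a.authorList.add(author);
            }
        }
        return a;
    }

    public static void main(String[] args) {
        MiniAssoc first = new MiniAssoc("50|doi_001", new ArrayList<>(Arrays.asList(
                new MiniAuthor("0000-0001-0000-0001", "Ada Lovelace"))));
        MiniAssoc second = new MiniAssoc("50|doi_001", new ArrayList<>(Arrays.asList(
                new MiniAuthor("0000-0001-0000-0001", "Ada Lovelace"),
                new MiniAuthor("0000-0002-0000-0002", "Alan Turing"))));

        // Prints 2: the duplicate ORCID from the second record is skipped.
        System.out.println(merge(first, second).authorList.size());
    }
}

Because the merge is effectively a set union keyed on ORCID, the merged author set is the same whichever order the records are combined in (only the list ordering may vary), which is what makes it safe for reduceByKey to apply partition-locally and then again across partitions.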