From d8dc31d4afb8326d7deeb1fadb0f2af7f3486024 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Thu, 23 Apr 2020 12:35:49 +0200 Subject: [PATCH] refactoring --- .../PrepareResultOrcidAssociationStep1.java | 109 ++++++------ .../PrepareResultOrcidAssociationStep2.java | 87 +++++----- .../SparkOrcidToResultFromSemRelJob3.java | 162 ++++++++++-------- 3 files changed, 197 insertions(+), 161 deletions(-) diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep1.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep1.java index 8f4ecb6495..9bc34eb738 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep1.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep1.java @@ -1,38 +1,38 @@ package eu.dnetlib.dhp.orcidtoresultfromsemrel; +import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; + import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.Gson; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Result; +import java.util.Arrays; +import java.util.List; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; -import java.util.List; - -import static eu.dnetlib.dhp.PropagationConstant.*; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; - public class PrepareResultOrcidAssociationStep1 { - private static final Logger log = LoggerFactory.getLogger(PrepareResultOrcidAssociationStep1.class); + private static final Logger log = + LoggerFactory.getLogger(PrepareResultOrcidAssociationStep1.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils.toString(SparkOrcidToResultFromSemRelJob3.class - .getResourceAsStream("/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult_parameters.json")); + String jsonConfiguration = + IOUtils.toString( + SparkOrcidToResultFromSemRelJob3.class.getResourceAsStream( + "/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult_parameters.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - jsonConfiguration); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); @@ -51,33 +51,44 @@ public class PrepareResultOrcidAssociationStep1 { final List allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";")); log.info("allowedSemRel: {}", new Gson().toJson(allowedsemrel)); - final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase(); + final String resultType = + resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase(); log.info("resultType: {}", resultType); - - Class resultClazz = (Class) Class.forName(resultClassName); + Class resultClazz = + (Class) Class.forName(resultClassName); SparkConf conf = new SparkConf(); conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); - runWithSparkHiveSession(conf, isSparkSessionManaged, + runWithSparkHiveSession( + conf, + isSparkSessionManaged, spark -> { if (isTest(parser)) { removeOutputDir(spark, outputPath); } - prepareInfo(spark, inputPath, outputPath, resultClazz, resultType, allowedsemrel); + prepareInfo( + spark, inputPath, outputPath, resultClazz, resultType, allowedsemrel); }); } - private static void prepareInfo(SparkSession spark, String inputPath, - String outputPath, Class resultClazz, - String resultType, - List allowedsemrel) { + private static void prepareInfo( + SparkSession spark, + String inputPath, + String outputPath, + Class resultClazz, + String resultType, + List allowedsemrel) { - //read the relation table and the table related to the result it is using + // read the relation table and the table related to the result it is using final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - org.apache.spark.sql.Dataset relation = spark.createDataset(sc.textFile(inputPath + "/relation") - .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class)); + org.apache.spark.sql.Dataset relation = + spark.createDataset( + sc.textFile(inputPath + "/relation") + .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)) + .rdd(), + Encoders.bean(Relation.class)); relation.createOrReplaceTempView("relation"); log.info("Reading Graph table from: {}", inputPath + "/" + resultType); @@ -86,38 +97,38 @@ public class PrepareResultOrcidAssociationStep1 { result.createOrReplaceTempView("result"); getPossibleResultOrcidAssociation(spark, allowedsemrel, outputPath + "/" + resultType); - } - private static void getPossibleResultOrcidAssociation(SparkSession spark, List allowedsemrel, String outputPath){ - String query = " select target resultId, author authorList" + - " from (select id, collect_set(named_struct('name', name, 'surname', surname, 'fullname', fullname, 'orcid', orcid)) author " + - " from ( " + - " select id, MyT.fullname, MyT.name, MyT.surname, MyP.value orcid " + - " from result " + - " lateral view explode (author) a as MyT " + - " lateral view explode (MyT.pid) p as MyP " + - " where MyP.qualifier.classid = 'ORCID') tmp " + - " group by id) r_t " + - " join (" + - " select source, target " + - " from relation " + - " where datainfo.deletedbyinference = false " + - getConstraintList(" relclass = '" ,allowedsemrel) + ") rel_rel " + - " on source = id"; + private static void getPossibleResultOrcidAssociation( + SparkSession spark, List allowedsemrel, String outputPath) { + String query = + " select target resultId, author authorList" + + " from (select id, collect_set(named_struct('name', name, 'surname', surname, 'fullname', fullname, 'orcid', orcid)) author " + + " from ( " + + " select id, MyT.fullname, MyT.name, MyT.surname, MyP.value orcid " + + " from result " + + " lateral view explode (author) a as MyT " + + " lateral view explode (MyT.pid) p as MyP " + + " where MyP.qualifier.classid = 'ORCID') tmp " + + " group by id) r_t " + + " join (" + + " select source, target " + + " from relation " + + " where datainfo.deletedbyinference = false " + + getConstraintList(" relclass = '", allowedsemrel) + + ") rel_rel " + + " on source = id"; spark.sql(query) .as(Encoders.bean(ResultOrcidList.class)) .toJavaRDD() .map(r -> OBJECT_MAPPER.writeValueAsString(r)) .saveAsTextFile(outputPath, GzipCodec.class); -// .toJSON() -// .write() -// .mode(SaveMode.Append) -// .option("compression","gzip") -// .text(outputPath) -// ; + // .toJSON() + // .write() + // .mode(SaveMode.Append) + // .option("compression","gzip") + // .text(outputPath) + // ; } - - } diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java index e38ba65833..658c97f6c5 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java @@ -1,7 +1,12 @@ package eu.dnetlib.dhp.orcidtoresultfromsemrel; +import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import java.util.HashSet; +import java.util.Set; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; @@ -9,22 +14,20 @@ import org.apache.spark.sql.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Tuple2; -import java.util.HashSet; -import java.util.Set; -import static eu.dnetlib.dhp.PropagationConstant.*; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; public class PrepareResultOrcidAssociationStep2 { - private static final Logger log = LoggerFactory.getLogger(PrepareResultOrcidAssociationStep2.class); + private static final Logger log = + LoggerFactory.getLogger(PrepareResultOrcidAssociationStep2.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils.toString(PrepareResultOrcidAssociationStep2.class - .getResourceAsStream("/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult_parameters2.json")); + String jsonConfiguration = + IOUtils.toString( + PrepareResultOrcidAssociationStep2.class.getResourceAsStream( + "/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult_parameters2.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - jsonConfiguration); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); @@ -39,56 +42,60 @@ public class PrepareResultOrcidAssociationStep2 { SparkConf conf = new SparkConf(); - runWithSparkSession(conf, isSparkSessionManaged, + runWithSparkSession( + conf, + isSparkSessionManaged, spark -> { if (isTest(parser)) { removeOutputDir(spark, outputPath); } - mergeInfo(spark, inputPath, outputPath); + mergeInfo(spark, inputPath, outputPath); }); } private static void mergeInfo(SparkSession spark, String inputPath, String outputPath) { - Dataset resultOrcidAssoc = readAssocResultOrcidList(spark, inputPath + "/publication") - .union(readAssocResultOrcidList(spark, inputPath + "/dataset")) - .union(readAssocResultOrcidList(spark, inputPath + "/otherresearchproduct")) - .union(readAssocResultOrcidList(spark, inputPath + "/software")); + Dataset resultOrcidAssoc = + readAssocResultOrcidList(spark, inputPath + "/publication") + .union(readAssocResultOrcidList(spark, inputPath + "/dataset")) + .union(readAssocResultOrcidList(spark, inputPath + "/otherresearchproduct")) + .union(readAssocResultOrcidList(spark, inputPath + "/software")); resultOrcidAssoc .toJavaRDD() .mapToPair(r -> new Tuple2<>(r.getResultId(), r)) - .reduceByKey((a, b) -> { - if (a == null) { - return b; - } - if (b == null) { - return a; - } - Set orcid_set = new HashSet<>(); - a.getAuthorList().stream().forEach(aa -> orcid_set.add(aa.getOrcid())); + .reduceByKey( + (a, b) -> { + if (a == null) { + return b; + } + if (b == null) { + return a; + } + Set orcid_set = new HashSet<>(); + a.getAuthorList().stream().forEach(aa -> orcid_set.add(aa.getOrcid())); - b.getAuthorList().stream().forEach(aa -> { - if (!orcid_set.contains(aa.getOrcid())) { - a.getAuthorList().add(aa); - orcid_set.add(aa.getOrcid()); - } - }); - return a; - }) + b.getAuthorList().stream() + .forEach( + aa -> { + if (!orcid_set.contains(aa.getOrcid())) { + a.getAuthorList().add(aa); + orcid_set.add(aa.getOrcid()); + } + }); + return a; + }) .map(c -> c._2()) .map(r -> OBJECT_MAPPER.writeValueAsString(r)) .saveAsTextFile(outputPath, GzipCodec.class); } - private static Dataset readAssocResultOrcidList(SparkSession spark, String relationPath) { - return spark - .read() + private static Dataset readAssocResultOrcidList( + SparkSession spark, String relationPath) { + return spark.read() .textFile(relationPath) - .map(value -> OBJECT_MAPPER.readValue(value, ResultOrcidList.class), Encoders.bean(ResultOrcidList.class)); + .map( + value -> OBJECT_MAPPER.readValue(value, ResultOrcidList.class), + Encoders.bean(ResultOrcidList.class)); } - - - - } diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob3.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob3.java index b8fef7ef61..75527552a9 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob3.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob3.java @@ -1,10 +1,15 @@ package eu.dnetlib.dhp.orcidtoresultfromsemrel; +import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; + import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.Author; import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; +import java.util.List; +import java.util.Optional; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; @@ -15,30 +20,25 @@ import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; -import java.util.Optional; - -import static eu.dnetlib.dhp.PropagationConstant.*; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; - public class SparkOrcidToResultFromSemRelJob3 { - private static final Logger log = LoggerFactory.getLogger(SparkOrcidToResultFromSemRelJob3.class); + private static final Logger log = + LoggerFactory.getLogger(SparkOrcidToResultFromSemRelJob3.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils.toString(SparkOrcidToResultFromSemRelJob3.class - .getResourceAsStream("/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_orcidtoresult_parameters.json")); + String jsonConfiguration = + IOUtils.toString( + SparkOrcidToResultFromSemRelJob3.class.getResourceAsStream( + "/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_orcidtoresult_parameters.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - jsonConfiguration); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); Boolean isSparkSessionManaged = isSparkSessionManaged(parser); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final String inputPath = parser.get("sourcePath"); log.info("inputPath: {}", inputPath); @@ -51,96 +51,116 @@ public class SparkOrcidToResultFromSemRelJob3 { final String resultClassName = parser.get("resultTableName"); log.info("resultTableName: {}", resultClassName); - final Boolean saveGraph = Optional - .ofNullable(parser.get("saveGraph")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); + final Boolean saveGraph = + Optional.ofNullable(parser.get("saveGraph")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); log.info("saveGraph: {}", saveGraph); - Class resultClazz = (Class) Class.forName(resultClassName); + Class resultClazz = + (Class) Class.forName(resultClassName); SparkConf conf = new SparkConf(); conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); - runWithSparkHiveSession(conf, isSparkSessionManaged, + runWithSparkHiveSession( + conf, + isSparkSessionManaged, spark -> { - if(isTest(parser)) { + if (isTest(parser)) { removeOutputDir(spark, outputPath); } - if(saveGraph) + if (saveGraph) execPropagation(spark, possibleUpdates, inputPath, outputPath, resultClazz); }); - } - private static void execPropagation(SparkSession spark, String possibleUpdatesPath, String inputPath, - String outputPath, Class resultClazz ) { + private static void execPropagation( + SparkSession spark, + String possibleUpdatesPath, + String inputPath, + String outputPath, + Class resultClazz) { - //read possible updates (resultId and list of possible orcid to add - Dataset possible_updates = readAssocResultOrcidList(spark, possibleUpdatesPath); - //read the result we have been considering + // read possible updates (resultId and list of possible orcid to add + Dataset possible_updates = + readAssocResultOrcidList(spark, possibleUpdatesPath); + // read the result we have been considering Dataset result = readPathEntity(spark, inputPath, resultClazz); - //make join result left_outer with possible updates + // make join result left_outer with possible updates - result.joinWith(possible_updates, result.col("id").equalTo(possible_updates.col("resultId")), - "left_outer") - .map(value -> { - R ret = value._1(); - Optional rol = Optional.ofNullable(value._2()); - if(rol.isPresent()) { - List toenrich_author = ret.getAuthor(); - List autoritativeAuthors = rol.get().getAuthorList(); - for(Author author: toenrich_author){ - if (!containsAllowedPid(author)){ - enrichAuthor(author, autoritativeAuthors); + result.joinWith( + possible_updates, + result.col("id").equalTo(possible_updates.col("resultId")), + "left_outer") + .map( + value -> { + R ret = value._1(); + Optional rol = Optional.ofNullable(value._2()); + if (rol.isPresent()) { + List toenrich_author = ret.getAuthor(); + List autoritativeAuthors = + rol.get().getAuthorList(); + for (Author author : toenrich_author) { + if (!containsAllowedPid(author)) { + enrichAuthor(author, autoritativeAuthors); + } } } - } - - return ret; - }, Encoders.bean(resultClazz)) - .toJSON() - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .text(outputPath); - + return ret; + }, + Encoders.bean(resultClazz)) + .toJSON() + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .text(outputPath); } - private static Dataset readAssocResultOrcidList(SparkSession spark, String relationPath) { - return spark - .read() + private static Dataset readAssocResultOrcidList( + SparkSession spark, String relationPath) { + return spark.read() .textFile(relationPath) - .map(value -> OBJECT_MAPPER.readValue(value, ResultOrcidList.class), Encoders.bean(ResultOrcidList.class)); + .map( + value -> OBJECT_MAPPER.readValue(value, ResultOrcidList.class), + Encoders.bean(ResultOrcidList.class)); } - private static void enrichAuthor(Author a, List au){ - for (AutoritativeAuthor aa: au){ - if(enrichAuthor(aa, a)){ + private static void enrichAuthor(Author a, List au) { + for (AutoritativeAuthor aa : au) { + if (enrichAuthor(aa, a)) { return; } } - } - - private static boolean enrichAuthor(AutoritativeAuthor autoritative_author, Author author) { boolean toaddpid = false; if (StringUtils.isNoneEmpty(autoritative_author.getSurname())) { if (StringUtils.isNoneEmpty(author.getSurname())) { - if (autoritative_author.getSurname().trim().equalsIgnoreCase(author.getSurname().trim())) { + if (autoritative_author + .getSurname() + .trim() + .equalsIgnoreCase(author.getSurname().trim())) { - //have the same surname. Check the name + // have the same surname. Check the name if (StringUtils.isNoneEmpty(autoritative_author.getName())) { if (StringUtils.isNoneEmpty(author.getName())) { - if (autoritative_author.getName().trim().equalsIgnoreCase(author.getName().trim())) { + if (autoritative_author + .getName() + .trim() + .equalsIgnoreCase(author.getName().trim())) { toaddpid = true; } - //they could be differently written (i.e. only the initials of the name in one of the two - if (autoritative_author.getName().trim().substring(0, 0).equalsIgnoreCase(author.getName().trim().substring(0, 0))) { + // they could be differently written (i.e. only the initials of the name + // in one of the two + if (autoritative_author + .getName() + .trim() + .substring(0, 0) + .equalsIgnoreCase(author.getName().trim().substring(0, 0))) { toaddpid = true; } } @@ -148,21 +168,20 @@ public class SparkOrcidToResultFromSemRelJob3 { } } } - if (toaddpid){ + if (toaddpid) { StructuredProperty p = new StructuredProperty(); p.setValue(autoritative_author.getOrcid()); - p.setQualifier(getQualifier(PROPAGATION_AUTHOR_PID, PROPAGATION_AUTHOR_PID )); - p.setDataInfo(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID, PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME)); + p.setQualifier(getQualifier(PROPAGATION_AUTHOR_PID, PROPAGATION_AUTHOR_PID)); + p.setDataInfo( + getDataInfo( + PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID, + PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME)); author.addPid(p); - } return toaddpid; - } - - - private static boolean containsAllowedPid(Author a) { for (StructuredProperty pid : a.getPid()) { if (PROPAGATION_AUTHOR_PID.equals(pid.getQualifier().getClassid())) { @@ -171,5 +190,4 @@ public class SparkOrcidToResultFromSemRelJob3 { } return false; } - }