From fd5d792e353c5121a0c3fedbfba2ff9b5167de45 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Thu, 16 Apr 2020 15:53:34 +0200 Subject: [PATCH] refactoring --- .../SparkCountryPropagationJob2.java | 6 +- .../PrepareResultOrcidAssociationStep1.java | 88 ++++ .../PrepareResultOrcidAssociationStep2.java | 119 ++++++ .../ResultOrcidList.java | 26 ++ .../ResultWithOrcid.java | 4 - .../SparkOrcidToResultFromSemRelJob3.java | 398 ++++++++++++++++++ .../oozie_app/config-default.xml | 18 - .../oozie_app/workflow.xml | 55 --- .../input_orcidtoresult_parameters.json | 12 + ...input_prepareorcidtoresult_parameters.json | 32 ++ ...nput_prepareorcidtoresult_parameters2.json | 38 ++ .../oozie_app/config-default.xml | 54 +++ .../oozie_app/workflow.xml | 71 ++++ 13 files changed, 841 insertions(+), 80 deletions(-) create mode 100644 dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep1.java create mode 100644 dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java create mode 100644 dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/ResultOrcidList.java delete mode 100644 dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/ResultWithOrcid.java create mode 100644 dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob3.java delete mode 100644 dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromremrel/oozie_app/config-default.xml delete mode 100644 dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromremrel/oozie_app/workflow.xml rename dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/{orcidtoresultfromremrel => orcidtoresultfromsemrel}/input_orcidtoresult_parameters.json (55%) create mode 100644 dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult_parameters.json create mode 100644 dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult_parameters2.json create mode 100644 dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java index 029be645d6..745a99db81 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java @@ -121,9 +121,9 @@ public class SparkCountryPropagationJob2 { .map(r -> new Tuple2<>(r.getId(), r), Encoders.tuple(Encoders.STRING(), Encoders.bean(resultClazz))); - Dataset> potential_update_pair = potentialUpdates.map(pu -> new Tuple2<>(pu.getResultId(), - pu), - Encoders.tuple(Encoders.STRING(), Encoders.bean(ResultCountrySet.class))); +// Dataset> potential_update_pair = potentialUpdates.map(pu -> new Tuple2<>(pu.getResultId(), +// pu), +// Encoders.tuple(Encoders.STRING(), Encoders.bean(ResultCountrySet.class))); Dataset new_table = result_pair .joinWith(potentialUpdates, result_pair.col("_1").equalTo(potentialUpdates.col("resultId")), "left_outer") diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep1.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep1.java new file mode 100644 index 0000000000..c8a7b90ce1 --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep1.java @@ -0,0 +1,88 @@ +package eu.dnetlib.dhp.orcidtoresultfromsemrel; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.gson.Gson; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.Result; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; + +public class PrepareResultOrcidAssociation { + private static final Logger log = LoggerFactory.getLogger(PrepareResultOrcidAssociation.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils.toString(SparkOrcidToResultFromSemRelJob3.class + .getResourceAsStream("/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + jsonConfiguration); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + final String resultClassName = parser.get("resultTableName"); + log.info("resultTableName: {}", resultClassName); + + final List allowedsemrel = Arrays.asList(parser.get("allowedsemrel").split(";")); + log.info("allowedSemRel: {}", new Gson().toJson(allowedsemrel)); + + final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase(); + log.info("resultType: {}", resultType); + + + Class resultClazz = (Class) Class.forName(resultClassName); + + SparkConf conf = new SparkConf(); + conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); + + runWithSparkHiveSession(conf, isSparkSessionManaged, + spark -> { + if (isTest(parser)) { + removeOutputDir(spark, outputPath); + } + prepareInfo(spark, inputPath, outputPath, resultClazz, resultType); + }); + } + + private static void prepareInfo(SparkSession spark, String inputPath, + String outputPath, Class resultClazz, + String resultType) { + + //read the relation table and the table related to the result it is using + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + org.apache.spark.sql.Dataset relation = spark.createDataset(sc.textFile(inputPath + "/relation") + .map(item -> new ObjectMapper().readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class)); + relation.createOrReplaceTempView("relation"); + + log.info("Reading Graph table from: {}", inputPath + "/" + resultType); + Dataset result = readPathEntity(spark, inputPath + "/" + resultType, resultClazz); + + + + + } +} diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java new file mode 100644 index 0000000000..4af652ef0d --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java @@ -0,0 +1,119 @@ +package eu.dnetlib.dhp.orcidtoresultfromsemrel; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.gson.Gson; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.Result; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; + +import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; + +public class PrepareResultOrcidAssociationStep1 { + private static final Logger log = LoggerFactory.getLogger(PrepareResultOrcidAssociationStep1.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils.toString(SparkOrcidToResultFromSemRelJob3.class + .getResourceAsStream("/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + jsonConfiguration); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + final String resultClassName = parser.get("resultTableName"); + log.info("resultTableName: {}", resultClassName); + + final List allowedsemrel = Arrays.asList(parser.get("allowedsemrel").split(";")); + log.info("allowedSemRel: {}", new Gson().toJson(allowedsemrel)); + + final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase(); + log.info("resultType: {}", resultType); + + + Class resultClazz = (Class) Class.forName(resultClassName); + + SparkConf conf = new SparkConf(); + conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); + + runWithSparkHiveSession(conf, isSparkSessionManaged, + spark -> { + if (isTest(parser)) { + removeOutputDir(spark, outputPath); + } + prepareInfo(spark, inputPath, outputPath, resultClazz, resultType, allowedsemrel); + }); + } + + private static void prepareInfo(SparkSession spark, String inputPath, + String outputPath, Class resultClazz, + String resultType, + List allowedsemrel) { + + //read the relation table and the table related to the result it is using + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + org.apache.spark.sql.Dataset relation = spark.createDataset(sc.textFile(inputPath + "/relation") + .map(item -> new ObjectMapper().readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class)); + relation.createOrReplaceTempView("relation"); + + log.info("Reading Graph table from: {}", inputPath + "/" + resultType); + Dataset result = readPathEntity(spark, inputPath + "/" + resultType, resultClazz); + + result.createOrReplaceTempView("result"); + + getPossibleResultOrcidAssociation(spark, allowedsemrel, outputPath); + + } + + private static void getPossibleResultOrcidAssociation(SparkSession spark, List allowedsemrel, String outputPath){ + String query = " select target resultId, author authorList" + + " from (select id, collect_set(named_struct('name', name, 'surname', surname, 'fullname', fullname, 'orcid', orcid)) author " + + " from ( " + + " select id, MyT.fullname, MyT.name, MyT.surname, MyP.value orcid " + + " from result " + + " lateral view explode (author) a as MyT " + + " lateral view explode (MyT.pid) p as MyP " + + " where MyP.qualifier.classid = 'ORCID') tmp " + + " group by id) r_t " + + " join (" + + " select source, target " + + " from relation " + + " where datainfo.deletedbyinference = false " + + getConstraintList(" relclass = '" ,allowedsemrel) + ") rel_rel " + + " on source = id"; + + spark.sql(query) + .as(Encoders.bean(ResultOrcidList.class)) + .toJSON() + .write() + .mode(SaveMode.Append) + .option("compression","gzip") + .text(outputPath) + ; + } + + +} diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/ResultOrcidList.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/ResultOrcidList.java new file mode 100644 index 0000000000..e90795b1df --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/ResultOrcidList.java @@ -0,0 +1,26 @@ +package eu.dnetlib.dhp.orcidtoresultfromsemrel; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +public class ResultWithOrcid implements Serializable { + String id; + List authorList = new ArrayList<>(); + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public List getAuthorList() { + return authorList; + } + + public void setAuthorList(List authorList) { + this.authorList = authorList; + } +} diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/ResultWithOrcid.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/ResultWithOrcid.java deleted file mode 100644 index 49fbea567f..0000000000 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/ResultWithOrcid.java +++ /dev/null @@ -1,4 +0,0 @@ -package eu.dnetlib.dhp.orcidtoresultfromsemrel; - -public class ResultWithOrcid { -} diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob3.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob3.java new file mode 100644 index 0000000000..ddeee391c9 --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob3.java @@ -0,0 +1,398 @@ +package eu.dnetlib.dhp.orcidtoresultfromsemrel; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.*; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import scala.Tuple2; + +import java.util.*; + +import static eu.dnetlib.dhp.PropagationConstant.*; + +public class SparkOrcidToResultFromSemRelJob2 { + public static void main(String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkOrcidToResultFromSemRelJob2.class.getResourceAsStream("/eu/dnetlib/dhp/orcidtoresultfromremrel/input_orcidtoresult_parameters.json"))); + parser.parseArgument(args); + SparkConf conf = new SparkConf(); + conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); + final SparkSession spark = SparkSession + .builder() + .appName(SparkOrcidToResultFromSemRelJob2.class.getSimpleName()) + .master(parser.get("master")) + .config(conf) + .enableHiveSupport() + .getOrCreate(); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final String inputPath = parser.get("sourcePath"); + final String outputPath = "/tmp/provision/propagation/orcidtoresult"; + + final List allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";")); + boolean writeUpdate = TRUE.equals(parser.get("writeUpdate")); + boolean saveGraph = TRUE.equals(parser.get("saveGraph")); + + createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration())); + + org.apache.spark.sql.Dataset relation = spark.createDataset(sc.textFile(inputPath + "/relation") + .map(item -> new ObjectMapper().readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class)); + + org.apache.spark.sql.Dataset dataset = spark.createDataset(sc.textFile(inputPath + "/dataset") + .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Dataset.class)).rdd(), + Encoders.bean(eu.dnetlib.dhp.schema.oaf.Dataset.class)); + + org.apache.spark.sql.Dataset other = spark.createDataset(sc.textFile(inputPath + "/otherresearchproduct") + .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.OtherResearchProduct.class)).rdd(), + Encoders.bean(eu.dnetlib.dhp.schema.oaf.OtherResearchProduct.class)); + + org.apache.spark.sql.Dataset software = spark.createDataset(sc.textFile(inputPath + "/software") + .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Software.class)).rdd(), + Encoders.bean(eu.dnetlib.dhp.schema.oaf.Software.class)); + + org.apache.spark.sql.Dataset publication = spark.createDataset(sc.textFile(inputPath + "/publication") + .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Publication.class)).rdd(), + Encoders.bean(eu.dnetlib.dhp.schema.oaf.Publication.class)); + + + relation.createOrReplaceTempView("relation"); + String query = "Select source, target " + + "from relation " + + "where datainfo.deletedbyinference = false " + getConstraintList(" relclass = '" , allowedsemrel); + + org.apache.spark.sql.Dataset result_result = spark.sql(query); + + publication.createOrReplaceTempView("publication"); + org.apache.spark.sql.Dataset pubs_with_orcid = getResultWithOrcid("publication", spark) + .as(Encoders.bean(ResultWithOrcid.class)); + + dataset.createOrReplaceTempView("dataset"); + org.apache.spark.sql.Dataset dats_with_orcid = getResultWithOrcid("dataset", spark) + .as(Encoders.bean(ResultWithOrcid.class)); + + other.createOrReplaceTempView("orp"); + org.apache.spark.sql.Dataset orp_with_orcid = getResultWithOrcid("orp", spark) + .as(Encoders.bean(ResultWithOrcid.class)); + + dataset.createOrReplaceTempView("software"); + org.apache.spark.sql.Dataset software_with_orcid = getResultWithOrcid("software", spark) + .as(Encoders.bean(ResultWithOrcid.class)); + //get the results having at least one author pid we are interested in + + //target of the relation from at least one source with orcid. + //the set of authors contains all those that have orcid and are related to target + //from any source with allowed semantic relationship + JavaPairRDD> target_authorlist_from_pubs = getTargetAutoritativeAuthorList(pubs_with_orcid); + + JavaPairRDD> target_authorlist_from_dats = getTargetAutoritativeAuthorList(dats_with_orcid); + + JavaPairRDD> target_authorlist_from_orp = getTargetAutoritativeAuthorList(orp_with_orcid); + + JavaPairRDD> target_authorlist_from_sw = getTargetAutoritativeAuthorList(software_with_orcid); + + if(writeUpdate){ + target_authorlist_from_dats.map(r -> new ObjectMapper().writeValueAsString(r)) + .saveAsTextFile(outputPath + "/" + "update_dats"); + target_authorlist_from_pubs.map(r -> new ObjectMapper().writeValueAsString(r)) + .saveAsTextFile(outputPath + "/" + "update_pubs"); + target_authorlist_from_orp.map(r -> new ObjectMapper().writeValueAsString(r)) + .saveAsTextFile(outputPath + "/" + "update_orp"); + target_authorlist_from_sw.map(r -> new ObjectMapper().writeValueAsString(r)) + .saveAsTextFile(outputPath + "/" + "update_sw"); + } + + if(saveGraph){ + sc.textFile(inputPath + "/publication") + .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Publication.class)) + .mapToPair(p -> new Tuple2<>(p.getId(),p)) + .leftOuterJoin(target_authorlist_from_pubs) + .map(c -> { + Result r = c._2()._1(); + if(!c._2()._2().isPresent()){ + return r; + } + List toenrich_author = r.getAuthor(); + List autoritativeAuthors = c._2()._2().get(); + for(eu.dnetlib.dhp.schema.oaf.Author author: toenrich_author){ + if (!containsAllowedPid(author)){ + enrichAuthor(author, autoritativeAuthors); + } + } + return r; + }); + + } + + } + + private static void enrichAuthor(eu.dnetlib.dhp.schema.oaf.Author a, List au){ + for (AutoritativeAuthor aa: au){ + if(enrichAuthor(aa, a)){ + return; + } + } + + } + +// private static JavaPairRDD> getTargetAutoritativeAuthorList(org.apache.spark.sql.Dataset result_result, org.apache.spark.sql.Dataset pubs_with_orcid) { +// return pubs_with_orcid +// .toJavaRDD() +// .mapToPair(p -> new Tuple2<>(p.getId(), p.getAuthorList())) +// .join(result_result.toJavaRDD().mapToPair(rel -> new Tuple2<>(rel.getString(0), rel.getString(1)))) +// .mapToPair(c -> new Tuple2<>(c._2._2(), c._2()._1())) +// .reduceByKey((a, b) -> { +// if(a == null){ +// return b; +// } +// if(b==null){ +// return a; +// } +// +// Set authSet = new HashSet<>(); +// a.stream().forEach(au -> authSet.add(au.getOrcid())); +// +// b.stream().forEach(au -> { +// if (!authSet.contains(au.getOrcid())) { +// a.add(au); +// } +// } +// ); +// return a; +// }); +// } +private static JavaPairRDD> getTargetAutoritativeAuthorList( org.apache.spark.sql.Dataset pubs_with_orcid) { + return pubs_with_orcid + .toJavaRDD() + .mapToPair(p -> new Tuple2<>(p.getId(), p.getAuthorList())) + .reduceByKey((a, b) -> { + if(a == null){ + return b; + } + if(b==null){ + return a; + } + Set authSet = new HashSet<>(); + a.stream().forEach(au -> authSet.add(au.getOrcid())); + + b.stream().forEach(au -> { + if (!authSet.contains(au.getOrcid())) { + a.add(au); + } + } + ); + return a; + }); +} + + private static org.apache.spark.sql.Dataset getResultWithOrcid(String table, SparkSession spark){ + String query = " select target, author " + + " from (select id, collect_set(named_struct('name', name, 'surname', surname, 'fullname', fullname, 'orcid', orcid)) author " + + " from ( " + + " select id, MyT.fullname, MyT.name, MyT.surname, MyP.value orcid " + + " from " + table + + " lateral view explode (author) a as MyT " + + " lateral view explode (MyT.pid) p as MyP " + + " where MyP.qualifier.classid = 'ORCID') tmp " + + " group by id) r_t " + + " join (" + + " select source, target " + + " from relation " + + " where datainfo.deletedbyinference = false and (relclass = 'isSupplementedBy' or relclass = 'isSupplementTo') rel_rel " + + " on source = id"; + + return spark.sql(query); + } + + + private static boolean enrichAuthor(AutoritativeAuthor autoritative_author, eu.dnetlib.dhp.schema.oaf.Author author) { + boolean toaddpid = false; + + if (StringUtils.isNoneEmpty(autoritative_author.getSurname())) { + if (StringUtils.isNoneEmpty(author.getSurname())) { + if (autoritative_author.getSurname().trim().equalsIgnoreCase(author.getSurname().trim())) { + + //have the same surname. Check the name + if (StringUtils.isNoneEmpty(autoritative_author.getName())) { + if (StringUtils.isNoneEmpty(author.getName())) { + if (autoritative_author.getName().trim().equalsIgnoreCase(author.getName().trim())) { + toaddpid = true; + } + //they could be differently written (i.e. only the initials of the name in one of the two + if (autoritative_author.getName().trim().substring(0, 0).equalsIgnoreCase(author.getName().trim().substring(0, 0))) { + toaddpid = true; + } + } + } + } + } + } + if (toaddpid){ + StructuredProperty pid = new StructuredProperty(); + String aa_pid = autoritative_author.getOrcid(); + pid.setValue(aa_pid); + pid.setQualifier(getQualifier(PROPAGATION_AUTHOR_PID, PROPAGATION_AUTHOR_PID )); + pid.setDataInfo(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID, PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME)); + if(author.getPid() == null){ + author.setPid(Arrays.asList(pid)); + }else{ + author.getPid().add(pid); + } + + } + return toaddpid; + + } + + +// private static List enrichAuthors(List autoritative_authors, List to_enrich_authors, boolean filter){ +//// List autoritative_authors = p._2()._2().get().getAuthors(); +//// List to_enrich_authors = r.getAuthor(); +// +// return to_enrich_authors +// .stream() +// .map(a -> { +// if (filter) { +// if (containsAllowedPid(a)) { +// return a; +// } +// } +// +// List lst = autoritative_authors.stream() +// .map(aa -> enrichAuthor(aa, a)).filter(au -> !(au == null)).collect(Collectors.toList()); +// if (lst.size() == 0) { +// return a; +// } +// return lst.get(0);//Each author can be enriched at most once. It cannot be the same as many different people +// +// }).collect(Collectors.toList()); +// } +// +// private static void writeResult(JavaPairRDD results, JavaPairRDD toupdateresult, +// String outputPath, String type) { +// +// results.join(toupdateresult) +// .map(p -> { +// Result r = p._2()._1(); +// +// List autoritative_authors = p._2()._2().getAuthors(); +// List to_enrich_authors = r.getAuthor(); +// +// r.setAuthor(enrichAutors(autoritative_authors, to_enrich_authors, false)); +//// .stream() +//// .map(a -> { +//// if(filter) { +//// if (containsAllowedPid(a)) { +//// return a; +//// } +//// } +//// +//// List lst = autoritative_authors.stream() +//// .map(aa -> enrichAuthor(aa, a)).filter(au -> !(au == null)).collect(Collectors.toList()); +//// if(lst.size() == 0){ +//// return a; +//// } +//// return lst.get(0);//Each author can be enriched at most once. It cannot be the same as many different people +//// +//// }).collect(Collectors.toList())); +// +// return r; +// }) +// .map(p -> new ObjectMapper().writeValueAsString(p)) +// .saveAsTextFile(outputPath + "/" + type + "_update"); +// } + + +// private static void updateResult(JavaPairRDD results, JavaPairRDD toupdateresult, +// String outputPath, String type) { +// results.leftOuterJoin(toupdateresult) +// .map(p -> { +// Result r = p._2()._1(); +// if (p._2()._2().isPresent()){ +// List autoritative_authors = p._2()._2().get().getAuthors(); +// List to_enrich_authors = r.getAuthor(); +// +// r.setAuthor(enrichAutors(autoritative_authors, to_enrich_authors, true)); +//// .stream() +//// .map(a -> { +//// if(filter) { +//// if (containsAllowedPid(a)) { +//// return a; +//// } +//// } +//// +//// List lst = autoritative_authors.stream() +//// .map(aa -> enrichAuthor(aa, a)).filter(au -> !(au == null)).collect(Collectors.toList()); +//// if(lst.size() == 0){ +//// return a; +//// } +//// return lst.get(0);//Each author can be enriched at most once. It cannot be the same as many different people +//// +//// }).collect(Collectors.toList())); +// } +// return r; +// }) +// .map(p -> new ObjectMapper().writeValueAsString(p)) +// .saveAsTextFile(outputPath+"/"+type); +// } + + + + private static boolean containsAllowedPid(eu.dnetlib.dhp.schema.oaf.Author a) { + for (StructuredProperty pid : a.getPid()) { + if (PROPAGATION_AUTHOR_PID.equals(pid.getQualifier().getClassid())) { + return true; + } + } + return false; + } + +} + + +/*private ResultProtos.Result.Metadata.Builder searchMatch(List author_list){ + ResultProtos.Result.Metadata.Builder metadataBuilder = ResultProtos.Result.Metadata.newBuilder(); + boolean updated = false; + + for (FieldTypeProtos.Author a: author_list){ + FieldTypeProtos.Author.Builder author = searchAuthor(a, autoritative_authors); + if(author != null){ + updated = true; + metadataBuilder.addAuthor(author); + }else{ + metadataBuilder.addAuthor(FieldTypeProtos.Author.newBuilder(a)); + } + } + if(updated) + return metadataBuilder; + return null; + } + private FieldTypeProtos.Author.Builder searchAuthor(FieldTypeProtos.Author a, List author_list){ + if(containsOrcid(a.getPidList())) + return null; + for(FieldTypeProtos.Author autoritative_author : author_list) { + if (equals(autoritative_author, a)) { + if(!containsOrcid(a.getPidList())) + return update(a, autoritative_author); + } + } + return null; + + } + + private boolean containsOrcid(List pidList){ + if(pidList == null) + return false; + return pidList + .stream() + .filter(kv -> kv.getKey().equals(PropagationConstants.AUTHOR_PID)) + .collect(Collectors.toList()).size() > 0; + } + */ \ No newline at end of file diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromremrel/oozie_app/config-default.xml b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromremrel/oozie_app/config-default.xml deleted file mode 100644 index 2e0ed9aeea..0000000000 --- a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromremrel/oozie_app/config-default.xml +++ /dev/null @@ -1,18 +0,0 @@ - - - jobTracker - yarnRM - - - nameNode - hdfs://nameservice1 - - - oozie.use.system.libpath - true - - - oozie.action.sharelib.for.spark - spark2 - - \ No newline at end of file diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromremrel/oozie_app/workflow.xml b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromremrel/oozie_app/workflow.xml deleted file mode 100644 index 15065b35a5..0000000000 --- a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromremrel/oozie_app/workflow.xml +++ /dev/null @@ -1,55 +0,0 @@ - - - - sourcePath - the source path - - - allowedsemrels - the semantic relationships allowed for propagation - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - AffiliatioPropagation - eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob - dhp-propagation-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} - --executor-cores ${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" - --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" - - -mt yarn-cluster - --sourcePath${sourcePath} - --allowedsemrels${allowedsemrels} - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromremrel/input_orcidtoresult_parameters.json b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_orcidtoresult_parameters.json similarity index 55% rename from dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromremrel/input_orcidtoresult_parameters.json rename to dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_orcidtoresult_parameters.json index ffb314cdf5..ea2f0b96ce 100644 --- a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromremrel/input_orcidtoresult_parameters.json +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_orcidtoresult_parameters.json @@ -16,5 +16,17 @@ "paramLongName":"allowedsemrels", "paramDescription": "the allowed sematinc relations for propagation", "paramRequired": true + }, + { + "paramName":"wu", + "paramLongName":"writeUpdate", + "paramDescription": "true if the update must be writte. No double check if information is already present", + "paramRequired": true + }, + { + "paramName":"sg", + "paramLongName":"saveGraph", + "paramDescription": "true if the new version of the graph must be saved", + "paramRequired": true } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult_parameters.json b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult_parameters.json new file mode 100644 index 0000000000..ea2f0b96ce --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult_parameters.json @@ -0,0 +1,32 @@ +[ + { + "paramName":"mt", + "paramLongName":"master", + "paramDescription": "should be local or yarn", + "paramRequired": true + }, + { + "paramName":"s", + "paramLongName":"sourcePath", + "paramDescription": "the path of the sequencial file to read", + "paramRequired": true + }, + { + "paramName":"as", + "paramLongName":"allowedsemrels", + "paramDescription": "the allowed sematinc relations for propagation", + "paramRequired": true + }, + { + "paramName":"wu", + "paramLongName":"writeUpdate", + "paramDescription": "true if the update must be writte. No double check if information is already present", + "paramRequired": true + }, + { + "paramName":"sg", + "paramLongName":"saveGraph", + "paramDescription": "true if the new version of the graph must be saved", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult_parameters2.json b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult_parameters2.json new file mode 100644 index 0000000000..08648d61a1 --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult_parameters2.json @@ -0,0 +1,38 @@ +[ + { + "paramName":"s", + "paramLongName":"sourcePath", + "paramDescription": "the path of the sequencial file to read", + "paramRequired": true + }, + { + "paramName":"as", + "paramLongName":"allowedsemrels", + "paramDescription": "the allowed sematinc relations for propagation", + "paramRequired": true + }, + { + "paramName":"h", + "paramLongName":"hive_metastore_uris", + "paramDescription": "the hive metastore uris", + "paramRequired": true + }, + { + "paramName": "out", + "paramLongName": "outputPath", + "paramDescription": "the path used to store temporary output files", + "paramRequired": true + }, + { + "paramName": "ssm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "true if the spark session is managed, false otherwise", + "paramRequired": false + }, + { + "paramName":"tn", + "paramLongName":"resultTableName", + "paramDescription": "the name of the result table we are currently working on", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/oozie_app/config-default.xml b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/oozie_app/config-default.xml new file mode 100644 index 0000000000..56d0ac43c8 --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/oozie_app/config-default.xml @@ -0,0 +1,54 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + spark2YarnHistoryServerAddress + http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 + + + spark2EventLogDir + /user/spark/spark2ApplicationHistory + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + + + sparkExecutorNumber + 4 + + + sparkDriverMemory + 15G + + + sparkExecutorMemory + 6G + + + sparkExecutorCores + 1 + + + spark2MaxExecutors + 50 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/oozie_app/workflow.xml b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/oozie_app/workflow.xml new file mode 100644 index 0000000000..b4fca5e581 --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/oozie_app/workflow.xml @@ -0,0 +1,71 @@ + + + + sourcePath + the source path + + + allowedsemrels + the semantic relationships allowed for propagation + + + + + + + + + + + + + + + writeUpdate + writes the information found for the update. No double check done if the information is already present + + + saveGraph + writes new version of the graph after the propagation step + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + OrcidToResultFromSemRelPropagation + eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob + dhp-propagation-${projectVersion}.jar + + --num-executors=${sparkExecutorNumber} + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.dynamicAllocation.enabled=true + --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} + + -mt yarn-cluster + --sourcePath${sourcePath} + --allowedsemrels${allowedsemrels} + --writeUpdate${writeUpdate} + --saveGraph${saveGraph} + + + + + + + \ No newline at end of file