From 5e72a51f11bf98d4dfc20fefe95b592c00850879 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Thu, 16 Apr 2020 16:11:20 +0200
Subject: [PATCH] -

---
 .../SparkOrcidToResultFromSemRelJob.java  | 160 +++++--
 .../SparkOrcidToResultFromSemRelJob2.java | 447 +++++++++++-------
 2 files changed, 375 insertions(+), 232 deletions(-)
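Both files move the propagation jobs off Hadoop SequenceFiles and onto graph dumps stored as one JSON object per line, bound to the OAF model with Jackson. The recurring read pattern, as a minimal standalone sketch (variable names assumed from context):

    // Read newline-delimited JSON into a typed RDD: one Relation per line.
    // The patch builds a fresh ObjectMapper per record; hoisting a single
    // mapper per partition (mapPartitions) would avoid the repeated allocation.
    JavaRDD<Relation> relations = sc.textFile(inputPath + "/relation")
            .map(json -> new ObjectMapper().readValue(json, Relation.class))
            .cache();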
diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java
index adc92aa75..cddca14ed 100644
--- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java
@@ -4,31 +4,26 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.TypedRow;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.Author;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.SparkSession;
 import scala.Tuple2;
-import java.io.File;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Set;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;

 import static eu.dnetlib.dhp.PropagationConstant.*;

 public class SparkOrcidToResultFromSemRelJob {
     public static void main(String[] args) throws Exception {
-        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkOrcidToResultFromSemRelJob.class.getResourceAsStream("/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_orcid" +
-                "toresult_parameters.json")));
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkOrcidToResultFromSemRelJob.class.getResourceAsStream("/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_orcidtoresult_parameters.json")));
         parser.parseArgument(args);
         final SparkSession spark = SparkSession
                 .builder()
@@ -42,26 +37,24 @@ public class SparkOrcidToResultFromSemRelJob {
         final String outputPath = "/tmp/provision/propagation/orcidtoresult";
         final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
+        boolean writeUpdate = TRUE.equals(parser.get("writeUpdate"));
+        boolean saveGraph = TRUE.equals(parser.get("saveGraph"));

-        File directory = new File(outputPath);
+        createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));

-        if (!directory.exists()) {
-            directory.mkdirs();
-        }
-
-        JavaRDD<Relation> relations = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class)
-                .map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class)).cache();
+        JavaRDD<Relation> relations = sc.textFile(inputPath + "/relation")
+                .map(item -> new ObjectMapper().readValue(item, Relation.class)).cache();

         JavaPairRDD<String, TypedRow> result_result = getResultResultSemRel(allowedsemrel, relations);

-        JavaRDD<Publication> publications = sc.sequenceFile(inputPath + "/publication", Text.class, Text.class)
-                .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class));
-        JavaRDD<Dataset> datasets = sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class)
-                .map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class));
-        JavaRDD<Software> software = sc.sequenceFile(inputPath + "/software", Text.class, Text.class)
-                .map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class));
-        JavaRDD<OtherResearchProduct> other = sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class)
-                .map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class));
+        JavaRDD<Publication> publications = sc.textFile(inputPath + "/publication")
+                .map(item -> new ObjectMapper().readValue(item, Publication.class));
+        JavaRDD<Dataset> datasets = sc.textFile(inputPath + "/dataset")
+                .map(item -> new ObjectMapper().readValue(item, Dataset.class));
+        JavaRDD<Software> software = sc.textFile(inputPath + "/software")
+                .map(item -> new ObjectMapper().readValue(item, Software.class));
+        JavaRDD<OtherResearchProduct> other = sc.textFile(inputPath + "/otherresearchproduct")
+                .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class));

         //get the results having at least one author pid we are interested in
         JavaPairRDD<String, TypedRow> resultswithorcid = publications.map(p -> getTypedRow(p))
@@ -87,15 +80,25 @@ public class SparkOrcidToResultFromSemRelJob {
         JavaPairRDD<String, Result> sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p));
         JavaPairRDD<String, Result> orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p));

-        updateResult(pubs, to_add_orcid_to_result, outputPath, "publication");
-        updateResult(dss, to_add_orcid_to_result, outputPath, "dataset");
-        updateResult(sfw, to_add_orcid_to_result, outputPath, "software");
-        updateResult(orp, to_add_orcid_to_result, outputPath, "otherresearchproduct");
+        if(writeUpdate){
+            writeResult(pubs, to_add_orcid_to_result, outputPath, "publication");
+            writeResult(dss, to_add_orcid_to_result, outputPath, "dataset");
+            writeResult(sfw, to_add_orcid_to_result, outputPath, "software");
+            writeResult(orp, to_add_orcid_to_result, outputPath, "otherresearchproduct");
+        }
+
+        if (saveGraph){
+            updateResult(pubs, to_add_orcid_to_result, outputPath, "publication");
+            updateResult(dss, to_add_orcid_to_result, outputPath, "dataset");
+            updateResult(sfw, to_add_orcid_to_result, outputPath, "software");
+            updateResult(orp, to_add_orcid_to_result, outputPath, "otherresearchproduct");
+        }
+
     }

-    private static Author enrichAutor(Author autoritative_author, Author author) {
+    public static eu.dnetlib.dhp.schema.oaf.Author enrichAutor(eu.dnetlib.dhp.schema.oaf.Author autoritative_author, eu.dnetlib.dhp.schema.oaf.Author author) {
         boolean toaddpid = false;

         if (StringUtils.isNoneEmpty(autoritative_author.getSurname())) {
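The two new flags split the job into independently switchable phases: writeUpdate dumps the computed enrichments next to the graph, while saveGraph rewrites the entities with the ORCIDs merged in. getResultResultSemRel comes from PropagationConstant and is not part of this patch; judging from its call site it pairs each relation source with the targets reachable through an allowed semantic relation, roughly (hedged sketch, helper internals assumed):

    // Keep only relations whose relClass is allowed, keyed by source id.
    // The real helper returns TypedRow values rather than bare target ids,
    // and presumably also drops deleted-by-inference relations.
    JavaPairRDD<String, String> sourceToTarget = relations
            .filter(r -> allowedsemrel.contains(r.getRelClass()))
            .mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget()));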
@@ -137,32 +140,91 @@ public class SparkOrcidToResultFromSemRelJob {
     }

+    private static List<Author> enrichAutors(List<Author> autoritative_authors,
+                                             List<Author> to_enrich_authors, boolean filter){
+//        List<Author> autoritative_authors = p._2()._2().get().getAuthors();
+//        List<Author> to_enrich_authors = r.getAuthor();

-    private static void updateResult(JavaPairRDD<String, Result> results, JavaPairRDD<String, TypedRow> toupdateresult, String outputPath, String type) {
+        return to_enrich_authors
+                .stream()
+                .map(a -> {
+                    if (filter) {
+                        if (containsAllowedPid(a)) {
+                            return a;
+                        }
+                    }
+
+                    List<Author> lst = autoritative_authors.stream()
+                            .map(aa -> enrichAutor(aa, a)).filter(au -> !(au == null)).collect(Collectors.toList());
+                    if (lst.size() == 0) {
+                        return a;
+                    }
+                    return lst.get(0);//Each author can be enriched at most once. It cannot be the same as many different people
+
+                }).collect(Collectors.toList());
+    }
+
+    private static void writeResult(JavaPairRDD<String, Result> results, JavaPairRDD<String, TypedRow> toupdateresult,
+                                    String outputPath, String type) {
+
+        results.join(toupdateresult)
+                .map(p -> {
+                    Result r = p._2()._1();
+
+                    List<Author> autoritative_authors = p._2()._2().getAuthors();
+                    List<Author> to_enrich_authors = r.getAuthor();
+
+                    r.setAuthor(enrichAutors(autoritative_authors, to_enrich_authors, false));
+//                            .stream()
+//                            .map(a -> {
+//                                if(filter) {
+//                                    if (containsAllowedPid(a)) {
+//                                        return a;
+//                                    }
+//                                }
+//
+//                                List<Author> lst = autoritative_authors.stream()
+//                                        .map(aa -> enrichAutor(aa, a)).filter(au -> !(au == null)).collect(Collectors.toList());
+//                                if(lst.size() == 0){
+//                                    return a;
+//                                }
+//                                return lst.get(0);//Each author can be enriched at most once. It cannot be the same as many different people
+//
+//                            }).collect(Collectors.toList()));
+
+                    return r;
+                })
+                .map(p -> new ObjectMapper().writeValueAsString(p))
+                .saveAsTextFile(outputPath + "/" + type + "_update");
+    }
+
+
+    private static void updateResult(JavaPairRDD<String, Result> results, JavaPairRDD<String, TypedRow> toupdateresult,
+                                     String outputPath, String type) {
         results.leftOuterJoin(toupdateresult)
                 .map(p -> {
                     Result r = p._2()._1();
                     if (p._2()._2().isPresent()){
-                        List<Author> autoritative_authors = p._2()._2().get().getAuthors();
-                        List<Author> to_enrich_authors = r.getAuthor();
-                        //.stream().filter(a -> !containsAllowedPid(a))
-                        //.collect(Collectors.toList());
+                        List<Author> autoritative_authors = p._2()._2().get().getAuthors();
+                        List<Author> to_enrich_authors = r.getAuthor();

-                        r.setAuthor(to_enrich_authors
-                                .stream()
-                                .map(a -> {
-                                    if (containsAllowedPid(a)) {
-                                        return a;
-                                    }
-
-                                    List<Author> lst = autoritative_authors.stream()
-                                            .map(aa -> enrichAutor(aa, a)).filter(au -> !(au == null)).collect(Collectors.toList());
-                                    if(lst.size() == 0){
-                                        return a;
-                                    }
-                                    return lst.get(0);//Each author can be enriched at most once. It cannot be the same as many different people
-
-                                }).collect(Collectors.toList()));
+                        r.setAuthor(enrichAutors(autoritative_authors, to_enrich_authors, true));
+//                                .stream()
+//                                .map(a -> {
+//                                    if(filter) {
+//                                        if (containsAllowedPid(a)) {
+//                                            return a;
+//                                        }
+//                                    }
+//
+//                                    List<Author> lst = autoritative_authors.stream()
+//                                            .map(aa -> enrichAutor(aa, a)).filter(au -> !(au == null)).collect(Collectors.toList());
+//                                    if(lst.size() == 0){
+//                                        return a;
+//                                    }
+//                                    return lst.get(0);//Each author can be enriched at most once. It cannot be the same as many different people
+//
+//                                }).collect(Collectors.toList()));
                     }
                     return r;
                 })
                 .map(p -> new ObjectMapper().writeValueAsString(p))
                 .saveAsTextFile(outputPath+"/"+type);
     }
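The enrich-at-most-once rule in enrichAutors (an author may match only one autoritative author, since one person cannot be many different people) is implemented by collecting a list and taking element 0; the same logic reads more directly with findFirst. Equivalent fragment for the body of the map lambda (requires a java.util.Objects import):

    Author enriched = autoritative_authors.stream()
            .map(aa -> enrichAutor(aa, a))
            .filter(Objects::nonNull)
            .findFirst()
            .orElse(a);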
@@ -195,7 +257,7 @@ public class SparkOrcidToResultFromSemRelJob {
     }

-    private static boolean containsAllowedPid(Author a){
+    private static boolean containsAllowedPid(eu.dnetlib.dhp.schema.oaf.Author a){

         return (a.getPid().stream().map(pid -> {

diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob2.java
index 73b8895e1..1b58badfa 100644
--- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob2.java
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob2.java
@@ -1,40 +1,35 @@
 package eu.dnetlib.dhp.orcidtoresultfromsemrel;

 import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.TypedRow;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.schema.oaf.*;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Text;
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import scala.Tuple2;

-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import java.util.*;

 import static eu.dnetlib.dhp.PropagationConstant.*;

-public class SparkOrcidToResultFromSemRelJob {
+public class SparkOrcidToResultFromSemRelJob2 {
     public static void main(String[] args) throws Exception {
-        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkOrcidToResultFromSemRelJob.class.getResourceAsStream("/eu/dnetlib/dhp/orcidtoresultfromremrel/input_orcidtoresult_parameters.json")));
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkOrcidToResultFromSemRelJob2.class.getResourceAsStream("/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_orcidtoresult_parameters.json")));
         parser.parseArgument(args);
+        SparkConf conf = new SparkConf();
+        conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
         final SparkSession spark = SparkSession
                 .builder()
-                .appName(SparkOrcidToResultFromSemRelJob.class.getSimpleName())
+                .appName(SparkOrcidToResultFromSemRelJob2.class.getSimpleName())
                 .master(parser.get("master"))
+                .config(conf)
                 .enableHiveSupport()
                 .getOrCreate();
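With the Hive metastore wired into the session, the next hunk keeps RDDs only long enough to parse the JSON and then promotes every entity to a typed Dataset so it can be queried with spark.sql. The conversion idiom used for all five entity types:

    // JSON lines -> JavaRDD -> Dataset via a bean encoder, then a temp view.
    org.apache.spark.sql.Dataset<Publication> publication = spark.createDataset(
            sc.textFile(inputPath + "/publication")
                    .map(s -> new ObjectMapper().readValue(s, Publication.class))
                    .rdd(),
            Encoders.bean(Publication.class));
    publication.createOrReplaceTempView("publication");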
@@ -48,63 +43,176 @@ public class SparkOrcidToResultFromSemRelJob {
         createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));

-        JavaRDD<Relation> relations = sc.textFile(inputPath + "/relation")
-                .map(item -> new ObjectMapper().readValue(item, Relation.class)).cache();
+        org.apache.spark.sql.Dataset<Relation> relation = spark.createDataset(sc.textFile(inputPath + "/relation")
+                .map(item -> new ObjectMapper().readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class));

-        JavaPairRDD<String, TypedRow> result_result = getResultResultSemRel(allowedsemrel, relations);
+        org.apache.spark.sql.Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> dataset = spark.createDataset(sc.textFile(inputPath + "/dataset")
+                .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Dataset.class)).rdd(),
+                Encoders.bean(eu.dnetlib.dhp.schema.oaf.Dataset.class));

-        JavaRDD<Publication> publications = sc.textFile(inputPath + "/publication")
-                .map(item -> new ObjectMapper().readValue(item, Publication.class));
-        JavaRDD<Dataset> datasets = sc.textFile(inputPath + "/dataset")
-                .map(item -> new ObjectMapper().readValue(item, Dataset.class));
-        JavaRDD<Software> software = sc.textFile(inputPath + "/software")
-                .map(item -> new ObjectMapper().readValue(item, Software.class));
-        JavaRDD<OtherResearchProduct> other = sc.textFile(inputPath + "/otherresearchproduct")
-                .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class));
+        org.apache.spark.sql.Dataset<eu.dnetlib.dhp.schema.oaf.OtherResearchProduct> other = spark.createDataset(sc.textFile(inputPath + "/otherresearchproduct")
+                .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.OtherResearchProduct.class)).rdd(),
+                Encoders.bean(eu.dnetlib.dhp.schema.oaf.OtherResearchProduct.class));
+        org.apache.spark.sql.Dataset<eu.dnetlib.dhp.schema.oaf.Software> software = spark.createDataset(sc.textFile(inputPath + "/software")
+                .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Software.class)).rdd(),
+                Encoders.bean(eu.dnetlib.dhp.schema.oaf.Software.class));
+
+        org.apache.spark.sql.Dataset<eu.dnetlib.dhp.schema.oaf.Publication> publication = spark.createDataset(sc.textFile(inputPath + "/publication")
+                .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Publication.class)).rdd(),
+                Encoders.bean(eu.dnetlib.dhp.schema.oaf.Publication.class));
+
+
+        relation.createOrReplaceTempView("relation");
+        String query = "Select source, target " +
+                "from relation " +
+                "where datainfo.deletedbyinference = false " + getConstraintList(" relclass = '" , allowedsemrel);
+
+        org.apache.spark.sql.Dataset<Row> result_result = spark.sql(query);
+
+        publication.createOrReplaceTempView("publication");
+        org.apache.spark.sql.Dataset<ResultOrcidList> pubs_with_orcid = getResultWithOrcid("publication", spark)
+                .as(Encoders.bean(ResultOrcidList.class));
+
+        dataset.createOrReplaceTempView("dataset");
+        org.apache.spark.sql.Dataset<ResultOrcidList> dats_with_orcid = getResultWithOrcid("dataset", spark)
+                .as(Encoders.bean(ResultOrcidList.class));
+
+        other.createOrReplaceTempView("orp");
+        org.apache.spark.sql.Dataset<ResultOrcidList> orp_with_orcid = getResultWithOrcid("orp", spark)
+                .as(Encoders.bean(ResultOrcidList.class));
+
+        dataset.createOrReplaceTempView("software");
+        org.apache.spark.sql.Dataset<ResultOrcidList> software_with_orcid = getResultWithOrcid("software", spark)
+                .as(Encoders.bean(ResultOrcidList.class));

         //get the results having at least one author pid we are interested in
-        JavaPairRDD<String, TypedRow> resultswithorcid = publications.map(p -> getTypedRow(p))
-                .filter(p -> !(p == null))
-                .mapToPair(toPair())
-                .union(datasets.map(p -> getTypedRow(p))
-                        .filter(p -> !(p == null))
-                        .mapToPair(toPair()))
-                .union(software.map(p -> getTypedRow(p))
-                        .filter(p -> !(p == null))
-                        .mapToPair(toPair()))
-                .union(other.map(p -> getTypedRow(p))
-                        .filter(p -> !(p == null))
-                        .mapToPair(toPair()));
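Two remarks on the hunk above. First, the software view is registered from the dataset Dataset (dataset.createOrReplaceTempView("software")), which looks like a copy-and-paste slip for software.createOrReplaceTempView("software") and would make software_with_orcid carry dataset rows. Second, the ResultOrcidList and AutoritativeAuthor beans are not part of this patch; from the getters used later (getResultId(), getAuthorList(), getOrcid(), ...) and the named_struct in the query, their shape is presumably close to:

    // Hypothetical reconstruction of the beans (not in this patch).
    public class ResultOrcidList implements Serializable {
        private String resultId;
        private List<AutoritativeAuthor> authorList;
        // getters and setters
    }

    public class AutoritativeAuthor implements Serializable {
        private String name;
        private String surname;
        private String fullname;
        private String orcid;
        // getters and setters
    }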
+        //target of the relation from at least one source with orcid.
+        //the set of authors contains all those that have orcid and are related to target
+        //from any source with allowed semantic relationship
+        JavaPairRDD<String, List<AutoritativeAuthor>> target_authorlist_from_pubs = getTargetAutoritativeAuthorList(pubs_with_orcid);

-        JavaPairRDD<String, TypedRow> to_add_orcid_to_result = resultswithorcid.join(result_result)
-                .map(p -> p._2()._1().setSourceId(p._2()._2().getTargetId())) //associate the pid of the result (target) which should get the orcid to the typed row containing the authors with the orcid from the result(source)
-                .mapToPair(toPair());
+        JavaPairRDD<String, List<AutoritativeAuthor>> target_authorlist_from_dats = getTargetAutoritativeAuthorList(dats_with_orcid);

-        JavaPairRDD<String, Result> pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p));
-        JavaPairRDD<String, Result> dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p));
-        JavaPairRDD<String, Result> sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p));
-        JavaPairRDD<String, Result> orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p));
+        JavaPairRDD<String, List<AutoritativeAuthor>> target_authorlist_from_orp = getTargetAutoritativeAuthorList(orp_with_orcid);
+
+        JavaPairRDD<String, List<AutoritativeAuthor>> target_authorlist_from_sw = getTargetAutoritativeAuthorList(software_with_orcid);

         if(writeUpdate){
-            writeResult(pubs, to_add_orcid_to_result, outputPath, "publication");
-            writeResult(dss, to_add_orcid_to_result, outputPath, "dataset");
-            writeResult(sfw, to_add_orcid_to_result, outputPath, "software");
-            writeResult(orp, to_add_orcid_to_result, outputPath, "otherresearchproduct");
+            target_authorlist_from_dats.map(r -> new ObjectMapper().writeValueAsString(r))
+                    .saveAsTextFile(outputPath + "/" + "update_dats");
+            target_authorlist_from_pubs.map(r -> new ObjectMapper().writeValueAsString(r))
+                    .saveAsTextFile(outputPath + "/" + "update_pubs");
+            target_authorlist_from_orp.map(r -> new ObjectMapper().writeValueAsString(r))
+                    .saveAsTextFile(outputPath + "/" + "update_orp");
+            target_authorlist_from_sw.map(r -> new ObjectMapper().writeValueAsString(r))
+                    .saveAsTextFile(outputPath + "/" + "update_sw");
+        }
+
+        if(saveGraph){
+            sc.textFile(inputPath + "/publication")
+                    .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Publication.class))
+                    .mapToPair(p -> new Tuple2<>(p.getId(),p))
+                    .leftOuterJoin(target_authorlist_from_pubs)
+                    .map(c -> {
+                        Result r = c._2()._1();
+                        if(!c._2()._2().isPresent()){
+                            return r;
+                        }
+                        List<eu.dnetlib.dhp.schema.oaf.Author> toenrich_author = r.getAuthor();
+                        List<AutoritativeAuthor> autoritativeAuthors = c._2()._2().get();
+                        for(eu.dnetlib.dhp.schema.oaf.Author author: toenrich_author){
+                            if (!containsAllowedPid(author)){
+                                enrichAuthor(author, autoritativeAuthors);
+                            }
+                        }
+                        return r;
+                    });
+        }

-        if (saveGraph){
-            updateResult(pubs, to_add_orcid_to_result, outputPath, "publication");
-            updateResult(dss, to_add_orcid_to_result, outputPath, "dataset");
-            updateResult(sfw, to_add_orcid_to_result, outputPath, "software");
-            updateResult(orp, to_add_orcid_to_result, outputPath, "otherresearchproduct");
+    }
+
+    private static void enrichAuthor(eu.dnetlib.dhp.schema.oaf.Author a, List<AutoritativeAuthor> au){
+        for (AutoritativeAuthor aa: au){
+            if(enrichAuthor(aa, a)){
+                return;
+            }
         }
-    }
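Note that the saveGraph branch above enriches the publications in memory but never materializes the result (and only handles publications so far): the mapped RDD is discarded. Presumably a write like the one used for the updates is meant to terminate the pipeline, chained onto the map(c -> ...) call (path name assumed):

    .map(p -> new ObjectMapper().writeValueAsString(p))
    .saveAsTextFile(outputPath + "/publication");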
+//    private static JavaPairRDD<String, List<AutoritativeAuthor>> getTargetAutoritativeAuthorList(org.apache.spark.sql.Dataset<Row> result_result, org.apache.spark.sql.Dataset<ResultOrcidList> pubs_with_orcid) {
+//        return pubs_with_orcid
+//                .toJavaRDD()
+//                .mapToPair(p -> new Tuple2<>(p.getId(), p.getAuthorList()))
+//                .join(result_result.toJavaRDD().mapToPair(rel -> new Tuple2<>(rel.getString(0), rel.getString(1))))
+//                .mapToPair(c -> new Tuple2<>(c._2._2(), c._2()._1()))
+//                .reduceByKey((a, b) -> {
+//                    if(a == null){
+//                        return b;
+//                    }
+//                    if(b==null){
+//                        return a;
+//                    }
+//
+//                    Set<String> authSet = new HashSet<>();
+//                    a.stream().forEach(au -> authSet.add(au.getOrcid()));
+//
+//                    b.stream().forEach(au -> {
+//                                if (!authSet.contains(au.getOrcid())) {
+//                                    a.add(au);
+//                                }
+//                            }
+//                    );
+//                    return a;
+//                });
+//    }
+private static JavaPairRDD<String, List<AutoritativeAuthor>> getTargetAutoritativeAuthorList( org.apache.spark.sql.Dataset<ResultOrcidList> pubs_with_orcid) {
+    return pubs_with_orcid
+            .toJavaRDD()
+            .mapToPair(p -> new Tuple2<>(p.getResultId(), p.getAuthorList()))
+            .reduceByKey((a, b) -> {
+                if(a == null){
+                    return b;
+                }
+                if(b==null){
+                    return a;
+                }
+                Set<String> authSet = new HashSet<>();
+                a.stream().forEach(au -> authSet.add(au.getOrcid()));
+
+                b.stream().forEach(au -> {
+                            if (!authSet.contains(au.getOrcid())) {
+                                a.add(au);
+                            }
+                        }
+                );
+                return a;
+            });
+}
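The reduceByKey above merges the per-target author lists, deduplicating on ORCID, but does so by mutating list a in place. A side-effect-free equivalent of the merge function (sketch):

    // Merge two autoritative-author lists, keeping one entry per ORCID.
    static List<AutoritativeAuthor> mergeByOrcid(List<AutoritativeAuthor> a,
                                                 List<AutoritativeAuthor> b) {
        if (a == null) return b;
        if (b == null) return a;
        Map<String, AutoritativeAuthor> byOrcid = new LinkedHashMap<>();
        for (AutoritativeAuthor au : a) byOrcid.putIfAbsent(au.getOrcid(), au);
        for (AutoritativeAuthor au : b) byOrcid.putIfAbsent(au.getOrcid(), au);
        return new ArrayList<>(byOrcid.values());
    }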
+    private static org.apache.spark.sql.Dataset<Row> getResultWithOrcid(String table, SparkSession spark){
+        String query = " select target, author " +
+                " from (select id, collect_set(named_struct('name', name, 'surname', surname, 'fullname', fullname, 'orcid', orcid)) author " +
+                " from ( " +
+                " select id, MyT.fullname, MyT.name, MyT.surname, MyP.value orcid " +
+                " from " + table +
+                " lateral view explode (author) a as MyT " +
+                " lateral view explode (MyT.pid) p as MyP " +
+                " where MyP.qualifier.classid = 'ORCID') tmp " +
+                " group by id) r_t " +
+                " join (" +
+                " select source, target " +
+                " from relation " +
+                " where datainfo.deletedbyinference = false and (relclass = 'isSupplementedBy' or relclass = 'isSupplementTo') rel_rel " +
+                " on source = id";
+
+        return spark.sql(query);
+    }
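Unrolled (with table = "publication"), the concatenated query reads as below. As assembled above, the join subquery appears to be missing its closing parenthesis before the rel_rel alias, so the intended statement is presumably:

    SELECT target, author
    FROM (SELECT id,
                 collect_set(named_struct('name', name, 'surname', surname,
                                          'fullname', fullname, 'orcid', orcid)) author
          FROM (SELECT id, MyT.fullname, MyT.name, MyT.surname, MyP.value orcid
                FROM publication
                     LATERAL VIEW explode(author) a AS MyT
                     LATERAL VIEW explode(MyT.pid) p AS MyP
                WHERE MyP.qualifier.classid = 'ORCID') tmp
          GROUP BY id) r_t
    JOIN (SELECT source, target
          FROM relation
          WHERE datainfo.deletedbyinference = false
            AND (relclass = 'isSupplementedBy' OR relclass = 'isSupplementTo')) rel_rel
      ON source = id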
+
+
+    private static boolean enrichAuthor(AutoritativeAuthor autoritative_author, eu.dnetlib.dhp.schema.oaf.Author author) {
         boolean toaddpid = false;

         if (StringUtils.isNoneEmpty(autoritative_author.getSurname())) {
@@ -128,149 +236,122 @@ public class SparkOrcidToResultFromSemRelJob {
         }
         if (toaddpid){
             StructuredProperty pid = new StructuredProperty();
-            for(StructuredProperty sp : autoritative_author.getPid()){
-                if (PROPAGATION_AUTHOR_PID.equals(sp.getQualifier().getClassid())){
-                    pid.setValue(sp.getValue());
-                    pid.setQualifier(getQualifier(sp.getQualifier().getClassid(),sp.getQualifier().getClassname() ));
-                    pid.setDataInfo(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID, PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME));
-                    if(author.getPid() == null){
-                        author.setPid(Arrays.asList(pid));
-                    }else{
-                        author.getPid().add(pid);
-                    }
-                }
+            String aa_pid = autoritative_author.getOrcid();
+            pid.setValue(aa_pid);
+            pid.setQualifier(getQualifier(PROPAGATION_AUTHOR_PID, PROPAGATION_AUTHOR_PID ));
+            pid.setDataInfo(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID, PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME));
+            if(author.getPid() == null){
+                author.setPid(Arrays.asList(pid));
+            }else{
+                author.getPid().add(pid);
             }
-        return author;
+        }
-        return null;
+        return toaddpid;
+    }

+//    private static List<Author> enrichAuthors(List<Author> autoritative_authors, List<Author> to_enrich_authors, boolean filter){
+////        List<Author> autoritative_authors = p._2()._2().get().getAuthors();
+////        List<Author> to_enrich_authors = r.getAuthor();
+//
+//        return to_enrich_authors
+//                .stream()
+//                .map(a -> {
+//                    if (filter) {
+//                        if (containsAllowedPid(a)) {
+//                            return a;
+//                        }
+//                    }
+//
+//                    List<Author> lst = autoritative_authors.stream()
+//                            .map(aa -> enrichAuthor(aa, a)).filter(au -> !(au == null)).collect(Collectors.toList());
+//                    if (lst.size() == 0) {
+//                        return a;
+//                    }
+//                    return lst.get(0);//Each author can be enriched at most once. It cannot be the same as many different people
+//
+//                }).collect(Collectors.toList());
+//    }
+//
+//    private static void writeResult(JavaPairRDD<String, Result> results, JavaPairRDD<String, TypedRow> toupdateresult,
+//                                    String outputPath, String type) {
+//
+//        results.join(toupdateresult)
+//                .map(p -> {
+//                    Result r = p._2()._1();
+//
+//                    List<Author> autoritative_authors = p._2()._2().getAuthors();
+//                    List<Author> to_enrich_authors = r.getAuthor();
+//
+//                    r.setAuthor(enrichAutors(autoritative_authors, to_enrich_authors, false));
+////                            .stream()
+////                            .map(a -> {
+////                                if(filter) {
+////                                    if (containsAllowedPid(a)) {
+////                                        return a;
+////                                    }
+////                                }
+////
+////                                List<Author> lst = autoritative_authors.stream()
+////                                        .map(aa -> enrichAuthor(aa, a)).filter(au -> !(au == null)).collect(Collectors.toList());
+////                                if(lst.size() == 0){
+////                                    return a;
+////                                }
+////                                return lst.get(0);//Each author can be enriched at most once. It cannot be the same as many different people
+////
+////                            }).collect(Collectors.toList()));
+//
+//                    return r;
+//                })
+//                .map(p -> new ObjectMapper().writeValueAsString(p))
+//                .saveAsTextFile(outputPath + "/" + type + "_update");
+//    }
-
-
+//    private static void updateResult(JavaPairRDD<String, Result> results, JavaPairRDD<String, TypedRow> toupdateresult,
+//                                     String outputPath, String type) {
+//        results.leftOuterJoin(toupdateresult)
+//                .map(p -> {
+//                    Result r = p._2()._1();
+//                    if (p._2()._2().isPresent()){
+//                        List<Author> autoritative_authors = p._2()._2().get().getAuthors();
+//                        List<Author> to_enrich_authors = r.getAuthor();
+//
+//                        r.setAuthor(enrichAutors(autoritative_authors, to_enrich_authors, true));
+////                                .stream()
+////                                .map(a -> {
+////                                    if(filter) {
+////                                        if (containsAllowedPid(a)) {
+////                                            return a;
+////                                        }
+////                                    }
+////
+////                                    List<Author> lst = autoritative_authors.stream()
+////                                            .map(aa -> enrichAuthor(aa, a)).filter(au -> !(au == null)).collect(Collectors.toList());
+////                                    if(lst.size() == 0){
+////                                        return a;
+////                                    }
+////                                    return lst.get(0);//Each author can be enriched at most once. It cannot be the same as many different people
+////
+////                                }).collect(Collectors.toList()));
+//                    }
+//                    return r;
+//                })
+//                .map(p -> new ObjectMapper().writeValueAsString(p))
+//                .saveAsTextFile(outputPath+"/"+type);
+//    }

-    private static TypedRow getTypedRow(Result p) {
-        TypedRow tp = new TypedRow();
-        tp.setSourceId(p.getId());
-        List<Author> authorList = p.getAuthor()
-                .stream()
-                .map(a -> {
-                    if (a.getPid().stream().map(pid -> {
-                        if (PROPAGATION_AUTHOR_PID.equals(pid.getQualifier().getClassid())) {
-                            return a;
-                        }
-                        return null;
-                    }).filter(aut -> !(aut == null)).collect(Collectors.toList()).size() > 0){
-                        return a;
-                    }
-                    return null;
-                }).filter(a -> !(a == null)).collect(Collectors.toList());
-        tp.setAuthors(authorList);
-        if(authorList.size() > 0){
-            return tp;
-        }
-        return null;
-
-    }

-    private static boolean containsAllowedPid(Author a){
-
-        return (a.getPid().stream().map(pid -> {
+    private static boolean containsAllowedPid(eu.dnetlib.dhp.schema.oaf.Author a) {
+        for (StructuredProperty pid : a.getPid()) {
             if (PROPAGATION_AUTHOR_PID.equals(pid.getQualifier().getClassid())) {
                 return true;
             }
-            return false;
-        }).filter(aut -> (aut == true)).collect(Collectors.toList()).size()) > 0;
+        }
+        return false;
     }
 }
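The closing rewrite of containsAllowedPid replaces the old stream/collect/size idiom with a plain loop. With streams it can also stay a one-liner via anyMatch (equivalent sketch; like the code above it assumes getPid() is never null):

    private static boolean containsAllowedPid(eu.dnetlib.dhp.schema.oaf.Author a) {
        return a.getPid().stream()
                .anyMatch(pid -> PROPAGATION_AUTHOR_PID.equals(pid.getQualifier().getClassid()));
    }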