From d6cd700a3236b4a33bbf71c4bf83caca18844c48 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Thu, 16 Apr 2020 15:55:25 +0200
Subject: [PATCH] new implementation that exploits prepared information (the
 list of possible updates: resultId - possible list of orcid to be added)

---
 .../SparkOrcidToResultFromSemRelJob3.java     | 410 +++++-------------
 1 file changed, 97 insertions(+), 313 deletions(-)

diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob3.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob3.java
index ddeee391c..f9dfa2970 100644
--- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob3.java
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob3.java
@@ -9,131 +9,115 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.Dataset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 import java.util.*;
 
 import static eu.dnetlib.dhp.PropagationConstant.*;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
+
+public class SparkOrcidToResultFromSemRelJob3 {
+    private static final Logger log = LoggerFactory.getLogger(SparkOrcidToResultFromSemRelJob3.class);
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
-public class SparkOrcidToResultFromSemRelJob2 {
     public static void main(String[] args) throws Exception {
+        String jsonConfiguration = IOUtils.toString(SparkOrcidToResultFromSemRelJob3.class
+                .getResourceAsStream("/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_orcidtoresult_parameters.json"));
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+                jsonConfiguration);
 
-        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkOrcidToResultFromSemRelJob2.class.getResourceAsStream("/eu/dnetlib/dhp/orcidtoresultfromremrel/input_orcidtoresult_parameters.json")));
         parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+
+        final String inputPath = parser.get("sourcePath");
+        log.info("inputPath: {}", inputPath);
+
+        final String outputPath = parser.get("outputPath");
+        log.info("outputPath: {}", outputPath);
+
+        final String possibleUpdates = parser.get("possibleUpdatesPath");
+        log.info("possibleUpdatesPath: {}", possibleUpdates);
+
+        final String resultClassName = parser.get("resultTableName");
+        log.info("resultTableName: {}", resultClassName);
+
+        final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase();
+        log.info("resultType: {}", resultType);
+
+        final Boolean saveGraph = Optional
+                .ofNullable(parser.get("saveGraph"))
+                .map(Boolean::valueOf)
+                .orElse(Boolean.TRUE);
+        log.info("saveGraph: {}", saveGraph);
+
+        Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
+
         SparkConf conf = new SparkConf();
         conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
-        final SparkSession spark = SparkSession
-                .builder()
-                .appName(SparkOrcidToResultFromSemRelJob2.class.getSimpleName())
-                .master(parser.get("master"))
-                .config(conf)
-                .enableHiveSupport()
-                .getOrCreate();
-        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
-        final String inputPath = parser.get("sourcePath");
-        final String outputPath = "/tmp/provision/propagation/orcidtoresult";
-
-        final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
-        boolean writeUpdate = TRUE.equals(parser.get("writeUpdate"));
-        boolean saveGraph = TRUE.equals(parser.get("saveGraph"));
-
-        createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));
-
-        org.apache.spark.sql.Dataset<Relation> relation = spark.createDataset(sc.textFile(inputPath + "/relation")
-                .map(item -> new ObjectMapper().readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class));
-
-        org.apache.spark.sql.Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> dataset = spark.createDataset(sc.textFile(inputPath + "/dataset")
-                .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Dataset.class)).rdd(),
-                Encoders.bean(eu.dnetlib.dhp.schema.oaf.Dataset.class));
-
-        org.apache.spark.sql.Dataset<eu.dnetlib.dhp.schema.oaf.OtherResearchProduct> other = spark.createDataset(sc.textFile(inputPath + "/otherresearchproduct")
-                .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.OtherResearchProduct.class)).rdd(),
-                Encoders.bean(eu.dnetlib.dhp.schema.oaf.OtherResearchProduct.class));
-
-        org.apache.spark.sql.Dataset<eu.dnetlib.dhp.schema.oaf.Software> software = spark.createDataset(sc.textFile(inputPath + "/software")
-                .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Software.class)).rdd(),
-                Encoders.bean(eu.dnetlib.dhp.schema.oaf.Software.class));
-
-        org.apache.spark.sql.Dataset<eu.dnetlib.dhp.schema.oaf.Publication> publication = spark.createDataset(sc.textFile(inputPath + "/publication")
-                .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Publication.class)).rdd(),
-                Encoders.bean(eu.dnetlib.dhp.schema.oaf.Publication.class));
-
-
-        relation.createOrReplaceTempView("relation");
-        String query = "Select source, target " +
-                "from relation " +
-                "where datainfo.deletedbyinference = false " + getConstraintList(" relclass = '" , allowedsemrel);
-
-        org.apache.spark.sql.Dataset<Row> result_result = spark.sql(query);
-
-        publication.createOrReplaceTempView("publication");
-        org.apache.spark.sql.Dataset<ResultWithOrcid> pubs_with_orcid = getResultWithOrcid("publication", spark)
-                .as(Encoders.bean(ResultWithOrcid.class));
-
-        dataset.createOrReplaceTempView("dataset");
-        org.apache.spark.sql.Dataset<ResultWithOrcid> dats_with_orcid = getResultWithOrcid("dataset", spark)
-                .as(Encoders.bean(ResultWithOrcid.class));
-
-        other.createOrReplaceTempView("orp");
-        org.apache.spark.sql.Dataset<ResultWithOrcid> orp_with_orcid = getResultWithOrcid("orp", spark)
-                .as(Encoders.bean(ResultWithOrcid.class));
-
-        dataset.createOrReplaceTempView("software");
-        org.apache.spark.sql.Dataset<ResultWithOrcid> software_with_orcid = getResultWithOrcid("software", spark)
-                .as(Encoders.bean(ResultWithOrcid.class));
-        //get the results having at least one author pid we are interested in
-
-        //target of the relation from at least one source with orcid.
-        //the set of authors contains all those that have orcid and are related to target
-        //from any source with allowed semantic relationship
-        JavaPairRDD<String, List<AutoritativeAuthor>> target_authorlist_from_pubs = getTargetAutoritativeAuthorList(pubs_with_orcid);
-
-        JavaPairRDD<String, List<AutoritativeAuthor>> target_authorlist_from_dats = getTargetAutoritativeAuthorList(dats_with_orcid);
-
-        JavaPairRDD<String, List<AutoritativeAuthor>> target_authorlist_from_orp = getTargetAutoritativeAuthorList(orp_with_orcid);
-
-        JavaPairRDD<String, List<AutoritativeAuthor>> target_authorlist_from_sw = getTargetAutoritativeAuthorList(software_with_orcid);
-
-        if(writeUpdate){
-            target_authorlist_from_dats.map(r -> new ObjectMapper().writeValueAsString(r))
-                    .saveAsTextFile(outputPath + "/" + "update_dats");
-            target_authorlist_from_pubs.map(r -> new ObjectMapper().writeValueAsString(r))
-                    .saveAsTextFile(outputPath + "/" + "update_pubs");
-            target_authorlist_from_orp.map(r -> new ObjectMapper().writeValueAsString(r))
-                    .saveAsTextFile(outputPath + "/" + "update_orp");
-            target_authorlist_from_sw.map(r -> new ObjectMapper().writeValueAsString(r))
-                    .saveAsTextFile(outputPath + "/" + "update_sw");
-        }
-
-        if(saveGraph){
-            sc.textFile(inputPath + "/publication")
-                    .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Publication.class))
-                    .mapToPair(p -> new Tuple2<>(p.getId(),p))
-                    .leftOuterJoin(target_authorlist_from_pubs)
-                    .map(c -> {
-                        Result r = c._2()._1();
-                        if(!c._2()._2().isPresent()){
-                            return r;
-                        }
-                        List<eu.dnetlib.dhp.schema.oaf.Author> toenrich_author = r.getAuthor();
-                        List<AutoritativeAuthor> autoritativeAuthors = c._2()._2().get();
-                        for(eu.dnetlib.dhp.schema.oaf.Author author: toenrich_author){
-                            if (!containsAllowedPid(author)){
-                                enrichAuthor(author, autoritativeAuthors);
-                            }
-                        }
-                        return r;
-                    });
-
-        }
+        runWithSparkHiveSession(conf, isSparkSessionManaged,
+                spark -> {
+                    if(isTest(parser)) {
+                        removeOutputDir(spark, outputPath);
+                    }
+                    if(saveGraph)
+                        execPropagation(spark, possibleUpdates, inputPath, outputPath, resultClazz);
+                });
     }
-
-    private static void enrichAuthor(eu.dnetlib.dhp.schema.oaf.Author a, List<AutoritativeAuthor> au){
+
+    private static <R extends Result> void execPropagation(SparkSession spark, String possibleUpdatesPath, String inputPath,
+                                                           String outputPath, Class<R> resultClazz ) {
+
+        //read possible updates (resultId and list of possible orcid to add)
+        Dataset<ResultOrcidList> possible_updates = readAssocResultOrcidList(spark, possibleUpdatesPath);
+        //read the result we have been considering
+        Dataset<R> result = readPathEntity(spark, inputPath, resultClazz);
+        //make join result left_outer with possible updates
+
+        result.joinWith(possible_updates, result.col("id").equalTo(possible_updates.col("resultId")),
+                "left_outer")
+                .map(value -> {
+                    R ret = value._1();
+                    Optional<ResultOrcidList> rol = Optional.ofNullable(value._2());
+                    if(rol.isPresent()) {
+                        List<Author> toenrich_author = ret.getAuthor();
+                        List<AutoritativeAuthor> autoritativeAuthors = rol.get().getAuthorList();
+                        for(Author author: toenrich_author){
+                            if (!containsAllowedPid(author)){
+                                enrichAuthor(author, autoritativeAuthors);
+                            }
+                        }
+                    }
+
+                    return ret;
+                }, Encoders.bean(resultClazz))
+                .toJSON()
+                .write()
+                .mode(SaveMode.Overwrite)
+                .option("compression","gzip")
+                .text(outputPath);
+
+
+    }
+
+    private static Dataset<ResultOrcidList> readAssocResultOrcidList(SparkSession spark, String relationPath) {
+        return spark
+                .read()
+                .textFile(relationPath)
+                .map(value -> OBJECT_MAPPER.readValue(value, ResultOrcidList.class), Encoders.bean(ResultOrcidList.class));
+    }
+
+    private static void enrichAuthor(Author a, List<AutoritativeAuthor> au){
         for (AutoritativeAuthor aa: au){
             if(enrichAuthor(aa, a)){
                 return;
             }
         }
     }
@@ -142,77 +126,9 @@ public class SparkOrcidToResultFromSemRelJob2 {
     }
-//    private static JavaPairRDD<String, List<AutoritativeAuthor>> getTargetAutoritativeAuthorList(org.apache.spark.sql.Dataset<Row> result_result, org.apache.spark.sql.Dataset<ResultWithOrcid> pubs_with_orcid) {
-//        return pubs_with_orcid
-//                .toJavaRDD()
-//                .mapToPair(p -> new Tuple2<>(p.getId(), p.getAuthorList()))
-//                .join(result_result.toJavaRDD().mapToPair(rel -> new Tuple2<>(rel.getString(0), rel.getString(1))))
-//                .mapToPair(c -> new Tuple2<>(c._2._2(), c._2()._1()))
-//                .reduceByKey((a, b) -> {
-//                    if(a == null){
-//                        return b;
-//                    }
-//                    if(b==null){
-//                        return a;
-//                    }
-//
-//                    Set<String> authSet = new HashSet<>();
-//                    a.stream().forEach(au -> authSet.add(au.getOrcid()));
-//
-//                    b.stream().forEach(au -> {
-//                                if (!authSet.contains(au.getOrcid())) {
-//                                    a.add(au);
-//                                }
-//                            }
-//                    );
-//                    return a;
-//                });
-//    }
-private static JavaPairRDD<String, List<AutoritativeAuthor>> getTargetAutoritativeAuthorList( org.apache.spark.sql.Dataset<ResultWithOrcid> pubs_with_orcid) {
-    return pubs_with_orcid
-            .toJavaRDD()
-            .mapToPair(p -> new Tuple2<>(p.getId(), p.getAuthorList()))
-            .reduceByKey((a, b) -> {
-                if(a == null){
-                    return b;
-                }
-                if(b==null){
-                    return a;
-                }
-                Set<String> authSet = new HashSet<>();
-                a.stream().forEach(au -> authSet.add(au.getOrcid()));
-
-                b.stream().forEach(au -> {
-                            if (!authSet.contains(au.getOrcid())) {
-                                a.add(au);
-                            }
-                        }
-                );
-                return a;
-            });
-}
-
-
-    private static org.apache.spark.sql.Dataset<Row> getResultWithOrcid(String table, SparkSession spark){
-        String query = " select target, author " +
-                " from (select id, collect_set(named_struct('name', name, 'surname', surname, 'fullname', fullname, 'orcid', orcid)) author " +
-                " from ( " +
-                " select id, MyT.fullname, MyT.name, MyT.surname, MyP.value orcid " +
-                " from " + table +
-                " lateral view explode (author) a as MyT " +
-                " lateral view explode (MyT.pid) p as MyP " +
-                " where MyP.qualifier.classid = 'ORCID') tmp " +
-                " group by id) r_t " +
-                " join (" +
-                " select source, target " +
-                " from relation " +
-                " where datainfo.deletedbyinference = false and (relclass = 'isSupplementedBy' or relclass = 'isSupplementTo') rel_rel " +
-                " on source = id";
-
-        return spark.sql(query);
-    }
-
-    private static boolean enrichAuthor(AutoritativeAuthor autoritative_author, eu.dnetlib.dhp.schema.oaf.Author author) {
+    private static boolean enrichAuthor(AutoritativeAuthor autoritative_author, Author author) {
         boolean toaddpid = false;
 
         if (StringUtils.isNoneEmpty(autoritative_author.getSurname())) {
@@ -252,100 +168,9 @@ private static JavaPairRDD<String, List<AutoritativeAuthor>> getTargetAutoritati
     }
 
-//    private static List<Author> enrichAuthors(List<AutoritativeAuthor> autoritative_authors, List<Author> to_enrich_authors, boolean filter){
-////        List<AutoritativeAuthor> autoritative_authors = p._2()._2().get().getAuthors();
-////        List<Author> to_enrich_authors = r.getAuthor();
-//
-//        return to_enrich_authors
-//                .stream()
-//                .map(a -> {
-//                    if (filter) {
-//                        if (containsAllowedPid(a)) {
-//                            return a;
-//                        }
-//                    }
-//
-//                    List<Author> lst = autoritative_authors.stream()
-//                            .map(aa -> enrichAuthor(aa, a)).filter(au -> !(au == null)).collect(Collectors.toList());
-//                    if (lst.size() == 0) {
-//                        return a;
-//                    }
-//                    return lst.get(0);//Each author can be enriched at most once. It cannot be the same as many different people
-//
-//                }).collect(Collectors.toList());
-//    }
-//
-//    private static void writeResult(JavaPairRDD results, JavaPairRDD toupdateresult,
-//                                    String outputPath, String type) {
-//
-//        results.join(toupdateresult)
-//                .map(p -> {
-//                    Result r = p._2()._1();
-//
-//                    List<AutoritativeAuthor> autoritative_authors = p._2()._2().getAuthors();
-//                    List<Author> to_enrich_authors = r.getAuthor();
-//
-//                    r.setAuthor(enrichAutors(autoritative_authors, to_enrich_authors, false));
-////                            .stream()
-////                            .map(a -> {
-////                                if(filter) {
-////                                    if (containsAllowedPid(a)) {
-////                                        return a;
-////                                    }
-////                                }
-////
-////                                List<Author> lst = autoritative_authors.stream()
-////                                        .map(aa -> enrichAuthor(aa, a)).filter(au -> !(au == null)).collect(Collectors.toList());
-////                                if(lst.size() == 0){
-////                                    return a;
-////                                }
-////                                return lst.get(0);//Each author can be enriched at most once. It cannot be the same as many different people
-////
-////                            }).collect(Collectors.toList()));
-//
-//                    return r;
-//                })
-//                .map(p -> new ObjectMapper().writeValueAsString(p))
-//                .saveAsTextFile(outputPath + "/" + type + "_update");
-//    }
-//    private static void updateResult(JavaPairRDD results, JavaPairRDD toupdateresult,
-//                                     String outputPath, String type) {
-//        results.leftOuterJoin(toupdateresult)
-//                .map(p -> {
-//                    Result r = p._2()._1();
-//                    if (p._2()._2().isPresent()){
-//                        List<AutoritativeAuthor> autoritative_authors = p._2()._2().get().getAuthors();
-//                        List<Author> to_enrich_authors = r.getAuthor();
-//
-//                        r.setAuthor(enrichAutors(autoritative_authors, to_enrich_authors, true));
-////                                .stream()
-////                                .map(a -> {
-////                                    if(filter) {
-////                                        if (containsAllowedPid(a)) {
-////                                            return a;
-////                                        }
-////                                    }
-////
-////                                    List<Author> lst = autoritative_authors.stream()
-////                                            .map(aa -> enrichAuthor(aa, a)).filter(au -> !(au == null)).collect(Collectors.toList());
-////                                    if(lst.size() == 0){
-////                                        return a;
-////                                    }
-////                                    return lst.get(0);//Each author can be enriched at most once. It cannot be the same as many different people
-////
-////                                }).collect(Collectors.toList()));
-//                    }
-//                    return r;
-//                })
-//                .map(p -> new ObjectMapper().writeValueAsString(p))
-//                .saveAsTextFile(outputPath+"/"+type);
-//    }
-
-
-
-    private static boolean containsAllowedPid(eu.dnetlib.dhp.schema.oaf.Author a) {
+    private static boolean containsAllowedPid(Author a) {
         for (StructuredProperty pid : a.getPid()) {
             if (PROPAGATION_AUTHOR_PID.equals(pid.getQualifier().getClassid())) {
                 return true;
@@ -355,44 +180,3 @@ private static JavaPairRDD<String, List<AutoritativeAuthor>> getTargetAutoritati
     }
 }
-
-
-/*private ResultProtos.Result.Metadata.Builder searchMatch(List<FieldTypeProtos.Author> author_list){
-        ResultProtos.Result.Metadata.Builder metadataBuilder = ResultProtos.Result.Metadata.newBuilder();
-        boolean updated = false;
-
-        for (FieldTypeProtos.Author a: author_list){
-            FieldTypeProtos.Author.Builder author = searchAuthor(a, autoritative_authors);
-            if(author != null){
-                updated = true;
-                metadataBuilder.addAuthor(author);
-            }else{
-                metadataBuilder.addAuthor(FieldTypeProtos.Author.newBuilder(a));
-            }
-        }
-        if(updated)
-            return metadataBuilder;
-        return null;
-    }
-    private FieldTypeProtos.Author.Builder searchAuthor(FieldTypeProtos.Author a, List<FieldTypeProtos.Author> author_list){
-        if(containsOrcid(a.getPidList()))
-            return null;
-        for(FieldTypeProtos.Author autoritative_author : author_list) {
-            if (equals(autoritative_author, a)) {
-                if(!containsOrcid(a.getPidList()))
-                    return update(a, autoritative_author);
-            }
-        }
-        return null;
-
-    }
-
-    private boolean containsOrcid(List<FieldTypeProtos.KeyValue> pidList){
-        if(pidList == null)
-            return false;
-        return pidList
-                .stream()
-                .filter(kv -> kv.getKey().equals(PropagationConstants.AUTHOR_PID))
-                .collect(Collectors.toList()).size() > 0;
-    }
-    */
\ No newline at end of file
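
Editor's note (outside the patch): readAssocResultOrcidList implies that the prepared "possible updates" input is a plain text file holding one JSON-serialised ResultOrcidList per line, with resultId as the join key and authorList carrying the ORCIDs to propagate (the job joins on result.col("id").equalTo(possible_updates.col("resultId")) and reads rol.getAuthorList()). The self-contained Jackson sketch below illustrates that assumed record shape; the stand-in bean classes, the identifier and the ORCID value are invented for illustration and are not the real dhp classes or real data.

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.List;

    public class PossibleUpdatesRecordSketch {

        // Minimal stand-ins for the dhp beans; field names are taken from the
        // join column ("resultId"), the getter used in the patch (getAuthorList),
        // and the removed SQL (name, surname, fullname, orcid).
        public static class AutoritativeAuthor {
            public String name;
            public String surname;
            public String fullname;
            public String orcid;
        }

        public static class ResultOrcidList {
            public String resultId;
            public List<AutoritativeAuthor> authorList;
        }

        public static void main(String[] args) throws Exception {
            // One hypothetical line of the possibleUpdatesPath text file.
            String line = "{\"resultId\":\"50|doi_________::hypothetical\","
                    + "\"authorList\":[{\"name\":\"Jane\",\"surname\":\"Doe\","
                    + "\"fullname\":\"Doe, Jane\",\"orcid\":\"0000-0001-2345-6789\"}]}";

            // Mirrors the per-line deserialisation done in readAssocResultOrcidList.
            ResultOrcidList rol = new ObjectMapper().readValue(line, ResultOrcidList.class);
            System.out.println(rol.resultId + " -> " + rol.authorList.size() + " candidate orcid author(s)");
        }
    }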
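
Editor's note (outside the patch): the second hunk is cut off inside enrichAuthor(AutoritativeAuthor, Author), right after the StringUtils.isNoneEmpty(surname) guard, so only the intent is visible from this diff: an author that matches an authoritative author is given the ORCID pid, and containsAllowedPid keeps already-enriched authors from being touched again. The sketch below illustrates that matching idea on simplified types; the name/surname equality rule and the single orcid field are assumptions for illustration, not the committed code.

    public class EnrichAuthorSketch {

        // Simplified stand-in for the oaf Author: the real class carries a list
        // of StructuredProperty pids instead of a single orcid field.
        public static class SimpleAuthor {
            public String name;
            public String surname;
            public String orcid;
        }

        // Returns true when the authoritative author matched and its orcid was
        // copied, mirroring the boolean contract of enrichAuthor(AutoritativeAuthor, Author).
        public static boolean enrich(SimpleAuthor authoritative, SimpleAuthor candidate) {
            // Mirrors the StringUtils.isNoneEmpty(surname) guard visible in the hunk.
            if (authoritative.surname == null || authoritative.surname.trim().isEmpty()) {
                return false;
            }
            boolean sameSurname = candidate.surname != null
                    && authoritative.surname.trim().equalsIgnoreCase(candidate.surname.trim());
            boolean sameName = authoritative.name != null && candidate.name != null
                    && authoritative.name.trim().equalsIgnoreCase(candidate.name.trim());
            if (sameSurname && sameName && candidate.orcid == null) {
                candidate.orcid = authoritative.orcid; // the propagation step
                return true;
            }
            return false;
        }

        public static void main(String[] args) {
            SimpleAuthor authoritative = new SimpleAuthor();
            authoritative.name = "Jane";
            authoritative.surname = "Doe";
            authoritative.orcid = "0000-0001-2345-6789"; // invented value
            SimpleAuthor candidate = new SimpleAuthor();
            candidate.name = "Jane";
            candidate.surname = "Doe";
            System.out.println(enrich(authoritative, candidate) + " -> " + candidate.orcid);
        }
    }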