diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java deleted file mode 100644 index cddca14ed..000000000 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java +++ /dev/null @@ -1,312 +0,0 @@ -package eu.dnetlib.dhp.orcidtoresultfromsemrel; - -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.TypedRow; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.oaf.*; -import eu.dnetlib.dhp.schema.oaf.Author; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.fs.FileSystem; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.SparkSession; -import scala.Tuple2; - -import java.util.Arrays; -import java.util.List; -import java.util.stream.Collectors; - -import static eu.dnetlib.dhp.PropagationConstant.*; - -public class SparkOrcidToResultFromSemRelJob { - public static void main(String[] args) throws Exception { - - final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkOrcidToResultFromSemRelJob.class.getResourceAsStream("/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_orcidtoresult_parameters.json"))); - parser.parseArgument(args); - final SparkSession spark = SparkSession - .builder() - .appName(SparkOrcidToResultFromSemRelJob.class.getSimpleName()) - .master(parser.get("master")) - .enableHiveSupport() - .getOrCreate(); - - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - final String inputPath = parser.get("sourcePath"); - final String outputPath = "/tmp/provision/propagation/orcidtoresult"; - - final List allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";")); - boolean writeUpdate = TRUE.equals(parser.get("writeUpdate")); - boolean saveGraph = TRUE.equals(parser.get("saveGraph")); - - createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration())); - - JavaRDD relations = sc.textFile(inputPath + "/relation") - .map(item -> new ObjectMapper().readValue(item, Relation.class)).cache(); - - JavaPairRDD result_result = getResultResultSemRel(allowedsemrel, relations); - - JavaRDD publications = sc.textFile(inputPath + "/publication") - .map(item -> new ObjectMapper().readValue(item, Publication.class)); - JavaRDD datasets = sc.textFile(inputPath + "/dataset") - .map(item -> new ObjectMapper().readValue(item, Dataset.class)); - JavaRDD software = sc.textFile(inputPath + "/software") - .map(item -> new ObjectMapper().readValue(item, Software.class)); - JavaRDD other = sc.textFile(inputPath + "/otherresearchproduct") - .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class)); - - //get the results having at least one author pid we are interested in - JavaPairRDD resultswithorcid = publications.map(p -> getTypedRow(p)) - .filter(p -> !(p == null)) - .mapToPair(toPair()) - .union(datasets.map(p -> getTypedRow(p)) - .filter(p -> !(p == null)) - .mapToPair(toPair())) - .union(software.map(p -> getTypedRow(p)) - .filter(p -> !(p == null)) - .mapToPair(toPair())) - .union(other.map(p -> getTypedRow(p)) - .filter(p -> !(p == null)) - .mapToPair(toPair())); - - - JavaPairRDD to_add_orcid_to_result = resultswithorcid.join(result_result) - .map(p -> p._2()._1().setSourceId(p._2()._2().getTargetId())) //associate the pid of the result (target) which should get the orcid to the typed row containing the authors with the orcid from the result(source) - .mapToPair(toPair()); - - JavaPairRDD pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p)); - JavaPairRDD orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p)); - - if(writeUpdate){ - writeResult(pubs, to_add_orcid_to_result, outputPath, "publication"); - writeResult(dss, to_add_orcid_to_result, outputPath, "dataset"); - writeResult(sfw, to_add_orcid_to_result, outputPath, "software"); - writeResult(orp, to_add_orcid_to_result, outputPath, "otherresearchproduct"); - } - - if (saveGraph){ - updateResult(pubs, to_add_orcid_to_result, outputPath, "publication"); - updateResult(dss, to_add_orcid_to_result, outputPath, "dataset"); - updateResult(sfw, to_add_orcid_to_result, outputPath, "software"); - updateResult(orp, to_add_orcid_to_result, outputPath, "otherresearchproduct"); - } - - - } - - - public static eu.dnetlib.dhp.schema.oaf.Author enrichAutor(eu.dnetlib.dhp.schema.oaf.Author autoritative_author, eu.dnetlib.dhp.schema.oaf.Author author) { - boolean toaddpid = false; - - if (StringUtils.isNoneEmpty(autoritative_author.getSurname())) { - if (StringUtils.isNoneEmpty(author.getSurname())) { - if (autoritative_author.getSurname().trim().equalsIgnoreCase(author.getSurname().trim())) { - - //have the same surname. Check the name - if (StringUtils.isNoneEmpty(autoritative_author.getName())) { - if (StringUtils.isNoneEmpty(author.getName())) { - if (autoritative_author.getName().trim().equalsIgnoreCase(author.getName().trim())) { - toaddpid = true; - } - //they could be differently written (i.e. only the initials of the name in one of the two - if (autoritative_author.getName().trim().substring(0, 0).equalsIgnoreCase(author.getName().trim().substring(0, 0))) { - toaddpid = true; - } - } - } - } - } - } - if (toaddpid){ - StructuredProperty pid = new StructuredProperty(); - for(StructuredProperty sp : autoritative_author.getPid()){ - if (PROPAGATION_AUTHOR_PID.equals(sp.getQualifier().getClassid())){ - pid.setValue(sp.getValue()); - pid.setQualifier(getQualifier(sp.getQualifier().getClassid(),sp.getQualifier().getClassname() )); - pid.setDataInfo(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID, PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME)); - if(author.getPid() == null){ - author.setPid(Arrays.asList(pid)); - }else{ - author.getPid().add(pid); - } - } - } - return author; - } - return null; - } - - - private static List enrichAutors(List autoritative_authors, - List to_enrich_authors, boolean filter){ -// List autoritative_authors = p._2()._2().get().getAuthors(); -// List to_enrich_authors = r.getAuthor(); - - return to_enrich_authors - .stream() - .map(a -> { - if (filter) { - if (containsAllowedPid(a)) { - return a; - } - } - - List lst = autoritative_authors.stream() - .map(aa -> enrichAutor(aa, a)).filter(au -> !(au == null)).collect(Collectors.toList()); - if (lst.size() == 0) { - return a; - } - return lst.get(0);//Each author can be enriched at most once. It cannot be the same as many different people - - }).collect(Collectors.toList()); - } - - private static void writeResult(JavaPairRDD results, JavaPairRDD toupdateresult, - String outputPath, String type) { - - results.join(toupdateresult) - .map(p -> { - Result r = p._2()._1(); - - List autoritative_authors = p._2()._2().getAuthors(); - List to_enrich_authors = r.getAuthor(); - - r.setAuthor(enrichAutors(autoritative_authors, to_enrich_authors, false)); -// .stream() -// .map(a -> { -// if(filter) { -// if (containsAllowedPid(a)) { -// return a; -// } -// } -// -// List lst = autoritative_authors.stream() -// .map(aa -> enrichAutor(aa, a)).filter(au -> !(au == null)).collect(Collectors.toList()); -// if(lst.size() == 0){ -// return a; -// } -// return lst.get(0);//Each author can be enriched at most once. It cannot be the same as many different people -// -// }).collect(Collectors.toList())); - - return r; - }) - .map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath + "/" + type + "_update"); - } - - - private static void updateResult(JavaPairRDD results, JavaPairRDD toupdateresult, - String outputPath, String type) { - results.leftOuterJoin(toupdateresult) - .map(p -> { - Result r = p._2()._1(); - if (p._2()._2().isPresent()){ - List autoritative_authors = p._2()._2().get().getAuthors(); - List to_enrich_authors = r.getAuthor(); - - r.setAuthor(enrichAutors(autoritative_authors, to_enrich_authors, true)); -// .stream() -// .map(a -> { -// if(filter) { -// if (containsAllowedPid(a)) { -// return a; -// } -// } -// -// List lst = autoritative_authors.stream() -// .map(aa -> enrichAutor(aa, a)).filter(au -> !(au == null)).collect(Collectors.toList()); -// if(lst.size() == 0){ -// return a; -// } -// return lst.get(0);//Each author can be enriched at most once. It cannot be the same as many different people -// -// }).collect(Collectors.toList())); - } - return r; - }) - .map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath+"/"+type); - } - - private static TypedRow getTypedRow(Result p) { - TypedRow tp = new TypedRow(); - tp.setSourceId(p.getId()); - List authorList = p.getAuthor() - .stream() - .map(a -> { - if (a.getPid().stream().map(pid -> { - if (PROPAGATION_AUTHOR_PID.equals(pid.getQualifier().getClassid())) { - return a; - } - return null; - }).filter(aut -> !(aut == null)).collect(Collectors.toList()).size() > 0){ - return a; - } - return null; - }).filter(a -> !(a == null)).collect(Collectors.toList()); - tp.setAuthors(authorList); - if(authorList.size() > 0){ - return tp; - } - return null; - - - } - - private static boolean containsAllowedPid(eu.dnetlib.dhp.schema.oaf.Author a){ - - - return (a.getPid().stream().map(pid -> { - if (PROPAGATION_AUTHOR_PID.equals(pid.getQualifier().getClassid())) { - return true; - } - return false; - }).filter(aut -> (aut == true)).collect(Collectors.toList()).size()) > 0; - } - -} - - -/*private ResultProtos.Result.Metadata.Builder searchMatch(List author_list){ - ResultProtos.Result.Metadata.Builder metadataBuilder = ResultProtos.Result.Metadata.newBuilder(); - boolean updated = false; - - for (FieldTypeProtos.Author a: author_list){ - FieldTypeProtos.Author.Builder author = searchAuthor(a, autoritative_authors); - if(author != null){ - updated = true; - metadataBuilder.addAuthor(author); - }else{ - metadataBuilder.addAuthor(FieldTypeProtos.Author.newBuilder(a)); - } - } - if(updated) - return metadataBuilder; - return null; - } - private FieldTypeProtos.Author.Builder searchAuthor(FieldTypeProtos.Author a, List author_list){ - if(containsOrcid(a.getPidList())) - return null; - for(FieldTypeProtos.Author autoritative_author : author_list) { - if (equals(autoritative_author, a)) { - if(!containsOrcid(a.getPidList())) - return update(a, autoritative_author); - } - } - return null; - - } - - private boolean containsOrcid(List pidList){ - if(pidList == null) - return false; - return pidList - .stream() - .filter(kv -> kv.getKey().equals(PropagationConstants.AUTHOR_PID)) - .collect(Collectors.toList()).size() > 0; - } - */ \ No newline at end of file diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob2.java deleted file mode 100644 index 1b58badfa..000000000 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob2.java +++ /dev/null @@ -1,398 +0,0 @@ -package eu.dnetlib.dhp.orcidtoresultfromsemrel; - -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.oaf.*; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.fs.FileSystem; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import scala.Tuple2; - -import java.util.*; - -import static eu.dnetlib.dhp.PropagationConstant.*; - -public class SparkOrcidToResultFromSemRelJob2 { - public static void main(String[] args) throws Exception { - - final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkOrcidToResultFromSemRelJob2.class.getResourceAsStream("/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_orcidtoresult_parameters.json"))); - parser.parseArgument(args); - SparkConf conf = new SparkConf(); - conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); - final SparkSession spark = SparkSession - .builder() - .appName(SparkOrcidToResultFromSemRelJob2.class.getSimpleName()) - .master(parser.get("master")) - .config(conf) - .enableHiveSupport() - .getOrCreate(); - - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - final String inputPath = parser.get("sourcePath"); - final String outputPath = "/tmp/provision/propagation/orcidtoresult"; - - final List allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";")); - boolean writeUpdate = TRUE.equals(parser.get("writeUpdate")); - boolean saveGraph = TRUE.equals(parser.get("saveGraph")); - - createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration())); - - org.apache.spark.sql.Dataset relation = spark.createDataset(sc.textFile(inputPath + "/relation") - .map(item -> new ObjectMapper().readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class)); - - org.apache.spark.sql.Dataset dataset = spark.createDataset(sc.textFile(inputPath + "/dataset") - .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Dataset.class)).rdd(), - Encoders.bean(eu.dnetlib.dhp.schema.oaf.Dataset.class)); - - org.apache.spark.sql.Dataset other = spark.createDataset(sc.textFile(inputPath + "/otherresearchproduct") - .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.OtherResearchProduct.class)).rdd(), - Encoders.bean(eu.dnetlib.dhp.schema.oaf.OtherResearchProduct.class)); - - org.apache.spark.sql.Dataset software = spark.createDataset(sc.textFile(inputPath + "/software") - .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Software.class)).rdd(), - Encoders.bean(eu.dnetlib.dhp.schema.oaf.Software.class)); - - org.apache.spark.sql.Dataset publication = spark.createDataset(sc.textFile(inputPath + "/publication") - .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Publication.class)).rdd(), - Encoders.bean(eu.dnetlib.dhp.schema.oaf.Publication.class)); - - - relation.createOrReplaceTempView("relation"); - String query = "Select source, target " + - "from relation " + - "where datainfo.deletedbyinference = false " + getConstraintList(" relclass = '" , allowedsemrel); - - org.apache.spark.sql.Dataset result_result = spark.sql(query); - - publication.createOrReplaceTempView("publication"); - org.apache.spark.sql.Dataset pubs_with_orcid = getResultWithOrcid("publication", spark) - .as(Encoders.bean(ResultOrcidList.class)); - - dataset.createOrReplaceTempView("dataset"); - org.apache.spark.sql.Dataset dats_with_orcid = getResultWithOrcid("dataset", spark) - .as(Encoders.bean(ResultOrcidList.class)); - - other.createOrReplaceTempView("orp"); - org.apache.spark.sql.Dataset orp_with_orcid = getResultWithOrcid("orp", spark) - .as(Encoders.bean(ResultOrcidList.class)); - - dataset.createOrReplaceTempView("software"); - org.apache.spark.sql.Dataset software_with_orcid = getResultWithOrcid("software", spark) - .as(Encoders.bean(ResultOrcidList.class)); - //get the results having at least one author pid we are interested in - - //target of the relation from at least one source with orcid. - //the set of authors contains all those that have orcid and are related to target - //from any source with allowed semantic relationship - JavaPairRDD> target_authorlist_from_pubs = getTargetAutoritativeAuthorList(pubs_with_orcid); - - JavaPairRDD> target_authorlist_from_dats = getTargetAutoritativeAuthorList(dats_with_orcid); - - JavaPairRDD> target_authorlist_from_orp = getTargetAutoritativeAuthorList(orp_with_orcid); - - JavaPairRDD> target_authorlist_from_sw = getTargetAutoritativeAuthorList(software_with_orcid); - - if(writeUpdate){ - target_authorlist_from_dats.map(r -> new ObjectMapper().writeValueAsString(r)) - .saveAsTextFile(outputPath + "/" + "update_dats"); - target_authorlist_from_pubs.map(r -> new ObjectMapper().writeValueAsString(r)) - .saveAsTextFile(outputPath + "/" + "update_pubs"); - target_authorlist_from_orp.map(r -> new ObjectMapper().writeValueAsString(r)) - .saveAsTextFile(outputPath + "/" + "update_orp"); - target_authorlist_from_sw.map(r -> new ObjectMapper().writeValueAsString(r)) - .saveAsTextFile(outputPath + "/" + "update_sw"); - } - - if(saveGraph){ - sc.textFile(inputPath + "/publication") - .map(item -> new ObjectMapper().readValue(item, eu.dnetlib.dhp.schema.oaf.Publication.class)) - .mapToPair(p -> new Tuple2<>(p.getId(),p)) - .leftOuterJoin(target_authorlist_from_pubs) - .map(c -> { - Result r = c._2()._1(); - if(!c._2()._2().isPresent()){ - return r; - } - List toenrich_author = r.getAuthor(); - List autoritativeAuthors = c._2()._2().get(); - for(eu.dnetlib.dhp.schema.oaf.Author author: toenrich_author){ - if (!containsAllowedPid(author)){ - enrichAuthor(author, autoritativeAuthors); - } - } - return r; - }); - - } - - } - - private static void enrichAuthor(eu.dnetlib.dhp.schema.oaf.Author a, List au){ - for (AutoritativeAuthor aa: au){ - if(enrichAuthor(aa, a)){ - return; - } - } - - } - -// private static JavaPairRDD> getTargetAutoritativeAuthorList(org.apache.spark.sql.Dataset result_result, org.apache.spark.sql.Dataset pubs_with_orcid) { -// return pubs_with_orcid -// .toJavaRDD() -// .mapToPair(p -> new Tuple2<>(p.getId(), p.getAuthorList())) -// .join(result_result.toJavaRDD().mapToPair(rel -> new Tuple2<>(rel.getString(0), rel.getString(1)))) -// .mapToPair(c -> new Tuple2<>(c._2._2(), c._2()._1())) -// .reduceByKey((a, b) -> { -// if(a == null){ -// return b; -// } -// if(b==null){ -// return a; -// } -// -// Set authSet = new HashSet<>(); -// a.stream().forEach(au -> authSet.add(au.getOrcid())); -// -// b.stream().forEach(au -> { -// if (!authSet.contains(au.getOrcid())) { -// a.add(au); -// } -// } -// ); -// return a; -// }); -// } -private static JavaPairRDD> getTargetAutoritativeAuthorList( org.apache.spark.sql.Dataset pubs_with_orcid) { - return pubs_with_orcid - .toJavaRDD() - .mapToPair(p -> new Tuple2<>(p.getResultId(), p.getAuthorList())) - .reduceByKey((a, b) -> { - if(a == null){ - return b; - } - if(b==null){ - return a; - } - Set authSet = new HashSet<>(); - a.stream().forEach(au -> authSet.add(au.getOrcid())); - - b.stream().forEach(au -> { - if (!authSet.contains(au.getOrcid())) { - a.add(au); - } - } - ); - return a; - }); -} - - private static org.apache.spark.sql.Dataset getResultWithOrcid(String table, SparkSession spark){ - String query = " select target, author " + - " from (select id, collect_set(named_struct('name', name, 'surname', surname, 'fullname', fullname, 'orcid', orcid)) author " + - " from ( " + - " select id, MyT.fullname, MyT.name, MyT.surname, MyP.value orcid " + - " from " + table + - " lateral view explode (author) a as MyT " + - " lateral view explode (MyT.pid) p as MyP " + - " where MyP.qualifier.classid = 'ORCID') tmp " + - " group by id) r_t " + - " join (" + - " select source, target " + - " from relation " + - " where datainfo.deletedbyinference = false and (relclass = 'isSupplementedBy' or relclass = 'isSupplementTo') rel_rel " + - " on source = id"; - - return spark.sql(query); - } - - - private static boolean enrichAuthor(AutoritativeAuthor autoritative_author, eu.dnetlib.dhp.schema.oaf.Author author) { - boolean toaddpid = false; - - if (StringUtils.isNoneEmpty(autoritative_author.getSurname())) { - if (StringUtils.isNoneEmpty(author.getSurname())) { - if (autoritative_author.getSurname().trim().equalsIgnoreCase(author.getSurname().trim())) { - - //have the same surname. Check the name - if (StringUtils.isNoneEmpty(autoritative_author.getName())) { - if (StringUtils.isNoneEmpty(author.getName())) { - if (autoritative_author.getName().trim().equalsIgnoreCase(author.getName().trim())) { - toaddpid = true; - } - //they could be differently written (i.e. only the initials of the name in one of the two - if (autoritative_author.getName().trim().substring(0, 0).equalsIgnoreCase(author.getName().trim().substring(0, 0))) { - toaddpid = true; - } - } - } - } - } - } - if (toaddpid){ - StructuredProperty pid = new StructuredProperty(); - String aa_pid = autoritative_author.getOrcid(); - pid.setValue(aa_pid); - pid.setQualifier(getQualifier(PROPAGATION_AUTHOR_PID, PROPAGATION_AUTHOR_PID )); - pid.setDataInfo(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID, PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME)); - if(author.getPid() == null){ - author.setPid(Arrays.asList(pid)); - }else{ - author.getPid().add(pid); - } - - } - return toaddpid; - - } - - -// private static List enrichAuthors(List autoritative_authors, List to_enrich_authors, boolean filter){ -//// List autoritative_authors = p._2()._2().get().getAuthors(); -//// List to_enrich_authors = r.getAuthor(); -// -// return to_enrich_authors -// .stream() -// .map(a -> { -// if (filter) { -// if (containsAllowedPid(a)) { -// return a; -// } -// } -// -// List lst = autoritative_authors.stream() -// .map(aa -> enrichAuthor(aa, a)).filter(au -> !(au == null)).collect(Collectors.toList()); -// if (lst.size() == 0) { -// return a; -// } -// return lst.get(0);//Each author can be enriched at most once. It cannot be the same as many different people -// -// }).collect(Collectors.toList()); -// } -// -// private static void writeResult(JavaPairRDD results, JavaPairRDD toupdateresult, -// String outputPath, String type) { -// -// results.join(toupdateresult) -// .map(p -> { -// Result r = p._2()._1(); -// -// List autoritative_authors = p._2()._2().getAuthors(); -// List to_enrich_authors = r.getAuthor(); -// -// r.setAuthor(enrichAutors(autoritative_authors, to_enrich_authors, false)); -//// .stream() -//// .map(a -> { -//// if(filter) { -//// if (containsAllowedPid(a)) { -//// return a; -//// } -//// } -//// -//// List lst = autoritative_authors.stream() -//// .map(aa -> enrichAuthor(aa, a)).filter(au -> !(au == null)).collect(Collectors.toList()); -//// if(lst.size() == 0){ -//// return a; -//// } -//// return lst.get(0);//Each author can be enriched at most once. It cannot be the same as many different people -//// -//// }).collect(Collectors.toList())); -// -// return r; -// }) -// .map(p -> new ObjectMapper().writeValueAsString(p)) -// .saveAsTextFile(outputPath + "/" + type + "_update"); -// } - - -// private static void updateResult(JavaPairRDD results, JavaPairRDD toupdateresult, -// String outputPath, String type) { -// results.leftOuterJoin(toupdateresult) -// .map(p -> { -// Result r = p._2()._1(); -// if (p._2()._2().isPresent()){ -// List autoritative_authors = p._2()._2().get().getAuthors(); -// List to_enrich_authors = r.getAuthor(); -// -// r.setAuthor(enrichAutors(autoritative_authors, to_enrich_authors, true)); -//// .stream() -//// .map(a -> { -//// if(filter) { -//// if (containsAllowedPid(a)) { -//// return a; -//// } -//// } -//// -//// List lst = autoritative_authors.stream() -//// .map(aa -> enrichAuthor(aa, a)).filter(au -> !(au == null)).collect(Collectors.toList()); -//// if(lst.size() == 0){ -//// return a; -//// } -//// return lst.get(0);//Each author can be enriched at most once. It cannot be the same as many different people -//// -//// }).collect(Collectors.toList())); -// } -// return r; -// }) -// .map(p -> new ObjectMapper().writeValueAsString(p)) -// .saveAsTextFile(outputPath+"/"+type); -// } - - - - private static boolean containsAllowedPid(eu.dnetlib.dhp.schema.oaf.Author a) { - for (StructuredProperty pid : a.getPid()) { - if (PROPAGATION_AUTHOR_PID.equals(pid.getQualifier().getClassid())) { - return true; - } - } - return false; - } - -} - - -/*private ResultProtos.Result.Metadata.Builder searchMatch(List author_list){ - ResultProtos.Result.Metadata.Builder metadataBuilder = ResultProtos.Result.Metadata.newBuilder(); - boolean updated = false; - - for (FieldTypeProtos.Author a: author_list){ - FieldTypeProtos.Author.Builder author = searchAuthor(a, autoritative_authors); - if(author != null){ - updated = true; - metadataBuilder.addAuthor(author); - }else{ - metadataBuilder.addAuthor(FieldTypeProtos.Author.newBuilder(a)); - } - } - if(updated) - return metadataBuilder; - return null; - } - private FieldTypeProtos.Author.Builder searchAuthor(FieldTypeProtos.Author a, List author_list){ - if(containsOrcid(a.getPidList())) - return null; - for(FieldTypeProtos.Author autoritative_author : author_list) { - if (equals(autoritative_author, a)) { - if(!containsOrcid(a.getPidList())) - return update(a, autoritative_author); - } - } - return null; - - } - - private boolean containsOrcid(List pidList){ - if(pidList == null) - return false; - return pidList - .stream() - .filter(kv -> kv.getKey().equals(PropagationConstants.AUTHOR_PID)) - .collect(Collectors.toList()).size() > 0; - } - */ \ No newline at end of file