diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/PrepareNormalizedResultSpark.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/PrepareNormalizedResultSpark.java
index 604b5475cf..6e8068861b 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/PrepareNormalizedResultSpark.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/PrepareNormalizedResultSpark.java
@@ -15,6 +15,7 @@ import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.ircdl_extention.model.Author;
 import eu.dnetlib.dhp.ircdl_extention.model.Result;
 
 public class PrepareNormalizedResultSpark {
@@ -53,33 +54,33 @@ public class PrepareNormalizedResultSpark {
     }
 
     private static void execNormalize(SparkSession spark, String outputPath, String inputPath) {
-        Dataset<Result> normalized_result = Utils
-            .readPath(spark, inputPath + "publicationsWithOrcid", Result.class)
-            .union(Utils.readPath(spark, inputPath + "datasetWithOrcid", Result.class))
-            .union(Utils.readPath(spark, inputPath + "softwareWithOrcid", Result.class))
-            .union(Utils.readPath(spark, inputPath + "otherWithOrcid", Result.class))
-            .map((MapFunction<Result, Result>) r -> {
+        Dataset<Author> normalized_result = Utils
+            .readPath(spark, inputPath + "publicationsWithOrcid", Author.class)
+            .union(Utils.readPath(spark, inputPath + "datasetWithOrcid", Author.class))
+            .union(Utils.readPath(spark, inputPath + "softwareWithOrcid", Author.class))
+            .union(Utils.readPath(spark, inputPath + "otherWithOrcid", Author.class))
+            .map((MapFunction<Author, Author>) r -> {
                 r.setName(Utils.normalizeString(r.getName()));
                 r.setSurname(Utils.normalizeString(r.getSurname()));
                 r.setFullname(Utils.normalizeString(r.getFullname()));
                 return r;
-            }, Encoders.bean(Result.class));
+            }, Encoders.bean(Author.class));
         normalized_result
             .write()
             .option("compression", "gzip")
             .mode(SaveMode.Overwrite)
-            .json(outputPath + "ResultWithOrcid");
+            .json(outputPath + "ResultAuthorNormalized");
 
         normalized_result
-            .filter((FilterFunction<Result>) r -> !r.getId().startsWith("50|dedup"))
+            .filter((FilterFunction<Author>) r -> !r.getId().startsWith("50|dedup"))
             .write()
             .option("compression", "gzip")
             .mode(SaveMode.Overwrite)
             .json(outputPath + "collectedResultWithOrcid");
 
         normalized_result
-            .filter((FilterFunction<Result>) r -> !r.getDeletedbyinference())
+            .filter((FilterFunction<Author>) r -> !r.getDeletedbyinference())
             .write()
             .option("compression", "gzip")
             .mode(SaveMode.Overwrite)
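Note on the hunk above: every job in this diff funnels I/O through `Utils.readPath`, which is why switching the bean class from `Result` to `Author` is the whole change. A minimal sketch of what that helper is assumed to look like (newline-delimited JSON mapped onto beans, the usual dnet-hadoop shape); the real implementation lives unchanged in `Utils.java` and is only partially visible in this diff:

```java
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import com.fasterxml.jackson.databind.ObjectMapper;

public class ReadPathSketch {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    // reads one JSON object per line and maps it onto the requested bean,
    // so the same call serves Dataset<Result> and Dataset<Author>
    public static <R> Dataset<R> readPath(SparkSession spark, String inputPath, Class<R> clazz) {
        return spark
            .read()
            .textFile(inputPath)
            .map(
                (MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz),
                Encoders.bean(clazz));
    }
}
```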
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/PrepareResultSpark.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/PrepareResultSpark.java
index 5c6813142d..2b40232fc6 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/PrepareResultSpark.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/PrepareResultSpark.java
@@ -18,6 +18,7 @@ import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.ircdl_extention.model.Author;
 import eu.dnetlib.dhp.ircdl_extention.model.KeyValue;
 import eu.dnetlib.dhp.ircdl_extention.model.Result;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
@@ -65,52 +66,58 @@ public class PrepareResultSpark {
         String input_path,
         Class<R> resultClazz,
         String output_path) {
         Dataset<R> publicationDataset = Utils
             .readPath(spark, input_path, resultClazz);
-        Dataset<Result> result = publicationDataset.filter((FilterFunction<R>) p -> {
+        Dataset<R> result = publicationDataset.filter((FilterFunction<R>) p -> {
             if (p.getAuthor() == null)
                 return false;
             if (p.getAuthor().size() == 0)
                 return false;
             return true;
-        })
-            .flatMap((FlatMapFunction<R, Result>) p -> {
-                List<Result> reslist = new ArrayList<>();
-                p.getAuthor().forEach(a -> {
-                    a.getPid().forEach(apid -> {
-                        if (apid.getQualifier().getClassid().equals(ModelConstants.ORCID)
-                            || apid.getQualifier().getClassid().equals(ModelConstants.ORCID_PENDING)) {
-                            Result r = new Result();
-                            r.setDeletedbyinference(p.getDataInfo().getDeletedbyinference());
-                            r.setId(p.getId());
-                            r
-                                .setCf(
-                                    p
-                                        .getCollectedfrom()
-                                        .stream()
-                                        .map(cf -> KeyValue.newInstance(cf.getKey(), cf.getValue()))
-                                        .collect(Collectors.toList()));
-                            r
-                                .setPid(
-                                    p
-                                        .getPid()
-                                        .stream()
-                                        .map(
-                                            pid -> KeyValue
-                                                .newInstance(pid.getQualifier().getClassid(), pid.getValue()))
-                                        .collect(Collectors.toList()));
-                            r.setName(a.getName());
-                            r.setSurname(a.getSurname());
-                            r.setFullname(a.getFullname());
-                            r.setOid(apid.getValue());
-                            reslist.add(r);
-                        }
-                    });
+        });
 
-                });
-                return reslist.iterator();
-            }, Encoders.bean(Result.class));
-        result
+        result.flatMap((FlatMapFunction<R, Author>) p -> {
+            List<Author> reslist = new ArrayList<>();
+            p.getAuthor().forEach(a -> {
+                Author r = new Author();
+                r.setDeletedbyinference(p.getDataInfo().getDeletedbyinference());
+                r.setId(p.getId());
+
+                r
+                    .setCf(
+                        p
+                            .getCollectedfrom()
+                            .stream()
+                            .map(cf -> KeyValue.newInstance(cf.getKey(), cf.getValue()))
+                            .collect(Collectors.toList()));
+
+                r.setName(a.getName());
+                r.setSurname(a.getSurname());
+                r.setFullname(a.getFullname());
+                r
+                    .setPid(
+                        p
+                            .getPid()
+                            .stream()
+                            .map(
+                                pid -> KeyValue
+                                    .newInstance(pid.getQualifier().getClassid(), pid.getValue()))
+                            .collect(Collectors.toList()));
+                r
+                    .setApid(
+                        Optional
+                            .ofNullable(a.getPid())
+                            .map(
+                                pids -> pids
+                                    .stream()
+                                    .map(pd -> KeyValue.newInstance(pd.getQualifier().getClassid(), pd.getValue()))
+                                    .collect(Collectors.toList()))
+                            .orElse(new ArrayList<>()));
+                reslist.add(r);
+
+            });
+            return reslist.iterator();
+        }, Encoders.bean(Author.class))
             .write()
-            .option("compressio", "gzip")
+            .option("compression", "gzip")
             .mode(SaveMode.Overwrite)
             .json(output_path);
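The hunk above changes the extraction contract: instead of one `Result` row per ORCID-typed author pid, the job now emits one `Author` row per author, carrying all of the author's pids in the new `apid` field; ORCID selection moves downstream. A minimal illustration (hypothetical data) of the `Optional` guard used above, which turns a null author pid list into an empty list instead of an NPE:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class ApidGuardSketch {
    public static void main(String[] args) {
        List<String> pids = null; // e.g. an author record that has no pids at all
        List<String> apid = Optional
            .ofNullable(pids)
            .map(ps -> ps.stream().map(String::trim).collect(Collectors.toList()))
            .orElse(new ArrayList<>());
        System.out.println(apid.size()); // 0, not a NullPointerException
    }
}
```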
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/SelectAuthorWithOrcidOnlySpark.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/SelectAuthorWithOrcidOnlySpark.java
index d546f8dc25..554e274199 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/SelectAuthorWithOrcidOnlySpark.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/SelectAuthorWithOrcidOnlySpark.java
@@ -1,4 +1,78 @@
+
 package eu.dnetlib.dhp.ircdl_extention;
 
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.ircdl_extention.model.Author;
+import eu.dnetlib.dhp.ircdl_extention.model.KeyValue;
+import eu.dnetlib.dhp.ircdl_extention.model.Result;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+
 public class SelectAuthorWithOrcidOnlySpark {
+
+    public static void main(String[] args) throws Exception {
+
+        String jsonConfiguration = IOUtils
+            .toString(
+                PrepareResultSpark.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/ircdl_extention/prepare_result_parameters.json"));
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+
+        final String inputPath = parser.get("inputPath");
+
+        final String outputPath = parser.get("outputPath");
+
+        SparkConf conf = new SparkConf();
+
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                Utils.removeOutputDir(spark, outputPath);
+                selectAuthors(spark, inputPath, outputPath);
+            });
+    }
+
+    private static void selectAuthors(SparkSession spark, String input_path, String output_path) {
+        Dataset<Author> resultDataset = Utils.readPath(spark, input_path, Author.class);
+        resultDataset.flatMap((FlatMapFunction<Author, Result>) p -> {
+            List<Result> reslist = new ArrayList<>();
+            p.getApid().forEach(a -> {
+                if (a.getKey().equals(ModelConstants.ORCID_PENDING) || a.getKey().equals(ModelConstants.ORCID)) {
+                    Result r = Result.fromAuthor(p);
+                    r.setOid(a.getValue());
+                    reslist.add(r);
+                }
+            });
+            return reslist.iterator();
+        }, Encoders.bean(Result.class))
+            .write()
+            .option("compression", "gzip")
+            .mode(SaveMode.Overwrite)
+            .json(output_path);
+    }
 }
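The new class reinstates the ORCID selection that `PrepareResultSpark` no longer performs: one `Result` is emitted per `apid` entry whose key is an ORCID, confirmed or pending. A hedged, plain-Java sketch of that selection rule; the string values for the two constants are assumptions based on their names in `eu.dnetlib.dhp.schema.common.ModelConstants`:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class OrcidSelectionSketch {

    // assumed values, standing in for ModelConstants.ORCID / ORCID_PENDING
    static final String ORCID = "orcid";
    static final String ORCID_PENDING = "orcid_pending";

    public static void main(String[] args) {
        // each entry mimics a KeyValue: [classid, pid value]
        List<String[]> apid = Arrays.asList(
            new String[] { "orcid", "0000-0001-2345-6789" },
            new String[] { "scopus", "12345" },
            new String[] { "orcid_pending", "0000-0002-9876-5432" });
        // keep only ORCID-typed pids, exactly like the forEach in selectAuthors
        List<String> oids = apid
            .stream()
            .filter(kv -> kv[0].equals(ORCID) || kv[0].equals(ORCID_PENDING))
            .map(kv -> kv[1])
            .collect(Collectors.toList());
        System.out.println(oids); // [0000-0001-2345-6789, 0000-0002-9876-5432]
    }
}
```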
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/Utils.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/Utils.java
index b9be54a630..417969af57 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/Utils.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/Utils.java
@@ -8,6 +8,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.text.similarity.CosineDistance;
@@ -15,7 +16,10 @@ import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import com.wcohen.ss.JaroWinkler;
@@ -28,6 +32,9 @@ public class Utils implements Serializable {
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
+    private static final Logger log = LoggerFactory
+        .getLogger(eu.dnetlib.dhp.ircdl_extention.Utils.class);
+
     public static String normalizeString(String input) {
         if (input == null || input.equals("void"))
             return new String();
@@ -55,44 +62,103 @@ public class Utils implements Serializable {
     }
 
     private static List<String> getList(List<String> input) {
-        return input.stream().map(st -> st.trim()).filter(st -> st.length() > 0).collect(Collectors.toList());
+        return input
+            .stream()
+            .map(st -> st.trim())
+            .filter(st -> st.length() > 0)
+            .sorted()
+            .collect(Collectors.toList());
     }
 
-    public static boolean filterFunction(Tuple2<Result, Orcid> input) {
-
-        List<String> res = getList(Arrays.asList(input._1().getFullname().split(" ")))
+    private static List<String> getListInitials(List<String> input) {
+        List<String> ret = new ArrayList<>();
+        List<Character> tmp = input
             .stream()
+            .map(st -> st.trim())
+            .filter(st -> st.length() > 0)
+            .map(st -> st.charAt(0))
             .sorted()
             .collect(Collectors.toList());
+        if (tmp.size() == 1)
+            ret.add(String.valueOf(tmp.get(0)));
+        for (int i = 0; i < tmp.size(); i++) {
+            for (int j = i + 1; j < tmp.size(); j++) {
+                ret.add(String.valueOf(tmp.get(i)) + String.valueOf(tmp.get(j)));
+            }
+        }
+
+        return ret;
+
+    }
+
+    // select the pairs of first characters of each word composing the name:
+    // if there is a match, the name is considered correct.
+    // TODO: add a check that the lengths of the two lists are not too unbalanced: if one list is long
+    public static boolean conservativeFilterFunction(Tuple2<Result, Orcid> input) {
+
+        List<String> res = getListInitials(Arrays.asList(input._1().getFullname().split(" ")));
         Orcid or = input._2();
         List<String> tmp = new ArrayList<>();
         Collections.addAll(tmp, or.getName().split(" "));
         Collections.addAll(tmp, or.getSurname().split(" "));
         return checkContains(
-            res, getList(tmp)
-                .stream()
-                .sorted()
-                .collect(Collectors.toList()))
+            res, getListInitials(tmp), false)
             ||
             checkContains(
-                res, getList(Arrays.asList(or.getCreditname().split(" ")))
-                    .stream()
-                    .sorted()
-                    .collect(Collectors.toList()))
+                res, getListInitials(Arrays.asList(or.getCreditname().split(" "))), false)
             ||
             or
                 .getOtherNames()
                 .stream()
                 .anyMatch(
                     on -> checkContains(
-                        res, getList(Arrays.asList(on.split(" ")))
-                            .stream()
-                            .sorted()
-                            .collect(Collectors.toList())));
+                        res, getListInitials(Arrays.asList(on.split(" "))), false));
+
+    }
+
+    public static boolean filterFunction(Tuple2<Result, Orcid> input) throws JsonProcessingException {
+
+        try {
+            List<String> res = getList(Arrays.asList(input._1().getFullname().split(" ")));
+            Orcid or = input._2();
+            List<String> tmp = new ArrayList<>();
+            Collections.addAll(tmp, or.getName().split(" "));
+            Collections.addAll(tmp, or.getSurname().split(" "));
+            return checkContains(
+                res, getList(tmp)
+                    .stream()
+                    .sorted()
+                    .collect(Collectors.toList()))
+                ||
+                checkContains(
+                    res, getList(Arrays.asList(or.getCreditname().split(" ")))
+                        .stream()
+                        .sorted()
+                        .collect(Collectors.toList()))
+                ||
+                or
+                    .getOtherNames()
+                    .stream()
+                    .anyMatch(
+                        on -> checkContains(
+                            res, getList(Arrays.asList(on.split(" ")))
+                                .stream()
+                                .sorted()
+                                .collect(Collectors.toList())));
+        } catch (Exception e) {
+
+            log.info("Exception on input: " + new ObjectMapper().writeValueAsString(input));
+            throw e;
+        }
 
     }
 
     private static boolean checkContains(List<String> result, List<String> orcid) {
+        return checkContains(result, orcid, true);
+    }
+
+    private static boolean checkContains(List<String> result, List<String> orcid, boolean jaro) {
         if (result.size() == 0 || orcid.size() == 0) {
             return true;
         }
@@ -118,10 +184,13 @@ public class Utils implements Serializable {
             if (isIn(result, orcid))
                 return true;
         }
-        // apply JaroWinkler distance
-        double score = new JaroWinkler()
-            .score(StringUtils.joinWith(" ", result), StringUtils.joinWith(" ", orcid));
-        return score > 0.95;
+        if (jaro) {
+            // apply JaroWinkler distance
+            double score = new JaroWinkler()
+                .score(StringUtils.joinWith(" ", result), StringUtils.joinWith(" ", orcid));
+            return score > 0.95;
+        }
+        return false;
     }
 
     private static boolean isIn(List<String> lst1, List<String> lst2) {
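A worked example of the initials-pair heuristic that `conservativeFilterFunction` introduces above: every word contributes its first character, the characters are sorted, and all ordered pairs are produced, so word order in the full name stops mattering. The class below only replicates `getListInitials` for demonstration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class InitialsPairsExample {

    // same logic as Utils.getListInitials above, copied here to run standalone
    static List<String> getListInitials(List<String> input) {
        List<String> ret = new ArrayList<>();
        List<Character> tmp = input
            .stream()
            .map(String::trim)
            .filter(st -> st.length() > 0)
            .map(st -> st.charAt(0))
            .sorted()
            .collect(Collectors.toList());
        if (tmp.size() == 1)
            ret.add(String.valueOf(tmp.get(0)));
        for (int i = 0; i < tmp.size(); i++)
            for (int j = i + 1; j < tmp.size(); j++)
                ret.add(String.valueOf(tmp.get(i)) + String.valueOf(tmp.get(j)));
        return ret;
    }

    public static void main(String[] args) {
        // "veigas pires cristina" and "cristina veiga pires" share the initials
        // {c, p, v}, hence the same pairs: the conservative filter matches them.
        System.out.println(getListInitials(Arrays.asList("veigas", "pires", "cristina"))); // [cp, cv, pv]
        System.out.println(getListInitials(Arrays.asList("cristina", "veiga", "pires"))); // [cp, cv, pv]
    }
}
```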
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/WrongSpark.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/WrongSpark.java
index 82e4122e90..b7d6f756cc 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/WrongSpark.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/WrongSpark.java
@@ -18,7 +18,10 @@ import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.ircdl_extention.model.Author;
 import eu.dnetlib.dhp.ircdl_extention.model.Orcid;
 import eu.dnetlib.dhp.ircdl_extention.model.Result;
 import eu.dnetlib.dhp.ircdl_extention.model.ShuffleInfo;
@@ -53,6 +56,8 @@ public class WrongSpark {
 
         final String resultPath = parser.get("inputPath");
 
+        final String authorPath = parser.get("authorPath");
+
         SparkConf conf = new SparkConf();
         conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");
 
@@ -62,16 +67,17 @@ public class WrongSpark {
             spark -> {
                 Utils.removeOutputDir(spark, outputPath);
                 findWrong(spark, orcidPath, outputPath + "/wrong", resultPath);
-                findShuffle(spark, orcidPath, outputPath + "/shuffle", resultPath);
+                findShuffle(spark, orcidPath, outputPath + "/shuffle", resultPath, authorPath);
             });
     }
 
-    private static void findShuffle(SparkSession spark, String orcidPath, String outputPath, String resultPath) {
+    private static void findShuffle(SparkSession spark, String orcidPath, String outputPath, String resultPath,
+        String authorPath) {
 
         Utils
-            .readPath(spark, resultPath, Result.class)
+            .readPath(spark, authorPath, Author.class)
             .map(
-                (MapFunction<Result, ShuffleInfo>) r -> ShuffleInfo
+                (MapFunction<Author, ShuffleInfo>) r -> ShuffleInfo
                     .newInstance(r.getName(), r.getSurname(), r.getFullname(), r.getId()),
                 Encoders.bean(ShuffleInfo.class))
             .union(
@@ -82,7 +88,7 @@ public class WrongSpark {
                             .newInstance(
                                 t2._1().getName(), t2._1().getSurname(), t2._1().getFullname(),
                                 t2._1().getId(), t2._2().getName(), t2._2().getSurname(),
-                                t2._2().getCreditname(), t2._2().getOtherNames()),
+                                t2._2().getCreditname(), t2._2().getOtherNames(), t2._2().getOrcid()),
                         Encoders.bean(ShuffleInfo.class)))
             .groupByKey((MapFunction<ShuffleInfo, String>) si -> si.getId(), Encoders.STRING())
             .flatMapGroups((FlatMapGroupsFunction<String, ShuffleInfo, ShuffleInfo>) (s, it) -> {
@@ -99,7 +105,12 @@ public class WrongSpark {
                 });
                 return ret.iterator();
             }, Encoders.bean(ShuffleInfo.class))
-            .filter(Objects::nonNull);
+            .filter(Objects::nonNull)
+            .write()
+            .option("compression", "gzip")
+            .mode(SaveMode.Overwrite)
+            .json(outputPath);
+
         /*
          * def checkShuffle(x): alis = [a for a in x[1]] dic = {} count = 0 for entry in alis: if entry['orcid'] != '':
          * dic[entry['orcid']] = entry for orcid in dic: name = dic[orcid]['oname'] surname = dic[orcid]['osurname'] for
@@ -116,15 +127,24 @@ public class WrongSpark {
 
     private static boolean checkShuffle(ShuffleInfo e, List<ShuffleInfo> shuffleInfoList) {
 
-        return shuffleInfoList
+        boolean b = shuffleInfoList
             .stream()
             .anyMatch(
-                si -> Utils
-                    .filterFunction(
-                        new Tuple2<>(Result.newInstance(si.getAfullname()),
-                            Orcid
-                                .newInstance(
-                                    si.getOname(), si.getOsurname(), si.getOcreditName(), si.getoOtherNames()))));
+                si -> {
+                    try {
+                        return Utils
+                            .filterFunction(
+                                new Tuple2<>(Result.newInstance(si.getAfullname()),
+                                    Orcid
+                                        .newInstance(
+                                            e.getOname(), e.getOsurname(), e.getOcreditName(),
+                                            e.getoOtherNames())));
+                    } catch (JsonProcessingException ex) {
+                        throw new RuntimeException(ex);
+                    }
+                });
+
+        return b;
     }
 
     private static Dataset<Tuple2<Result, Orcid>> getWrong(SparkSession spark, String orcidPath, String resultPath) {
@@ -139,7 +159,7 @@ public class WrongSpark {
                 .col("oid")
                 .equalTo(orcidDataset.col("orcid")),
             "inner")
-            .filter((FilterFunction<Tuple2<Result, Orcid>>) t2 -> !Utils.filterFunction(t2));
+            .filter((FilterFunction<Tuple2<Result, Orcid>>) t2 -> !Utils.conservativeFilterFunction(t2));
     }
 
     private static void findWrong(SparkSession spark, String orcidPath, String outputPath, String resultPath) {
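The reworked `checkShuffle` now compares the ORCID-side names of the entry under test (`e`) against the fullnames of all co-authors (`si`) of the same result: an ORCID is "shuffled" when its personal names match a different co-author. A plain-Java sketch of that idea with invented rows; `matches()` is a naive stand-in for `Utils.filterFunction`:

```java
import java.util.Arrays;
import java.util.List;

public class ShuffleCheckSketch {

    static class Row {
        final String afullname; // author fullname as found on the result
        final String oname; // ORCID-registered given name, null if no ORCID attached
        final String osurname; // ORCID-registered family name

        Row(String afullname, String oname, String osurname) {
            this.afullname = afullname;
            this.oname = oname;
            this.osurname = osurname;
        }
    }

    // naive substitute for Utils.filterFunction: token containment only
    static boolean matches(String fullname, String oname, String osurname) {
        return fullname.contains(oname.toLowerCase()) && fullname.contains(osurname.toLowerCase());
    }

    public static void main(String[] args) {
        // authors of one result; the first carries ORCID name data of a co-author
        List<Row> sameResult = Arrays.asList(
            new Row("miriam baglioni", "Alessia", "Bardi"),
            new Row("alessia bardi", null, null));
        Row withOrcid = sameResult.get(0);
        boolean shuffled = sameResult
            .stream()
            .anyMatch(si -> matches(si.afullname, withOrcid.oname, withOrcid.osurname));
        System.out.println(shuffled); // true: "Alessia Bardi" matches co-author "alessia bardi"
    }
}
```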
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/model/Author.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/model/Author.java
index df64995f7a..1164bf3c92 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/model/Author.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/model/Author.java
@@ -1,4 +1,18 @@
+
 package eu.dnetlib.dhp.ircdl_extention.model;
 
-public class Author {
+import java.io.Serializable;
+import java.util.List;
+
+public class Author extends Result implements Serializable {
+
+    private List<KeyValue> apid;
+
+    public List<KeyValue> getApid() {
+        return apid;
+    }
+
+    public void setApid(List<KeyValue> apid) {
+        this.apid = apid;
+    }
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/model/Orcid.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/model/Orcid.java
index 23629b7113..0808c4c7fb 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/model/Orcid.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/model/Orcid.java
@@ -14,13 +14,12 @@ public class Orcid implements Serializable {
     private Boolean works;
     private String name;
 
-    public static Orcid newInstance(String oname, String osurname, String ocreditName, List<String> getoOtherNames) {
+    public static Orcid newInstance(String oname, String osurname, String ocreditName, List<String> oOtherNames) {
         Orcid o = new Orcid();
         o.name = oname;
         o.surname = osurname;
         o.creditname = ocreditName;
-        o.otherNames = getoOtherNames;
-
+        o.otherNames = oOtherNames;
         return o;
     }
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/model/Result.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/model/Result.java
index d9eb81bf79..7e5220b73b 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/model/Result.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/model/Result.java
@@ -21,6 +21,19 @@ public class Result implements Serializable {
         return r;
     }
 
+    public static Result fromAuthor(Author p) {
+        Result r = new Result();
+        r.deletedbyinference = p.getDeletedbyinference();
+        r.id = p.getId();
+        r.cf = p.getCf();
+        r.pid = p.getPid();
+        r.name = p.getName();
+        r.surname = p.getSurname();
+        r.fullname = p.getFullname();
+
+        return r;
+    }
+
     public Boolean getDeletedbyinference() {
         return deletedbyinference;
     }
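Taken together, the model changes above make `Author` a `Result` plus the author's pid list (`apid`), with `Result.fromAuthor` projecting it back down once a single ORCID has been selected. A hedged usage sketch of that round trip; all field values are invented:

```java
import java.util.Arrays;

import eu.dnetlib.dhp.ircdl_extention.model.Author;
import eu.dnetlib.dhp.ircdl_extention.model.KeyValue;
import eu.dnetlib.dhp.ircdl_extention.model.Result;

public class FromAuthorSketch {
    public static void main(String[] args) {
        Author a = new Author();
        a.setId("50|doi_________::abc"); // invented identifier
        a.setFullname("miriam baglioni");
        a.setApid(Arrays.asList(KeyValue.newInstance("orcid", "0000-0001-2345-6789")));

        // copies id, cf, pid and the name fields; apid is intentionally dropped
        Result r = Result.fromAuthor(a);
        // the selected apid entry becomes the single oid of the Result
        r.setOid(a.getApid().get(0).getValue());
        System.out.println(r.getOid()); // 0000-0001-2345-6789
    }
}
```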
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/model/ShuffleInfo.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/model/ShuffleInfo.java
index 0f790d97a4..47c2ae04d0 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/model/ShuffleInfo.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/model/ShuffleInfo.java
@@ -15,6 +15,15 @@ public class ShuffleInfo implements Serializable {
     private List<String> oOtherNames;
     private String orcid;
     private String id;
+    private String pid;
+
+    public String getPid() {
+        return pid;
+    }
+
+    public void setPid(String pid) {
+        this.pid = pid;
+    }
 
     public String getAname() {
         return aname;
@@ -98,7 +107,7 @@ public class ShuffleInfo implements Serializable {
     }
 
     public static ShuffleInfo newInstance(String aname, String asurname, String afullname, String id, String oname,
-        String osurname, String ocredtname, List<String> oOthername) {
+        String osurname, String ocredtname, List<String> oOthername, String orcid) {
         ShuffleInfo si = new ShuffleInfo();
         si.afullname = afullname;
         si.aname = aname;
@@ -108,6 +117,7 @@ public class ShuffleInfo implements Serializable {
         si.osurname = osurname;
         si.ocreditName = ocredtname;
         si.oOtherNames = oOthername;
+        si.orcid = orcid;
 
         return si;
     }
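The two `newInstance` overloads split the two kinds of rows fed into `findShuffle`: plain authorships (4 arguments, assumed to pre-exist, as it is called unchanged in the hunks above) and ORCID-bearing rows (the extended signature). A short usage sketch mirroring the calls in `WrongSpark` and in the test below:

```java
import java.util.ArrayList;

import eu.dnetlib.dhp.ircdl_extention.model.ShuffleInfo;

public class ShuffleInfoFactorySketch {
    public static void main(String[] args) {
        // a plain authorship row: no ORCID attached
        ShuffleInfo plain = ShuffleInfo.newInstance("Alessia", "Bardi", "Alessia Bardi", "50|fake_1");
        // a row carrying the ORCID-side names plus the orcid itself
        ShuffleInfo withOrcid = ShuffleInfo
            .newInstance(
                "Miriam", "Baglioni", "Miriam Baglioni", "50|fake_1",
                "Alessia", "Bardi", "", new ArrayList<>(), "orcid_alessia");
        System.out.println(plain.getOrcid() == null); // true
        System.out.println(withOrcid.getOrcid()); // orcid_alessia
    }
}
```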
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/ircdl_extention/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/ircdl_extention/oozie_app/workflow.xml
index 3bae12d1a9..989f2a3386 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/ircdl_extention/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/ircdl_extention/oozie_app/workflow.xml
@@ -73,6 +73,7 @@
                     <arg>--inputPath</arg><arg>${inputPath}/publication</arg>
                     <arg>--resultClass</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
                     <arg>--outputPath</arg><arg>${workingDir}/GRAPH/publicationsWithOrcid</arg>
+
@@ -98,6 +99,7 @@
                     <arg>--inputPath</arg><arg>${inputPath}/dataset</arg>
                     <arg>--resultClass</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
                     <arg>--outputPath</arg><arg>${workingDir}/GRAPH/datasetWithOrcid</arg>
+
@@ -123,6 +125,7 @@
                     <arg>--inputPath</arg><arg>${inputPath}/software</arg>
                     <arg>--resultClass</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
                     <arg>--outputPath</arg><arg>${workingDir}/GRAPH/softwareWithOrcid</arg>
+
@@ -148,6 +151,7 @@
                     <arg>--inputPath</arg><arg>${inputPath}/otherresearchproduct</arg>
                     <arg>--resultClass</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
                     <arg>--outputPath</arg><arg>${workingDir}/GRAPH/otherWithOrcid</arg>
+
@@ -156,6 +160,7 @@
+
                     <master>yarn</master>
@@ -176,6 +181,31 @@
                     <arg>--inputPath</arg><arg>${workingDir}/GRAPH/</arg>
                     <arg>--outputPath</arg><arg>${workingDir}/GRAPH/Normalized/</arg>
+                </spark>
+                <ok to="..."/>
+                <error to="..."/>
+            </action>
+
+            <action name="...">
+                <spark xmlns="uri:oozie:spark-action:0.2">
+                    <master>yarn</master>
+                    <mode>cluster</mode>
+                    <name>PrepareResult</name>
+                    <class>eu.dnetlib.dhp.ircdl_extention.SelectAuthorWithOrcidOnlySpark</class>
+                    <jar>dhp-aggregation-${projectVersion}.jar</jar>
+                    <spark-opts>
+                        --executor-cores=${sparkExecutorCores}
+                        --executor-memory=${sparkExecutorMemory}
+                        --driver-memory=${sparkDriverMemory}
+                        --conf spark.extraListeners=${spark2ExtraListeners}
+                        --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                        --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                        --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                        --conf spark.sql.shuffle.partitions=3840
+                    </spark-opts>
+                    <arg>--inputPath</arg><arg>${workingDir}/GRAPH/Normalized/ResultAuthorNormalized/</arg>
+                    <arg>--outputPath</arg><arg>${workingDir}/GRAPH/Normalized/ResultWithOrcid/</arg>
@@ -322,6 +352,7 @@
                     <arg>--inputPath</arg><arg>${workingDir}/GRAPH/InstRepo/</arg>
                     <arg>--outputPath</arg><arg>${outputPath}/InstRepo/</arg>
                     <arg>--orcidPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
+                    <arg>--ap</arg><arg>${workingDir}/GRAPH/Normalized/ResultAuthorNormalized/</arg>
@@ -344,8 +375,9 @@
                     <arg>--inputPath</arg><arg>${workingDir}/GRAPH/Datacite/allDatacite/</arg>
-                    <arg>--outputPath</arg><arg>${outputPath}</arg>
+                    <arg>--outputPath</arg><arg>${outputPath}/Datacite/</arg>
                     <arg>--orcidPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
+                    <arg>--ap</arg><arg>${workingDir}/GRAPH/Normalized/ResultAuthorNormalized/</arg>
@@ -371,6 +403,7 @@
                     <arg>--inputPath</arg><arg>${workingDir}/GRAPH/Crossref/</arg>
                     <arg>--outputPath</arg><arg>${outputPath}/Crossref/</arg>
                     <arg>--orcidPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
+                    <arg>--ap</arg><arg>${workingDir}/GRAPH/Normalized/ResultAuthorNormalized/</arg>
@@ -396,6 +429,7 @@
                     <arg>--inputPath</arg><arg>${workingDir}/GRAPH/AllTheRest/</arg>
                     <arg>--outputPath</arg><arg>${outputPath}/AllTheRest/</arg>
                     <arg>--orcidPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
+                    <arg>--ap</arg><arg>${workingDir}/GRAPH/Normalized/ResultAuthorNormalized/</arg>
@@ -421,6 +455,7 @@
                     <arg>--inputPath</arg><arg>${workingDir}/GRAPH/Datacite/Zenodo/</arg>
                     <arg>--outputPath</arg><arg>${outputPath}/Zenodo/</arg>
                     <arg>--orcidPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
+                    <arg>--ap</arg><arg>${workingDir}/GRAPH/Normalized/ResultAuthorNormalized/</arg>
@@ -445,6 +480,7 @@
                     <arg>--inputPath</arg><arg>${workingDir}/GRAPH/Datacite/Figshare/</arg>
                     <arg>--outputPath</arg><arg>${outputPath}/Figshare/</arg>
                     <arg>--orcidPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
+                    <arg>--ap</arg><arg>${workingDir}/GRAPH/Normalized/ResultAuthorNormalized/</arg>
@@ -469,6 +505,7 @@
                     <arg>--inputPath</arg><arg>${workingDir}/GRAPH/Datacite/Dryad/</arg>
                     <arg>--outputPath</arg><arg>${outputPath}/Dryad/</arg>
                     <arg>--orcidPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
+                    <arg>--ap</arg><arg>${workingDir}/GRAPH/Normalized/ResultAuthorNormalized/</arg>
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/ircdl_extention/prepare_result_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/ircdl_extention/prepare_result_parameters.json
index 24b665a9d6..b672af62e8 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/ircdl_extention/prepare_result_parameters.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/ircdl_extention/prepare_result_parameters.json
@@ -21,6 +21,6 @@
     "paramName": "rc",
     "paramLongName": "resultClass",
     "paramDescription": "the path of the new ActionSet",
-    "paramRequired": true
+    "paramRequired": false
   }
 ]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/ircdl_extention/wrong_orcid_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/ircdl_extention/wrong_orcid_parameters.json
index 5db9c7b4ef..f3c6c64b48 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/ircdl_extention/wrong_orcid_parameters.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/ircdl_extention/wrong_orcid_parameters.json
@@ -22,5 +22,11 @@
     "paramLongName": "inputPath",
     "paramDescription": "thepath of the new ActionSet",
     "paramRequired": true
+  },
+  {
+    "paramName": "ap",
+    "paramLongName": "authorPath",
+    "paramDescription": "the path of the normalized author information",
+    "paramRequired": true
   }
 ]
\ No newline at end of file
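A hedged sketch of how the new `ap`/`authorPath` parameter travels from the workflow into `WrongSpark`: the workflow passes the short form `--ap`, and the parser is assumed (per its use throughout this diff) to expose the value under the long name `authorPath`. All argument values below are invented, and the exact set of required parameters is taken from `wrong_orcid_parameters.json` as far as it is visible here:

```java
import org.apache.commons.io.IOUtils;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;

public class AuthorPathParamSketch {
    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils
            .toString(
                AuthorPathParamSketch.class
                    .getResourceAsStream(
                        "/eu/dnetlib/dhp/ircdl_extention/wrong_orcid_parameters.json"));
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        // mirrors <arg>--ap</arg><arg>${workingDir}/GRAPH/Normalized/ResultAuthorNormalized/</arg>
        parser
            .parseArgument(
                new String[] {
                    "--inputPath", "/tmp/input",
                    "--outputPath", "/tmp/output",
                    "--orcidPath", "/tmp/orcid",
                    "--ap", "/tmp/ResultAuthorNormalized/"
                });
        System.out.println(parser.get("authorPath")); // /tmp/ResultAuthorNormalized/
    }
}
```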
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/ircdl_extention/WrongOrcidTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/ircdl_extention/WrongOrcidTest.java
index b5cc6787d5..1b604c7b3c 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/ircdl_extention/WrongOrcidTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/ircdl_extention/WrongOrcidTest.java
@@ -3,13 +3,19 @@ package eu.dnetlib.dhp.ircdl_extention;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
 import org.apache.neethi.Assertion;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+
 import eu.dnetlib.dhp.ircdl_extention.model.Orcid;
 import eu.dnetlib.dhp.ircdl_extention.model.Result;
+import eu.dnetlib.dhp.ircdl_extention.model.ShuffleInfo;
 import scala.Tuple2;
 
 public class WrongOrcidTest {
@@ -45,51 +51,228 @@ public class WrongOrcidTest {
     @Test
     public void wrongOrcidFalse3() throws Exception {
         Assertions
-            .assertTrue(
-                Utils
-                    .filterFunction(
-                        new Tuple2<>(Result.newInstance("ghosh ss"),
-                            Orcid
-                                .newInstance(
-                                    "satrajit", "ghosh",
-                                    "satra",
-                                    Arrays.asList("satra","satrajit s ghosh")))));
+            .assertTrue(
+                Utils
+                    .filterFunction(
+                        new Tuple2<>(Result.newInstance("ghosh ss"),
+                            Orcid
+                                .newInstance(
+                                    "satrajit", "ghosh",
+                                    "satra",
+                                    Arrays.asList("satra", "satrajit s ghosh")))));
     }
 
     @Test
     public void wrongOrcidTrue() throws Exception {
         Assertions
-            .assertFalse(
-                Utils
-                    .filterFunction(
-                        new Tuple2<>(Result.newInstance("kluft lukas"),
-                            Orcid
-                                .newInstance(
-                                    "satrajit", "ghosh",
-                                    "satra",
-                                    Arrays.asList("satra","satrajit s ghosh")))));
+            .assertFalse(
+                Utils
+                    .filterFunction(
+                        new Tuple2<>(Result.newInstance("kluft lukas"),
+                            Orcid
+                                .newInstance(
+                                    "satrajit", "ghosh",
+                                    "satra",
+                                    Arrays.asList("satra", "satrajit s ghosh")))));
     }
 
     @Test
     public void wrongOrcidFalse4() throws Exception {
         Assertions
-            .assertTrue(
-                Utils
-                    .filterFunction(
-                        new Tuple2<>(Result.newInstance("schulz s a"),
-                            Orcid
-                                .newInstance(
-                                    "sebastian", "schulz",
-                                    "sebastian a schulz",
-                                    new ArrayList<>()))));
+            .assertTrue(
+                Utils
+                    .filterFunction(
+                        new Tuple2<>(Result.newInstance("schulz s a"),
+                            Orcid
+                                .newInstance(
+                                    "sebastian", "schulz",
+                                    "sebastian a schulz",
+                                    new ArrayList<>()))));
     }
 
+    @Test
+    public void wrongOrcidFalse5() throws Exception {
+        Assertions
+            .assertTrue(
+                Utils
+                    .filterFunction(
+                        new Tuple2<>(Result.newInstance("domingues af"),
+                            Orcid
+                                .newInstance(
+                                    "allysson", "domingues",
+                                    "allysson f domingues",
+                                    new ArrayList<>()))));
+    }
 
+    @Test
+    public void wrongOrcidFalseConservative() throws Exception {
+        Assertions
+            .assertTrue(
+                Utils
+                    .conservativeFilterFunction(
+                        new Tuple2<>(Result.newInstance("veigas pires cristina"),
+                            Orcid
+                                .newInstance(
+                                    "cristina", "veiga pires", "c veiga pires",
+                                    Arrays.asList("c c veiga pires")))));
+    }
 
+    @Test
+    public void wrongOrcidFalseConservative2() throws Exception {
+        Assertions
+            .assertTrue(
+                Utils
+                    .conservativeFilterFunction(
+                        new Tuple2<>(Result.newInstance("yushkevich p"),
+                            Orcid
+                                .newInstance(
+                                    "paul", "yushkevich", "paul a yushkevich",
+                                    new ArrayList<>()))));
+    }
+
+    @Test
+    public void wrongOrcidFalseConservative3() throws Exception {
+        Assertions
+            .assertTrue(
+                Utils
+                    .conservativeFilterFunction(
+                        new Tuple2<>(Result.newInstance("ghosh ss"),
+                            Orcid
+                                .newInstance(
+                                    "satrajit", "ghosh",
+                                    "satra",
+                                    Arrays.asList("satra", "satrajit s ghosh")))));
+
+    }
+
+    @Test
+    public void wrongOrcidTrueConservative() throws Exception {
+        Assertions
+            .assertFalse(
+                Utils
+                    .conservativeFilterFunction(
+                        new Tuple2<>(Result.newInstance("kluft lukas"),
+                            Orcid
+                                .newInstance(
+                                    "satrajit", "ghosh",
+                                    "satra",
+                                    Arrays.asList("satra", "satrajit s ghosh")))));
+
+    }
+
+    @Test
+    public void wrongOrcidFalseConservative4() throws Exception {
+        Assertions
+            .assertTrue(
+                Utils
+                    .conservativeFilterFunction(
+                        new Tuple2<>(Result.newInstance("schulz s a"),
+                            Orcid
+                                .newInstance(
+                                    "sebastian", "schulz",
+                                    "sebastian a schulz",
+                                    new ArrayList<>()))));
+
+    }
+
+    @Test
+    public void wrongOrcidFalseConservative5() throws Exception {
+        Assertions
+            .assertTrue(
+                Utils
+                    .conservativeFilterFunction(
+                        new Tuple2<>(Result.newInstance("domingues af"),
+                            Orcid
+                                .newInstance(
+                                    "allysson", "domingues",
+                                    "allysson f domingues",
+                                    new ArrayList<>()))));
+
+    }
+
+    @Test
+    public void wrongOrcidTrueConservative2() throws Exception {
+        Assertions
+            .assertFalse(
+                Utils
+                    .conservativeFilterFunction(
+                        new Tuple2<>(Result.newInstance("figueiredo pontes lorena lobo de"),
+                            Orcid
+                                .newInstance(
+                                    "moyses", "soares",
+                                    "moyses antonio porto soares",
+                                    new ArrayList<>()))));
+
+    }
+
+    @Test
+    public void wrongOrcidFalseConservative6() throws Exception {
+        Assertions
+            .assertTrue(
+                Utils
+                    .conservativeFilterFunction(
+                        new Tuple2<>(Result.newInstance("da luz geraldo eduardo"),
+                            Orcid
+                                .newInstance(
+                                    "geraldo", "luz jr",
+                                    "luz jr g e",
+                                    new ArrayList<>()))));
+
+    }
+
+    @Test
+    public void testShuffle() throws Exception {
+
+        List<ShuffleInfo> shuffleInfoList = new ArrayList<>();
+
+        shuffleInfoList
+            .add(
+                ShuffleInfo
+                    .newInstance(
+                        "Miriam", "Baglioni", "Miriam Baglioni", "50|fake_1",
+                        "Alessia", "Bardi", "", new ArrayList<>(), "orcid_alessia"));
+        shuffleInfoList.add(ShuffleInfo.newInstance("Alessia", "Bardi", "Alessia Bardi", "50|fake_1"));
+        shuffleInfoList.add(ShuffleInfo.newInstance("Miriam", "Baglioni", "Miriam Baglioni", "50|fake_1"));
+        shuffleInfoList
+            .add(
+                ShuffleInfo
+                    .newInstance(
+                        "Alessia", "Bardi", "Alessia Bardi", "50|fake_1",
+                        "Miriam", "Baglioni", "", new ArrayList<>(), "orcid_miriam"));
+        shuffleInfoList.add(ShuffleInfo.newInstance("Claudio", "Atzori", "Claudio Atzori", "50|fake_1"));
+
+        List<ShuffleInfo> tmp = shuffleInfoList
+            .stream()
+            .filter(e -> Optional.ofNullable(e.getOrcid()).isPresent())
+            .collect(Collectors.toList());
+        int count = 0;
+        for (ShuffleInfo e : tmp) {
+            if (verifyShuffle(e, shuffleInfoList))
+                count++;
+
+        }
+
+        Assertions.assertEquals(2, count);
+    }
+
+    private boolean verifyShuffle(ShuffleInfo e, List<ShuffleInfo> shuffleInfoList) {
+        return shuffleInfoList.stream().anyMatch(si -> {
+            try {
+                final Orcid orcid = Orcid
+                    .newInstance(e.getOname(), e.getOsurname(), e.getOcreditName(), e.getoOtherNames());
+                return Utils
+                    .filterFunction(
+                        new Tuple2<>(Result.newInstance(si.getAfullname()), orcid));
+            } catch (JsonProcessingException ex) {
+                ex.printStackTrace();
+            }
+            return false;
+        });
+    }
 }
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/ircdl_extention/shuffle.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/ircdl_extention/shuffle.json
index e69de29bb2..6ef5f43c82 100644
--- a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/ircdl_extention/shuffle.json
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/ircdl_extention/shuffle.json
@@ -0,0 +1,4 @@
+{"aname":"Miriam", "asurname":"Baglioni", "afullname":"Miriam Baglioni","oname": "Alessia","osurname": "Bardi","ocreditName": "", "oOtherNames": [],"orcid": "orcid_alessia","id": "50|fake1"}
+{"aname":"Alessia", "asurname":"Bardi", "afullname":"Alessia Bardi","oname": null,"osurname": null,"ocreditName": null, "oOtherNames": null,"orcid": null,"id": "50|fake1"}
+{"aname":"Claudio", "asurname":"Atzori", "afullname":"Claudio Atzori","oname": null,"osurname": null,"ocreditName": null, "oOtherNames": null,"orcid": null,"id": "50|fake1"}
+{"aname":"Alessia", "asurname":"Bardi", "afullname":"Alessia Bardi","oname": "Miriam","osurname": "Baglioni","ocreditName": "", "oOtherNames": [],"orcid": "orcid_miriam","id": "50|fake1"}
\ No newline at end of file
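The `shuffle.json` fixture mirrors the in-memory list built in `testShuffle`: four author rows of one result (`50|fake1`), two of them carrying swapped ORCID name data. A hypothetical companion test, not part of this diff, showing how the fixture could be read back through `Utils.readPath` with a local session (session setup and counts are assumptions based on the fixture contents):

```java
package eu.dnetlib.dhp.ircdl_extention;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import eu.dnetlib.dhp.ircdl_extention.model.ShuffleInfo;

public class ShuffleFixtureTest {

    @Test
    public void readShuffleFixture() {
        SparkSession spark = SparkSession
            .builder()
            .appName("shuffle-fixture")
            .master("local[*]")
            .getOrCreate();
        String path = getClass()
            .getResource("/eu/dnetlib/dhp/ircdl_extention/shuffle.json")
            .getPath();
        Dataset<ShuffleInfo> ds = Utils.readPath(spark, path, ShuffleInfo.class);
        Assertions.assertEquals(4, ds.count()); // four author rows in the fixture
        Assertions.assertEquals(2, ds.filter("orcid is not null").count()); // two carry an orcid
        spark.stop();
    }
}
```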