Compare commits

...

43 Commits

Author SHA1 Message Date
Miriam Baglioni a9cc70d3b0 fixed issues on wf definition and property name 2020-11-30 12:08:33 +01:00
Miriam Baglioni 02589717b0 change in wf definition to have the cleaning part follow the report part directly. Modification of the name of the properties. Adjustments in test classes 2020-11-30 12:00:26 +01:00
Miriam Baglioni 44a66ad8a6 first try in adding orcid emend after cleaning step 2020-11-27 17:28:29 +01:00
Miriam Baglioni 3d66a7c0d6 - 2020-11-25 17:41:29 +01:00
Miriam Baglioni 2f9ac993e0 merge branch with master 2020-11-25 14:42:06 +01:00
Miriam Baglioni 33c27b6f75 - 2020-11-20 10:07:50 +01:00
Miriam Baglioni d08dca0745 merge branch with master 2020-11-19 19:17:06 +01:00
Miriam Baglioni cb3cb8df04 - 2020-11-18 18:11:27 +01:00
Miriam Baglioni c702f8e6a3 added dependency for constraint computing, and added test 2020-11-18 14:16:48 +01:00
Miriam Baglioni f4fee8f43c changed upper bound for whitelist 2020-11-18 14:04:17 +01:00
Miriam Baglioni 96d50080b4 merge branch with master 2020-11-18 12:26:14 +01:00
Miriam Baglioni 0e407b5f23 new tests 2020-11-18 12:18:11 +01:00
Miriam Baglioni 07837e51a9 - 2020-11-18 12:16:00 +01:00
Miriam Baglioni 1f52649d13 changed workflow and added parameters (whitelist) 2020-11-18 12:14:58 +01:00
Miriam Baglioni 683275a5db added logic to filter out other possible right matches 2020-11-18 12:14:13 +01:00
Miriam Baglioni 2aa48a132a added alternative names on the report 2020-11-18 12:13:43 +01:00
Miriam Baglioni 8555082bb1 changed to allow and logic in constraints verification 2020-11-18 12:13:13 +01:00
Miriam Baglioni a4781ddf65 changed to consider and logic within constraints 2020-11-18 12:12:48 +01:00
Miriam Baglioni a708652093 added check for empty(null) values 2020-11-18 11:58:00 +01:00
Miriam Baglioni 457c07a522 added the ignore case for both the constraints 2020-11-18 11:56:40 +01:00
Miriam Baglioni ec5b5c3a23 added set of classes for the verification of constraints on the result. 2020-11-16 18:52:53 +01:00
Miriam Baglioni 19167c9b9d merge branch with master 2020-11-16 14:15:39 +01:00
Miriam Baglioni 0ad1e237e6 added check to avoid break when name/surname is only composed of the word dr 2020-11-16 14:15:03 +01:00
Miriam Baglioni c29d142087 - 2020-11-16 10:53:12 +01:00
Miriam Baglioni 0f1a4f6637 added collectedfrom information on record 2020-11-09 16:07:17 +01:00
Miriam Baglioni 0ef5e7dc34 fixed issue for authors with no name 2020-11-09 16:06:52 +01:00
Miriam Baglioni 902b0db85a try to make workflow and sub-workflow for making report and actual orcid cleaning 2020-11-06 17:19:28 +01:00
Miriam Baglioni c56a43c90b - 2020-11-06 15:46:31 +01:00
Miriam Baglioni 863ce76820 merge branch with master 2020-11-06 15:30:19 +01:00
Miriam Baglioni a1aed813a5 done workflow for report and actual cleaning in the results. Renamed and moved some files 2020-11-06 15:29:28 +01:00
Miriam Baglioni fff512a87a added one level of checking (search all the words of name surname in orcid and in paper) 2020-11-04 18:30:09 +01:00
Miriam Baglioni 44cf0b712f added repartition(1) to have all the output in a single json file 2020-11-04 17:01:08 +01:00
Miriam Baglioni 1293dd276a merge branch with master 2020-11-04 13:37:34 +01:00
Miriam Baglioni b610d08399 added test for the report generation 2020-11-04 13:20:16 +01:00
Miriam Baglioni c694457acc added new attribute to store the orcid fullname when provided 2020-11-04 13:19:57 +01:00
Miriam Baglioni 72abbb0510 added the link to the property file 2020-11-04 13:19:25 +01:00
Miriam Baglioni fd00d44e1e new classes for generating the report for the orcid cleaning plus new property file 2020-11-04 13:18:49 +01:00
Miriam Baglioni 4ee84ae724 added files for testing purposes 2020-11-02 18:25:41 +01:00
Miriam Baglioni 7dcb2eff02 added needed dependency 2020-11-02 18:25:07 +01:00
Miriam Baglioni 43ddeedd6a first part of the test 2020-11-02 18:24:25 +01:00
Miriam Baglioni 72fb425787 new logic for the cleaning of the authors orcid 2020-11-02 18:23:16 +01:00
Miriam Baglioni 967d839ba1 merge branch with master 2020-11-02 10:23:11 +01:00
Miriam Baglioni f3de9c02ae first classes for the orcid cleaning 2020-10-30 16:02:28 +01:00
34 changed files with 3902 additions and 4 deletions

View File

@ -47,6 +47,12 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-text -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
<version>1.9</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
@ -117,6 +123,32 @@
<groupId>org.json4s</groupId>
<artifactId>json4s-jackson_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.0.cloudera2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/me.xdrop/fuzzywuzzy -->
<dependency>
<groupId>me.xdrop</groupId>
<artifactId>fuzzywuzzy</artifactId>
<version>1.3.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.intuit.fuzzymatcher/fuzzy-matcher -->
<dependency>
<groupId>com.intuit.fuzzymatcher</groupId>
<artifactId>fuzzy-matcher</artifactId>
<version>1.0.4</version>
</dependency>
<dependency>
<groupId>io.github.classgraph</groupId>
<artifactId>classgraph</artifactId>
<version>4.8.71</version>
<scope>compile</scope>
</dependency>
</dependencies>

View File

@ -0,0 +1,182 @@
package eu.dnetlib.dhp.oa.graph.clean.authorpids;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.*;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.similarity.CosineDistance;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.PacePerson;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.schema.oaf.*;
import net.sf.saxon.trans.Maker;
import scala.Serializable;
import scala.Tuple2;
/**
* It cleans wrong associations between authors and ORCID ids
*/
public class CleanAuthorPidsSparkJob implements Serializable {
private static final Logger log = LoggerFactory.getLogger(CleanAuthorPidsSparkJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
CleanAuthorPidsSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/clean_orcid/input_clean_author_pids.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("inputPath");
log.info("inputPath: {}", inputPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName);
String reportPath = parser.get("reportPath");
log.info("reportPath: {}", reportPath);
Class<? extends Result> entityClazz = (Class<? extends Result>) Class.forName(graphTableClassName);
SparkConf conf = new SparkConf();
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
cleanAuthorPid(spark, inputPath, outputPath, reportPath, entityClazz);
// cleanAuthorPid(
// spark, inputPath, entityClazz, outputPath, preparedInfoPath,
// loadAuthoritativeOrcid(spark, orcidInputPath));
});
}
/**
* Cleans the author pids according to what is found in the report
* @param spark the spark session
* @param inputPath the path where to find the result to emend
* @param outputPath the path where to store the emended result
* @param reportPath the path where to find the report for the result type
* @param entityClazz the class of the result type
* @param <R> the result type
*/
private static <R extends Result> void cleanAuthorPid(SparkSession spark, String inputPath, String outputPath,
String reportPath, Class<R> entityClazz) {
Dataset<R> result = Utils.readPath(spark, inputPath, entityClazz);
Dataset<Tuple2<String, ResultInfo>> wrong = Utils
.readPath(spark, reportPath, ReportInfo.class)
.flatMap((FlatMapFunction<ReportInfo, Tuple2<String, ResultInfo>>) ri -> {
List<Tuple2<String, ResultInfo>> ret = new ArrayList<>();
ri.getAssociatedAuthors().forEach(aa -> ret.add(new Tuple2<>(aa.getId(), aa)));
return ret.iterator();
}, Encoders.tuple(Encoders.STRING(), Encoders.bean(ResultInfo.class)));
result
.joinWith(wrong, result.col("id").equalTo(wrong.col("_1")), "left")
.groupByKey((MapFunction<Tuple2<R, Tuple2<String, ResultInfo>>, String>) value -> {
return value._1().getId();
}, Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Tuple2<R, Tuple2<String, ResultInfo>>, R>) (key, it) -> {
Tuple2<R, Tuple2<String, ResultInfo>> first = it.next();
R ret = first._1();
if (Optional.ofNullable(first._2()).isPresent()) {
emendOrcidIfNeeded(first._2()._2(), ret);
it.forEachRemaining(t2 -> emendOrcidIfNeeded(t2._2()._2(), ret));
}
return ret;
}, Encoders.bean(entityClazz))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath);
}
private static <R extends Result> void emendOrcidIfNeeded(ResultInfo ri, R ret) {
String orcid = ri.getOrcid();
ret.getAuthor().forEach(author -> {
List<StructuredProperty> pids = author.getPid();
for (StructuredProperty pid : pids) {
if (pid.getQualifier().getClassid().equals("orcid")) {
if (pid.getValue().equals(orcid) && sameAuthor(author, ri)) {
pids.remove(pid);
break;
}
}
}
});
}
private static boolean sameAuthor(Author author, ResultInfo associatedAuthors) {
String aname = author.getName();
String asurname = author.getSurname();
if (StringUtils.isBlank(asurname)) {
PacePerson pp = new PacePerson(author.getFullname(), false);
asurname = pp.getNormalisedSurname();
aname = pp.getNormalisedFirstName();
}
Double cosineDistance = new CosineDistance()
.apply(
MakeReportSparkJob.handleNameSurname(aname + " " + asurname),
MakeReportSparkJob
.handleNameSurname(associatedAuthors.getName() + " " + associatedAuthors.getSurname()));
if (Math.round(cosineDistance) == 0) {
return true;
}
return false;
}
private static <R> Dataset<R> readPath(
SparkSession spark, String inputPath, Class<R> clazz) {
return spark
.read()
.textFile(inputPath)
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
}
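
For reference, a minimal sketch (hypothetical names, same package assumed, not part of the module) of the matching rule applied by sameAuthor above: an ORCID pid is dropped from an author only when its value matches the one flagged in the report and the rounded cosine distance between the two normalised "name surname" strings is 0.

package eu.dnetlib.dhp.oa.graph.clean.authorpids;

import org.apache.commons.text.similarity.CosineDistance;

// illustrative sketch only
public class SameAuthorSketch {
    public static void main(String[] args) {
        // hypothetical author as found in the result and in the report entry
        String resultAuthor = MakeReportSparkJob.handleNameSurname("John Smith");
        String reportedAuthor = MakeReportSparkJob.handleNameSurname("Smith John");
        // CosineDistance works on word tokens, so the order of name and surname does not matter
        double distance = new CosineDistance().apply(resultAuthor, reportedAuthor);
        // same rule as sameAuthor: a rounded distance of 0 means "same person",
        // so the wrongly associated orcid pid attached to this author would be removed
        System.out.println(distance + " -> samePerson=" + (Math.round(distance) == 0));
    }
}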

View File

@ -0,0 +1,55 @@
package eu.dnetlib.dhp.oa.graph.clean.authorpids;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints.ConstraintResolver;
import eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints.Selection;
public class Constraints implements Serializable {
private String verb;
private String field;
private String value;
private Selection selection;
public String getVerb() {
return verb;
}
public void setVerb(String verb) {
this.verb = verb;
}
public String getField() {
return field;
}
public void setField(String field) {
this.field = field;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
public void setSelection(Selection sel) {
selection = sel;
}
public void setSelection(ConstraintResolver resolver)
throws InvocationTargetException, NoSuchMethodException, InstantiationException,
IllegalAccessException {
selection = resolver.getSelectionCriteria(verb, value);
}
public boolean verifyCriteria(Map<String, String> metadata) {
return selection.apply(metadata.get(field));
}
}

View File

@ -0,0 +1,643 @@
package eu.dnetlib.dhp.oa.graph.clean.authorpids;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.similarity.CosineDistance;
import org.apache.commons.text.similarity.LevenshteinDistance;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.intuit.fuzzymatcher.component.MatchService;
import com.intuit.fuzzymatcher.domain.Document;
import com.intuit.fuzzymatcher.domain.Element;
import com.intuit.fuzzymatcher.domain.ElementType;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.PacePerson;
import eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints.ConstraintResolver;
import eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints.ConstraintResolverFactory;
import eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints.Selection;
import eu.dnetlib.dhp.schema.oaf.Result;
import me.xdrop.fuzzywuzzy.FuzzySearch;
import py4j.StringUtil;
import scala.Tuple2;
/**
* It checks whether the author information provided by ORCID and the one found in the result refer to the same
* author. The author information is normalised before the check. Normalisation steps:
* words are lower-cased and trimmed, accented characters are replaced with their non-accented equivalent. Only
* alphabetic characters and white space are retained; every other character is replaced with a space.
*
* The check is made at different specification levels:
*
* Level1: the ORCID author surname and the result author surname are identical. We consider the match to be right
*
* Level2: we verify if the ORCID author surname contains the result author surname or vice versa. If so, we consider
* the match to be right
*
* Level3: we verify if one of the two surnames is composed of two words. In that case we concatenate the words and
* repeat the check. If the two match, we consider the match to be checked
*
* Level4: name and surname can be inverted in one of the two entities. We consider the sets of words, longer than 2
* characters, composing the name and the surname for both ORCID and result. If all the words of the shorter list are
* contained in the longer one, we consider the match to be checked
*
* Level5: name and surname are inverted, but one of the two is composed of two words. A mix of Level3 and Level4. We
* consider the match to be checked
*
* Level6: the surnames differ by a few characters. We apply the Levenshtein distance on the surnames if their length
* is greater than 3. If the distance is at most 2 we consider the match to be checked
*
* In all the other cases the match is considered wrong
*
*/
public class MakeReportSparkJob implements Serializable {
private static final Logger log = LoggerFactory.getLogger(MakeReportSparkJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
MakeReportSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/clean_orcid/input_report_author_pids.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
String preparedInfoPath = parser.get("preparedInfoPath");
log.info("preparedInfoPath: {} ", preparedInfoPath);
String orcidInputPath = parser.get("orcidInputPath");
log.info("orcidInputPath: {}", orcidInputPath);
String whiteListString = parser.get("whitelist");
log.info("whitelist: {}", whiteListString);
SparkConf conf = new SparkConf();
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
makeReport(
spark, outputPath, preparedInfoPath,
loadAuthoritativeOrcid(spark, orcidInputPath), whiteListString);
});
}
/**
* Loads the sequence file containing, for each author, the ORCID id, name and surname.
* It returns a dataset whose model maps the one defined in the class OrcidAuthotitative
* @param spark the spark session
* @param orcidInputPath the path where to read the sequence file
* @return the dataset
*/
private static Dataset<OrcidAuthotitative> loadAuthoritativeOrcid(SparkSession spark, String orcidInputPath) {
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
return spark
.createDataset(
JavaRDD
.toRDD(
sc
.sequenceFile(orcidInputPath, Text.class, Text.class)
.map(pair -> OBJECT_MAPPER.readValue(pair._2().toString(), OrcidAuthotitative.class))),
Encoders.bean(OrcidAuthotitative.class));
}
private static void addInList(List<String> list, String to_add) {
for (String word : to_add.split(" ")) {
if (word.length() > 2) {
list.add(word);
}
}
}
public static String handleNameSurname(String input) {
input = input.toLowerCase().replace(".", "");
if (input.startsWith("dr")) {
if (input.length() > 3)
input = input.substring(3);
else
return "";
}
return StringUtils
.stripAccents(input.trim())
.replaceAll("[^a-z\\s]+", " ")
.trim();
}
private static <I extends Result> void makeReport(SparkSession spark,
String outputPath, String preparedInfoPath,
Dataset<OrcidAuthotitative> authoritative, String whiteliststring) {
WhiteList whitelist = new Gson().fromJson(whiteliststring, WhiteList.class);
log.info("whitelist_size: {}", whitelist.getWhitelist().size());
ConstraintResolver resolver = ConstraintResolverFactory.newInstance();
whitelist.getWhitelist().forEach(constraint -> {
try {
constraint.setSelection(resolver);
} catch (NoSuchMethodException e) {
e.printStackTrace();
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
});
Dataset<ResultInfo> resultInfo = readPath(spark, preparedInfoPath, ResultInfo.class);
Dataset<Tuple2<String, ReportInfo>> checkedResult = resultInfo
.joinWith(
authoritative, authoritative
.col("oid")
.equalTo(resultInfo.col("orcid")),
"left")
.map((MapFunction<Tuple2<ResultInfo, OrcidAuthotitative>, Tuple2<String, ReportInfo>>) pair -> {
return getStringReportInfoFuzzyTuple2(pair, whitelist);
}, Encoders.tuple(Encoders.STRING(), Encoders.bean(ReportInfo.class)))
.filter(Objects::nonNull);
writeSet(
checkedResult.filter((FilterFunction<Tuple2<String, ReportInfo>>) result -> result._1().equals("wrong")),
outputPath + "/wrong");
writeSet(
checkedResult.filter((FilterFunction<Tuple2<String, ReportInfo>>) result -> result._1().equals("right")),
outputPath + "/right");
writeSet(
checkedResult.filter((FilterFunction<Tuple2<String, ReportInfo>>) result -> result._1().equals("check")),
outputPath + "/check");
writeSet(
checkedResult.filter((FilterFunction<Tuple2<String, ReportInfo>>) result -> result._1().equals("missing")),
outputPath + "/missing");
writeSet(
checkedResult
.filter((FilterFunction<Tuple2<String, ReportInfo>>) result -> result._1().equals("constraint")),
outputPath + "/constraint");
}
private static double fuzzyMatch(String orcid, String result) {
// apply one or more fuzzy functions to determine whether the input strings match
// a fuzzy match of 1.0 => right
// those matching above 0.66 with fuzzy go among the right ones
// those not in the previous match, but matching with fuzzywuzzy above 0.5, go into check
// (probably right)
// those matching with fuzzywuzzy between 0.5 and 0.3
return 0;
}
public static Tuple2<String, ReportInfo> getStringReportInfoFuzzyTuple2(
Tuple2<ResultInfo, OrcidAuthotitative> pair, WhiteList whiteList) {
Optional<OrcidAuthotitative> ooa = Optional.ofNullable(pair._2());
if (!ooa.isPresent()) {
return null;
}
OrcidAuthotitative oa = ooa.get();
ResultInfo ri = pair._1();
if (StringUtils.isBlank(ri.getSurname())) {
PacePerson pp = new PacePerson(ri.getFullname(), false);
ri.setSurname(pp.getNormalisedSurname());
ri.setName(pp.getNormalisedFirstName());
}
ReportInfo reportInfo = new ReportInfo();
reportInfo.setOid(oa.getOid());
reportInfo.setOname(oa.getName());
reportInfo.setOsurname(oa.getSurname());
reportInfo.setOcreditname(oa.getCreditName());
reportInfo.setAlternativeNames(oa.getOtherNames());
reportInfo.setAssociatedAuthors(Arrays.asList(ri));
if (!Optional.ofNullable(oa.getSurname()).isPresent()) {
return new Tuple2<>("missing", reportInfo);
}
final String handledOsurname = handleNameSurname(oa.getSurname());
if (handledOsurname.equalsIgnoreCase("")) {
return new Tuple2<>("missing", reportInfo);
}
final String handledSurname = handleNameSurname(ri.getSurname());
if (handledSurname.equals("")) {
return new Tuple2<>("missing", reportInfo);
}
final String handledOname = Optional
.ofNullable(oa.getName())
.map(name -> handleNameSurname(name))
.orElse("");
final String handledName = Optional
.ofNullable(ri.getName())
.map(name -> handleNameSurname(name))
.orElse("");
if (verifyConstraints(new HashMap<String, String>() {
{
put("oname", handledOname);
put("osurname", handledOsurname);
put("name", handledName);
put("surname", handledSurname);
}
}, whiteList)) {
return new Tuple2<>("constraint", reportInfo);
}
String[][] input = {
{
"1", handledOsurname + " " + handledOname
},
{
"2", handledSurname + " " + handledName
}
};
// exact match word by word
Double cosineDistance = new CosineDistance().apply(input[0][1], input[1][1]);
if (Math.round((1 - cosineDistance) * 100) == 100) {
reportInfo.setLevel("cosine similarity equals to 1");
return new Tuple2<>("right", reportInfo);
}
// check if there is no word in common. If there is none, the association could be wrong
if (Math.round((1 - new CosineDistance().apply(input[0][1], input[1][1])) * 100) == 0) {
// verify if there is another name that can be used by the author
if (StringUtils.isNotEmpty(oa.getCreditName())) {
try {
if (Math
.round(
1 - new CosineDistance().apply(input[1][1], handleNameSurname(oa.getCreditName()))) > 0) {
reportInfo.setLevel("not zero cosine on credit names");
return new Tuple2<>("check", reportInfo);
}
} catch (Exception e) {
reportInfo.setLevel(e.getMessage() + " " + oa.getCreditName());
return new Tuple2<>("check", reportInfo);
}
}
if (oa.getOtherNames().size() > 0) {
for (String othername : oa.getOtherNames()) {
if (StringUtils.isNotEmpty(othername))
try {
if (Math
.round(1 - new CosineDistance().apply(input[1][1], handleNameSurname(othername))) > 0) {
reportInfo.setLevel("not zero cosine on other names");
return new Tuple2<>("check", reportInfo);
}
} catch (Exception e) {
reportInfo.setLevel(e.getMessage() + " " + othername);
return new Tuple2<>("check", reportInfo);
}
}
}
MatchService matchService = new MatchService();
List<Document> documentList = Arrays.asList(input).stream().map(contact -> {
return new Document.Builder(contact[0])
.addElement(
new Element.Builder<String>()
.setValue(contact[1])
.setType(ElementType.NAME)
.createElement())
.createDocument();
}).collect(Collectors.toList());
if (matchService.applyMatchByDocId(documentList).entrySet().size() == 0) {
double out = FuzzySearch.ratio(input[0][1], input[1][1]);
if (out < 29) {
reportInfo.setLevel("less than 29 in fuzzywuzzy");
return new Tuple2<>("wrong", reportInfo);
} else {
// TODO extend the checking to catch the range of fuzzywuzzy scores that could be wrong
// try using soundex techniques or merge with previous implementation or both
reportInfo.setLevel("more than 29 in fuzzywuzzy");
return new Tuple2<>("check", reportInfo);
}
}
// TODO match size is not equal to zero. Verify the match value and then decide how to proceed
} else {
// MatchService matchService = new MatchService();
//
// List<Document> documentList = Arrays.asList(input).stream().map(contact -> {
// return new Document.Builder(contact[0])
// .addElement(
// new Element.Builder<String>()
// .setValue(contact[1])
// .setType(ElementType.NAME)
// .createElement())
// .createDocument();
// }).collect(Collectors.toList());
// if (matchService.applyMatchByDocId(documentList).entrySet().size() == 1) {
if (FuzzySearch.ratio(input[0][1], input[1][1]) > 90) {
reportInfo.setLevel("more than 90 in fuzzywuzzy");
return new Tuple2<>("right", reportInfo);
} else {
reportInfo.setLevel("less than 90 in fuzzywuzzy");
return new Tuple2<>("check", reportInfo);
}
// }else{
// reportInfo.setLevel("not found a match in name matching");
// return new Tuple2<>("check", reportInfo);
// }
}
// // they have some words in common. check if orcid provides creditName or otherNames to check for distance
// //
// List<Document> documentList = Arrays.asList(input).stream().map(contact -> {
// return new Document.Builder(contact[0])
// .addElement(
// new Element.Builder<String>()
// .setValue(contact[1])
// .setType(ElementType.NAME)
// .createElement())
// .createDocument();
// }).collect(Collectors.toList());
//
// MatchService matchService = new MatchService();
//
// Map<String, List<Match<Document>>> result = matchService.applyMatchByDocId(documentList);
//
// if (result.entrySet().size() > 0) {
// reportInfo.setLevel("fuzzyMatch");
// return new Tuple2<>("right", reportInfo);
// }
return new Tuple2<>("right", reportInfo);
}
// constraints in or
private static boolean verifyConstraints(Map<String, String> param, WhiteList whitelist) {
log.info("whitelist_size : {}", whitelist.getWhitelist().size());
for (SelectionConstraints constraint : whitelist.getWhitelist()) {
if (constraint.verifyCriteria(param)) {
return true;
}
}
return false;
}
public static Tuple2<String, ReportInfo> getStringReportInfoTuple2(Tuple2<ResultInfo, OrcidAuthotitative> pair) {
Optional<OrcidAuthotitative> ooa = Optional.ofNullable(pair._2());
if (!ooa.isPresent()) {
return null;
}
OrcidAuthotitative oa = ooa.get();
ResultInfo ri = pair._1();
if (StringUtils.isBlank(ri.getSurname())) {
PacePerson pp = new PacePerson(ri.getFullname(), false);
ri.setSurname(pp.getNormalisedSurname());
ri.setName(pp.getNormalisedFirstName());
}
ReportInfo reportInfo = new ReportInfo();
reportInfo.setOid(oa.getOid());
reportInfo.setOname(oa.getName());
reportInfo.setOsurname(oa.getSurname());
reportInfo.setOcreditname(oa.getCreditName());
reportInfo.setAssociatedAuthors(Arrays.asList(ri));
int level = 1;
if (!Optional.ofNullable(oa.getSurname()).isPresent()) {
return new Tuple2<>("missing", reportInfo);
}
final String handledOsurname = handleNameSurname(oa.getSurname());
if (handledOsurname.equalsIgnoreCase("")) {
return new Tuple2<>("missing", reportInfo);
}
final String handledSurname = handleNameSurname(ri.getSurname());
if (handledSurname.equals("")) {
return new Tuple2<>("missing", reportInfo);
}
// check if oSurname and surname are equal
if (handledOsurname.equals(handledSurname)) {
reportInfo.setLevel("level" + level);
return new Tuple2<>("right", reportInfo);
}
level++;
// check if one is contained in the other
if (handledOsurname.contains(handledSurname) || handledSurname.contains(handledOsurname)) {
reportInfo.setLevel("level" + level);
return new Tuple2<>("right", reportInfo);
}
level++;
// check if one of the two is composed of more than one word. In this case concatenate the two words
// and check again (Mohammadi Peyhani vs Mohammadipeyhani)
String[] handledorcidSplit = handledOsurname.split(" ");
String[] handledresultSplit = handledSurname.split(" ");
if (handledorcidSplit.length == 2) {
String tmpSurname = handledorcidSplit[0] + handledorcidSplit[1];
if (tmpSurname.equals(handledSurname)) {
reportInfo.setLevel("level" + level);
return new Tuple2<>("check", reportInfo);
}
}
if (handledresultSplit.length == 2) {
String tmpSurname = handledresultSplit[0] + handledresultSplit[1];
// compare the concatenated result surname with the ORCID one
if (tmpSurname.equals(handledOsurname)) {
reportInfo.setLevel("level" + level);
return new Tuple2<>("check", reportInfo);
}
}
level++;
// check if the words composing the name and the surname are the same, or if one list contains the
// other.
// consider only words of length greater than two
String handledOname = "";
if (Optional.ofNullable(oa.getName()).isPresent()) {
handledOname = handleNameSurname(oa.getName());
}
String handledName = "";
if (Optional.ofNullable(ri.getName()).isPresent()) {
handledName = handleNameSurname(ri.getName());
}
final List<String> orcidList = new ArrayList<>();
final List<String> paperList = new ArrayList<>();
addInList(orcidList, handledOname);
addInList(orcidList, handledOsurname);
addInList(paperList, handledSurname);
addInList(paperList, handledName);
if (checkListContainment(reportInfo, level, orcidList, paperList))
return new Tuple2<>("check", reportInfo);
level++;
handledorcidSplit = handledOsurname.split(" ");
handledresultSplit = handledName.split(" ");
if (handledorcidSplit.length == 2) {
orcidList.clear();
orcidList.add(handledorcidSplit[0] + handledorcidSplit[1]);
addInList(orcidList, handledOname);
if (checkListContainment(reportInfo, level, orcidList, paperList)) {
return new Tuple2<>("check", reportInfo);
}
orcidList.clear();
orcidList.add(handledorcidSplit[1] + handledorcidSplit[0]);
addInList(orcidList, handledOname);
if (checkListContainment(reportInfo, level, orcidList, paperList)) {
return new Tuple2<>("check", reportInfo);
}
}
if (handledresultSplit.length == 2) {
orcidList.clear();
addInList(orcidList, handledOname);
addInList(orcidList, handledOsurname);
paperList.clear();
paperList.add(handledresultSplit[0] + handledresultSplit[1]);
addInList(paperList, handledSurname);
if (checkListContainment(reportInfo, level, orcidList, paperList))
return new Tuple2<>("check", reportInfo);
paperList.clear();
paperList.add(handledresultSplit[1] + handledresultSplit[0]);
addInList(paperList, handledSurname);
if (checkListContainment(reportInfo, level, orcidList, paperList))
return new Tuple2<>("check", reportInfo);
}
level++;
if (handledOsurname.length() > 3 && handledSurname.length() > 3) {
LevenshteinDistance l = new LevenshteinDistance();
if (l.apply(handledOsurname, handledSurname) <= 2) {
reportInfo.setLevel("level" + level);
return new Tuple2<>("check", reportInfo);
}
}
if (handledOsurname.length() > 3 && handledName.length() > 3) {
LevenshteinDistance l = new LevenshteinDistance();
if (l.apply(handledOsurname, handledName) <= 2) {
reportInfo.setLevel("level" + level);
return new Tuple2<>("check", reportInfo);
}
}
return new Tuple2<>("wrong", reportInfo);
}
private static boolean checkListContainment(ReportInfo reportInfo, int level, List<String> orcidList,
List<String> paperList) {
if (orcidList.size() <= paperList.size()) {
if (searchIn(paperList, orcidList)) {
reportInfo.setLevel("level" + level);
return true;
}
} else {
if (searchIn(orcidList, paperList)) {
reportInfo.setLevel("level" + level);
return true;
}
}
return false;
}
/**
* searches list1 for all the words of list2
* @param list1 the list where to search for the words
* @param list2 the list containing the words to be searched
* @return true if all the words in list 2 are contained in list1
*/
private static boolean searchIn(List<String> list1, List<String> list2) {
for (String word : list2) {
if (!list1.contains(word)) {
return false;
}
}
return true;
}
private static void writeSet(Dataset<Tuple2<String, ReportInfo>> dataset, String outputPath) {
dataset
.groupByKey(
(MapFunction<Tuple2<String, ReportInfo>, String>) value -> value._2().getOid(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Tuple2<String, ReportInfo>, ReportInfo>) (oid, tuple2Iterator) -> {
ReportInfo reportInfo = tuple2Iterator.next()._2();
List<ResultInfo> aList = reportInfo.getAssociatedAuthors();
tuple2Iterator.forEachRemaining(tuple -> aList.addAll(tuple._2().getAssociatedAuthors()));
reportInfo.setAssociatedAuthors(aList);
return reportInfo;
}, Encoders.bean(ReportInfo.class))
.repartition(1)
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath);
}
private static <R> Dataset<R> readPath(
SparkSession spark, String inputPath, Class<R> clazz) {
return spark
.read()
.textFile(inputPath)
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
}
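
To make the normalisation and the distance measures used above concrete, a small self-contained sketch follows (not part of the module); the names are invented, while handleNameSurname, CosineDistance, LevenshteinDistance and FuzzySearch are the same utilities the report job relies on.

package eu.dnetlib.dhp.oa.graph.clean.authorpids;

import org.apache.commons.text.similarity.CosineDistance;
import org.apache.commons.text.similarity.LevenshteinDistance;

import me.xdrop.fuzzywuzzy.FuzzySearch;

// illustrative sketch only
public class NameMatchingSketch {
    public static void main(String[] args) {
        // normalisation: lower case, drop dots and a leading "dr", strip accents,
        // keep only alphabetic characters and white space
        String orcidAuthor = MakeReportSparkJob.handleNameSurname("Dr. José Pérez García");
        String resultAuthor = MakeReportSparkJob.handleNameSurname("Pérez García José");
        System.out.println(orcidAuthor);  // jose perez garcia
        System.out.println(resultAuthor); // perez garcia jose

        // word-by-word comparison: a cosine distance of 0 means the two word sets are identical,
        // regardless of their order, so this pair would end up among the "right" ones
        System.out.println(new CosineDistance().apply(orcidAuthor, resultAuthor)); // 0.0

        // surnames differing by a few characters (the Levenshtein check of the last report level)
        System.out.println(new LevenshteinDistance().apply("baglioni", "baglione")); // 1

        // fuzzywuzzy ratio in [0, 100], used above to separate "wrong" (< 29) from "check"
        System.out.println(FuzzySearch.ratio(orcidAuthor, resultAuthor));
    }
}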

View File

@ -0,0 +1,72 @@
package eu.dnetlib.dhp.oa.graph.clean.authorpids;
import java.io.Serializable;
import java.util.List;
public class OrcidAuthotitative implements Serializable {
private String oid;
private String name;
private String surname;
private String creditName;
private String otherName;
private List<String> otherNames;
private String errorCode;
public String getOtherName() {
return otherName;
}
public void setOtherName(String otherName) {
this.otherName = otherName;
}
public List<String> getOtherNames() {
return otherNames;
}
public void setOtherNames(List<String> otherNames) {
this.otherNames = otherNames;
}
public String getErrorCode() {
return errorCode;
}
public void setErrorCode(String errorCode) {
this.errorCode = errorCode;
}
public String getCreditName() {
return creditName;
}
public void setCreditName(String creditName) {
this.creditName = creditName;
}
public String getOid() {
return oid;
}
public void setOid(String id) {
this.oid = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getSurname() {
return surname;
}
public void setSurname(String surname) {
this.surname = surname;
}
}

View File

@ -0,0 +1,106 @@
package eu.dnetlib.dhp.oa.graph.clean.authorpids;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.io.Serializable;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.Result;
public class PrepareResultsSparkJob implements Serializable {
private static final Logger log = LoggerFactory.getLogger(PrepareResultsSparkJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
PrepareResultsSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/clean_orcid/input_prepare_results.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("inputPath");
log.info("inputPath: {}", inputPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName);
Class<? extends Result> entityClazz = (Class<? extends Result>) Class.forName(graphTableClassName);
SparkConf conf = new SparkConf();
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
prepareResult(spark, inputPath, entityClazz, outputPath);
});
}
private static <I extends Result> void prepareResult(SparkSession spark, String inputPath, Class<I> entityClazz,
String outputPath) {
Dataset<I> result = readPath(spark, inputPath, entityClazz);
result.createOrReplaceTempView("result");
String query = "select auth.name name, auth.surname surname, auth.fullname fullname, pIde.value orcid, id, " +
"collect_set(cf.value) as collectedfrom " +
"from result " +
"lateral view explode(author) a as auth " +
"lateral view explode(auth.pid)p as pIde " +
"lateral view explode (collectedfrom) c as cf " +
"where pIde.qualifier.classid = 'orcid' " +
"group by auth.name, auth.surname, auth.fullname, pIde.value, id";
spark
.sql(query)
.as(Encoders.bean(ResultInfo.class))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath);
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
private static <R> Dataset<R> readPath(
SparkSession spark, String inputPath, Class<R> clazz) {
return spark
.read()
.textFile(inputPath)
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
}
}
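
For illustration, a hypothetical record produced by the query above (one line per author carrying an ORCID pid, shaped like the ResultInfo bean); every identifier below is invented.

package eu.dnetlib.dhp.oa.graph.clean.authorpids;

import com.fasterxml.jackson.databind.ObjectMapper;

// illustrative sketch only
public class PreparedResultSketch {
    public static void main(String[] args) throws Exception {
        // one author of one result, with the orcid found among its pids and the
        // collectedfrom names of the result gathered by collect_set
        String json = "{\"id\":\"result-id-1\","
            + "\"name\":\"john\",\"surname\":\"smith\",\"fullname\":\"Smith, John\","
            + "\"orcid\":\"0000-0000-0000-0000\",\"collectedfrom\":[\"Crossref\"]}";
        ResultInfo ri = new ObjectMapper().readValue(json, ResultInfo.class);
        System.out.println(ri.getOrcid() + " -> " + ri.getFullname());
    }
}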

View File

@ -0,0 +1,73 @@
package eu.dnetlib.dhp.oa.graph.clean.authorpids;
import java.io.Serializable;
import java.util.List;
public class ReportInfo implements Serializable {
private String oid;
private String oname;
private String osurname;
private String ocreditname;
private List<String> alternativeNames;
private List<ResultInfo> associatedAuthors;
public List<String> getAlternativeNames() {
return alternativeNames;
}
public void setAlternativeNames(List<String> alternativeNames) {
this.alternativeNames = alternativeNames;
}
private String level;
public String getLevel() {
return level;
}
public void setLevel(String level) {
this.level = level;
}
public String getOid() {
return oid;
}
public void setOid(String oid) {
this.oid = oid;
}
public String getOname() {
return oname;
}
public void setOname(String oname) {
this.oname = oname;
}
public String getOsurname() {
return osurname;
}
public void setOsurname(String osurname) {
this.osurname = osurname;
}
public String getOcreditname() {
return ocreditname;
}
public void setOcreditname(String ocreditname) {
this.ocreditname = ocreditname;
}
public List<ResultInfo> getAssociatedAuthors() {
return associatedAuthors;
}
public void setAssociatedAuthors(List<ResultInfo> associatedAuthors) {
this.associatedAuthors = associatedAuthors;
}
}

View File

@ -0,0 +1,63 @@
package eu.dnetlib.dhp.oa.graph.clean.authorpids;
import java.io.Serializable;
import java.util.List;
public class ResultInfo implements Serializable {
private String id;
private String name;
private String surname;
private String fullname;
private String orcid;
private List<String> collectedfrom;
public List<String> getCollectedfrom() {
return collectedfrom;
}
public void setCollectedfrom(List<String> collectedfrom) {
this.collectedfrom = collectedfrom;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getSurname() {
return surname;
}
public void setSurname(String surname) {
this.surname = surname;
}
public String getFullname() {
return fullname;
}
public void setFullname(String fullname) {
this.fullname = fullname;
}
public String getOrcid() {
return orcid;
}
public void setOrcid(String orcid) {
this.orcid = orcid;
}
}

View File

@ -0,0 +1,53 @@
package eu.dnetlib.dhp.oa.graph.clean.authorpids;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints.ConstraintResolver;
public class SelectionConstraints implements Serializable {
private List<Constraints> criteria;
public SelectionConstraints() {
}
public List<Constraints> getCriteria() {
return criteria;
}
public void setCriteria(List<Constraints> criteria) {
this.criteria = criteria;
}
public void setSc(String json) {
Type collectionType = new TypeToken<Collection<Constraints>>() {
}.getType();
criteria = new Gson().fromJson(json, collectionType);
}
// Constraints in and
public boolean verifyCriteria(final Map<String, String> param) {
for (Constraints selc : criteria) {
if (!selc.verifyCriteria(param)) {
return false;
}
}
return true;
}
public void setSelection(ConstraintResolver resolver)
throws NoSuchMethodException, InstantiationException, IllegalAccessException, InvocationTargetException {
for (Constraints cs : criteria) {
cs.setSelection(resolver);
}
}
}

View File

@ -0,0 +1,17 @@
package eu.dnetlib.dhp.oa.graph.clean.authorpids;
import java.io.Serializable;
import java.util.List;
public class WhiteList implements Serializable {
private List<SelectionConstraints> whitelist;
public List<SelectionConstraints> getWhitelist() {
return whitelist;
}
public void setWhitelist(List<SelectionConstraints> whitelist) {
this.whitelist = whitelist;
}
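
As an illustration, a hypothetical value for the whitelist parameter handed to MakeReportSparkJob; Gson maps it onto WhiteList -> SelectionConstraints -> Constraints, the verbs refer to the @ConstraintClass annotations of the constraint classes shown further below, and the fields to the keys of the metadata map built in the report job.

package eu.dnetlib.dhp.oa.graph.clean.authorpids;

import com.google.gson.Gson;

// illustrative sketch only
public class WhiteListSketch {
    public static void main(String[] args) {
        // every element of "whitelist" is a SelectionConstraints whose criteria are in AND;
        // the elements of the outer list are combined in OR by verifyConstraints
        String json = "{\"whitelist\":[{\"criteria\":["
            + "{\"verb\":\"shorterorequal\",\"field\":\"surname\",\"value\":\"2\"},"
            + "{\"verb\":\"equals\",\"field\":\"oname\",\"value\":\"li\"}]}]}";
        WhiteList whitelist = new Gson().fromJson(json, WhiteList.class);
        System.out.println(whitelist.getWhitelist().get(0).getCriteria().size()); // 2
    }
}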
}

View File

@ -0,0 +1,14 @@
package eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface ConstraintClass {
String value();
}

View File

@ -0,0 +1,56 @@
package eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.stream.Collectors;
import io.github.classgraph.ClassGraph;
import io.github.classgraph.ClassInfo;
import io.github.classgraph.ClassInfoList;
import io.github.classgraph.ScanResult;
public class ConstraintResolver implements Serializable {
private Map<String, Class<Selection>> map = null; // = new HashMap<>();
private final ClassGraph classgraph = new ClassGraph();
public ConstraintResolver() {
try (ScanResult scanResult = // Assign scanResult in try-with-resources
classgraph // Create a new ClassGraph instance
.verbose() // If you want to enable logging to stderr
.enableAllInfo() // Scan classes, methods, fields, annotations
.whitelistPackages(
"eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints") // Scan com.xyz and subpackages
.scan()) { // Perform the scan and return a ScanResult
ClassInfoList routeClassInfoList = scanResult
.getClassesWithAnnotation(
"eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints.ConstraintClass");
this.map = routeClassInfoList
.stream()
.collect(
Collectors
.toMap(
value -> (String) value
.getAnnotationInfo()
.get(0)
.getParameterValues()
.get(0)
.getValue(),
value -> (Class<Selection>) ((ClassInfo) value).loadClass()));
} catch (Exception e) {
e.printStackTrace();
}
}
public Selection getSelectionCriteria(String name, String param)
throws NoSuchMethodException, IllegalAccessException, InvocationTargetException,
InstantiationException {
// return Class.forName(tmp_map.get(name)).
return map.get(name).getDeclaredConstructor((String.class)).newInstance(param);
}
}

View File

@ -0,0 +1,10 @@
package eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints;
public class ConstraintResolverFactory {
public static ConstraintResolver newInstance() {
return new ConstraintResolver();
}
}
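
A small sketch, with hypothetical arguments, of how the resolver above is meant to be used: the verb declared in a @ConstraintClass annotation selects the Selection implementation (see the constraint classes below), and the value is passed to its single-argument constructor.

package eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints;

// illustrative sketch only
public class ResolverSketch {
    public static void main(String[] args) throws Exception {
        ConstraintResolver resolver = ConstraintResolverFactory.newInstance();
        // "longer" resolves to LongerThanConstraints("2"): true for values longer than 2 chars
        Selection longerThanTwo = resolver.getSelectionCriteria("longer", "2");
        System.out.println(longerThanTwo.apply("li"));   // false
        System.out.println(longerThanTwo.apply("wang")); // true
        // "contains" resolves to ContainsConstraint("dr")
        Selection containsDr = resolver.getSelectionCriteria("contains", "dr");
        System.out.println(containsDr.apply("dr john")); // true
    }
}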

View File

@ -0,0 +1,30 @@
package eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints;
import java.io.Serializable;
@ConstraintClass("contains")
public class ContainsConstraint implements Selection, Serializable {
private String param;
public ContainsConstraint() {
}
public ContainsConstraint(final String param) {
this.param = param;
}
@Override
public boolean apply(String value) {
return value.contains(param.toLowerCase());
}
public String getParam() {
return param;
}
public void setParam(String param) {
this.param = param;
}
}

View File

@ -0,0 +1,36 @@
package eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints;
import java.io.Serializable;
import org.apache.commons.lang3.StringUtils;
import py4j.StringUtil;
@ConstraintClass("equals")
public class EqualConstraint implements Selection, Serializable {
private String param;
public EqualConstraint() {
}
public EqualConstraint(final String param) {
this.param = param;
}
@Override
public boolean apply(String value) {
if (StringUtils.isEmpty(value))
return false;
return value.equalsIgnoreCase(param);
}
public String getParam() {
return param;
}
public void setParam(String param) {
this.param = param;
}
}

View File

@ -0,0 +1,43 @@
package eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints;
import java.lang.reflect.Type;
import com.google.gson.*;
public class InterfaceAdapter implements JsonSerializer, JsonDeserializer {
private static final String CLASSNAME = "CLASSNAME";
private static final String DATA = "DATA";
public Object deserialize(
JsonElement jsonElement,
Type type,
JsonDeserializationContext jsonDeserializationContext)
throws JsonParseException {
JsonObject jsonObject = jsonElement.getAsJsonObject();
JsonPrimitive prim = (JsonPrimitive) jsonObject.get(CLASSNAME);
String className = prim.getAsString();
Class klass = getObjectClass(className);
return jsonDeserializationContext.deserialize(jsonObject.get(DATA), klass);
}
public JsonElement serialize(
Object jsonElement, Type type, JsonSerializationContext jsonSerializationContext) {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty(CLASSNAME, jsonElement.getClass().getName());
jsonObject.add(DATA, jsonSerializationContext.serialize(jsonElement));
return jsonObject;
}
/** **** Helper method to get the className of the object to be deserialized **** */
public Class getObjectClass(String className) {
try {
return Class.forName(className);
} catch (ClassNotFoundException e) {
// e.printStackTrace();
throw new JsonParseException(e.getMessage());
}
}
}
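
A hedged usage sketch of the adapter above, assuming it is meant to let Gson (de)serialize fields typed with the Selection interface by storing the concrete class name alongside the data.

package eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

// illustrative sketch only
public class InterfaceAdapterSketch {
    public static void main(String[] args) {
        Gson gson = new GsonBuilder()
            .registerTypeAdapter(Selection.class, new InterfaceAdapter())
            .create();
        // serialization stores {"CLASSNAME": ..., "DATA": ...}
        String json = gson.toJson(new ContainsConstraint("dr"), Selection.class);
        System.out.println(json);
        // deserialization re-instantiates the concrete constraint through Class.forName
        Selection back = gson.fromJson(json, Selection.class);
        System.out.println(back.apply("dr john")); // true
    }
}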

View File

@ -0,0 +1,36 @@
package eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints;
import java.io.Serializable;
import org.apache.commons.lang3.StringUtils;
@ConstraintClass("longer")
public class LongerThanConstraints implements Serializable, Selection {
private String param;
public LongerThanConstraints(String param) {
this.param = param;
}
public LongerThanConstraints() {
};
public String getParam() {
return param;
}
public void setParam(String param) {
this.param = param;
}
@Override
public boolean apply(String value) {
if (StringUtils.isEmpty(value)) {
return false;
}
return value.length() > Integer.valueOf(param);
}
}

View File

@ -0,0 +1,7 @@
package eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints;
public interface Selection {
boolean apply(String value);
}

View File

@ -0,0 +1,35 @@
package eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints;
import java.io.Serializable;
import org.apache.commons.lang3.StringUtils;
@ConstraintClass("shorterorequal")
public class ShorterThanOrEqualToConstraints implements Serializable, Selection {
private String param;
public ShorterThanOrEqualToConstraints() {
};
public ShorterThanOrEqualToConstraints(String param) {
this.param = param;
}
public String getParam() {
return param;
}
public void setParam(String param) {
this.param = param;
}
@Override
public boolean apply(String value) {
if (StringUtils.isEmpty(value))
return true;
return value.length() <= Integer.valueOf(param);
}
}

View File

@ -13,6 +13,14 @@
<name>isLookupUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>orcidInputPath</name>
<description>the path where to find the orcid sequence file</description>
</property>
<property>
<name>emend</name>
<description>true if the cleaning of the ORCID should be executed</description>
</property>
<property>
<name>sparkDriverMemory</name>
@ -109,7 +117,7 @@
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/cleaned/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</spark>
@ -135,7 +143,7 @@
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/cleaned/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</spark>
@ -161,7 +169,7 @@
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/cleaned/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</spark>
@ -187,7 +195,7 @@
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/software</arg>
<arg>--outputPath</arg><arg>${workingDir}/cleaned/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</spark>
@ -299,6 +307,350 @@
<error to="Kill"/>
</action>
<join name="wait_clean" to="should_emend_orcid"/>
<decision name="should_emend_orcid">
<switch>
<case to="prepare_result">${wf:conf('emend') eq true}</case>
<default to="End"/>
</switch>
</decision>
<fork name="prepare_result">
<path start="prepare_publication"/>
<path start="prepare_dataset"/>
<path start="prepare_software"/>
<path start="prepare_orp"/>
</fork>
<action name="prepare_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>orcid prepare publication</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.PrepareResultsSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/cleaned/publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/prepared/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
</spark>
<ok to="wait_prepare"/>
<error to="Kill"/>
</action>
<action name="prepare_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>orcid prepare dataset</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.PrepareResultsSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/cleaned/dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/prepared/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
</spark>
<ok to="wait_prepare"/>
<error to="Kill"/>
</action>
<action name="prepare_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>orcid prepare software</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.PrepareResultsSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/cleaned/software</arg>
<arg>--outputPath</arg><arg>${workingDir}/prepared/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
</spark>
<ok to="wait_prepare"/>
<error to="Kill"/>
</action>
<action name="prepare_orp">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>orcid prepare orp</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.PrepareResultsSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/cleaned/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/prepared/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
</spark>
<ok to="wait_prepare"/>
<error to="Kill"/>
</action>
<join name="wait_prepare" to="make_report"/>
<fork name="make_report">
<path start="report_publication"/>
<path start="report_dataset"/>
<path start="report_software"/>
<path start="report_orp"/>
</fork>
<action name="report_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Report ORCID on Publication</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.MakeReportSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--outputPath</arg><arg>${workingDir}/report/publication</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/prepared/publication</arg>
<arg>--whitelist</arg><arg>${whitelist}</arg>
<arg>--orcidInputPath</arg><arg>${orcidInputPath}</arg>
</spark>
<ok to="wait_report"/>
<error to="Kill"/>
</action>
<action name="report_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Report ORCID on Dataset</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.MakeReportSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--preparedInfoPath</arg><arg>${workingDir}/prepared/dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/report/dataset</arg>
<arg>--orcidInputPath</arg><arg>${orcidInputPath}</arg>
<arg>--whitelist</arg><arg>${whitelist}</arg>
</spark>
<ok to="wait_report"/>
<error to="Kill"/>
</action>
<action name="report_orp">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Report ORCID on ORP</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.MakeReportSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--preparedInfoPath</arg><arg>${workingDir}/prepared/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/report/otherresearchproduct</arg>
<arg>--orcidInputPath</arg><arg>${orcidInputPath}</arg>
<arg>--whitelist</arg><arg>${whitelist}</arg>
</spark>
<ok to="wait_report"/>
<error to="Kill"/>
</action>
<action name="report_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Report ORCID on Software</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.MakeReportSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--preparedInfoPath</arg><arg>${workingDir}/prepared/software</arg>
<arg>--outputPath</arg><arg>${workingDir}/report/software</arg>
<arg>--orcidInputPath</arg><arg>${orcidInputPath}</arg>
<arg>--whitelist</arg><arg>${whitelist}</arg>
</spark>
<ok to="wait_report"/>
<error to="Kill"/>
</action>
<join name="wait_report" to="clean_orcid"/>
<fork name="clean_orcid">
<path start="clean_publication_orcid"/>
<path start="clean_dataset_orcid"/>
<path start="clean_orp_orcid"/>
<path start="clean_software_orcid"/>
</fork>
<action name="clean_publication_orcid">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean ORCID for Publications</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.CleanAuthorPidsSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/cleaned/publication</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--reportPath</arg><arg>${workingDir}/report/publication</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
</action>
<action name="clean_dataset_orcid">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean ORCID for Datasets</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.CleanAuthorPidsSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--reportPath</arg><arg>${workingDir}/report/dataset</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--inputPath</arg><arg>${workingDir}/cleaned/dataset</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
</action>
<action name="clean_orp_orcid">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean ORCID for ORP</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.CleanAuthorPidsSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--reportPath</arg><arg>${workingDir}/report/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--inputPath</arg><arg>${workingDir}/cleaned/otherresearchproduct</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
</action>
<action name="clean_software_orcid">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean ORCID for Software</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.CleanAuthorPidsSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--reportPath</arg><arg>${workingDir}/report/software</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--inputPath</arg><arg>${workingDir}/cleaned/software</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
</action>
<join name="wait_clean" to="End"/>
<end name="End"/>

View File

@@ -0,0 +1,33 @@
[
{
"paramName":"i",
"paramLongName":"inputPath",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
},
{
"paramName": "ssm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
},
{
"paramName": "gtn",
"paramLongName": "graphTableClassName",
"paramDescription": "the table name of the result currently considering",
"paramRequired": true
},
{
"paramName": "rp",
"paramLongName": "reportPath",
"paramDescription": "The path of the prepared result informaiton",
"paramRequired": true
}
]

View File

@@ -0,0 +1,26 @@
[
{
"paramName":"i",
"paramLongName":"inputPath",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
},
{
"paramName": "ssm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
},
{
"paramName": "gtn",
"paramLongName": "graphTableClassName",
"paramDescription": "the table name of the result currently considering",
"paramRequired": true
}
]

View File

@@ -0,0 +1,33 @@
[
{
"paramName": "rop",
"paramLongName": "reportOutputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
},
{
"paramName": "ssm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
},
{
"paramName": "pip",
"paramLongName": "preparedInfoPath",
"paramDescription": "The path of the prepared result informaiton",
"paramRequired": true
},
{
"paramName": "oip",
"paramLongName": "orcidInputPath",
"paramDescription": "the path to the authoritative orcid information",
"paramRequired": true
},
{
"paramName":"wl",
"paramLongName":"whitelist",
"paramDescription": "the whitelist",
"paramRequired": true
}
]

View File

@@ -0,0 +1,18 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@@ -0,0 +1,428 @@
<workflow-app name="orcid_status" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>inputPath</name>
<description>the input path to read graph content</description>
</property>
<property>
<name>outputPath</name>
<description>the target path to store cleaned graph</description>
</property>
<property>
<name>orcidInputPath</name>
<description>the input path where the orcid authoritative information is stored</description>
</property>
<property>
<name>orcidReportPath</name>
</property>
<property>
<name>clean</name>
<value>false</value>
<description>determines whether the orcid should be cleaned in the graph (true) or the report should be produced (false)
</description>
</property>
<property>
<name>whitelist</name>
<description>the whitelist</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<start to="reset_outputpath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
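<!-- prepare the results of each type, then produce the ORCID report and clean the author pids in the graph -->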
<action name="reset_outputpath">
<fs>
<delete path="${outputPath}"/>
<mkdir path="${outputPath}"/>
</fs>
<ok to="perpare_result"/>
<error to="Kill"/>
</action>
<fork name="perpare_result">
<path start="prepare_publication"/>
<path start="prepare_dataset"/>
<path start="prepare_software"/>
<path start="prepare_orp"/>
</fork>
<action name="prepare_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>orcid prepare publication</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.PrepareResultsSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
</spark>
<ok to="wait_prepare"/>
<error to="Kill"/>
</action>
<action name="prepare_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>orcid prepare dataset</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.PrepareResultsSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
</spark>
<ok to="wait_prepare"/>
<error to="Kill"/>
</action>
<action name="prepare_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>orcid prepare software</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.PrepareResultsSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/software</arg>
<arg>--outputPath</arg><arg>${workingDir}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
</spark>
<ok to="wait_prepare"/>
<error to="Kill"/>
</action>
<action name="prepare_orp">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>orcid prepare orp</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.PrepareResultsSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
</spark>
<ok to="wait_prepare"/>
<error to="Kill"/>
</action>
<join name="wait_prepare" to="make_report"/>
<decision name="cleanorreport">
<switch>
<case to="make_report">${wf:conf('clean') eq false}</case>
<case to="clean_orcid_copy">${wf:conf('clean') eq true}</case>
<default to="make_report"/>
</switch>
</decision>
<fork name="make_report">
<path start="report_publication"/>
<path start="report_dataset"/>
<path start="report_software"/>
<path start="report_orp"/>
</fork>
<action name="report_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Report ORCID on Publication</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.MakeReportSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--outputPath</arg><arg>${workingDir}/report/publication</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/publication</arg>
<arg>--orcidInputPath</arg><arg>${orcidInputPath}</arg>
<arg>--whitelist</arg><arg>${whitelist}</arg>
</spark>
<ok to="wait_report"/>
<error to="Kill"/>
</action>
<action name="report_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Report ORCID on Dataset</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.MakeReportSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--preparedInfoPath</arg><arg>${workingDir}/dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/report/dataset</arg>
<arg>--orcidInputPath</arg><arg>${orcidInputPath}</arg>
<arg>--whitelist</arg><arg>${whitelist}</arg>
</spark>
<ok to="wait_report"/>
<error to="Kill"/>
</action>
<action name="report_orp">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Report ORCID on ORP</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.MakeReportSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--preparedInfoPath</arg><arg>${workingDir}/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/report/otherresearchproduct</arg>
<arg>--orcidInputPath</arg><arg>${orcidInputPath}</arg>
<arg>--whitelist</arg><arg>${whitelist}</arg>
</spark>
<ok to="wait_report"/>
<error to="Kill"/>
</action>
<action name="report_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Report ORCID on Software</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.MakeReportSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--preparedInfoPath</arg><arg>${workingDir}/software</arg>
<arg>--outputPath</arg><arg>${workingDir}/report/software</arg>
<arg>--orcidInputPath</arg><arg>${orcidInputPath}</arg>
<arg>--whitelist</arg><arg>${whitelist}</arg>
</spark>
<ok to="wait_report"/>
<error to="Kill"/>
</action>
<join name="wait_report" to="clean_orcid"/>
<fork name="clean_orcid">
<path start="clean_publication_orcid"/>
<path start="clean_dataset_orcid"/>
<path start="clean_orp_orcid"/>
<path start="clean_software_orcid"/>
</fork>
<action name="clean_publication_orcid">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean ORCID for Publications</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.CleanAuthorPidsSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/publication</arg>
<arg>--outputPath</arg><arg>${outputPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--reportPath</arg><arg>${workingDir}/report/publication</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
</action>
<action name="clean_dataset_orcid">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean ORCID for Datasets</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.CleanAuthorPidsSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--reportPath</arg><arg>${workingDir}/report/dataset</arg>
<arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--inputPath</arg><arg>${inputPath}/dataset</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
</action>
<action name="clean_orp_orcid">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean ORCID for ORP</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.CleanAuthorPidsSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--reportPath</arg><arg>${workingDir}/report/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--inputPath</arg><arg>${inputPath}/otherresearchproduct</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
</action>
<action name="clean_software_orcid">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean ORCID for Software</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.CleanAuthorPidsSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--reportPath</arg><arg>${workingDir}/report/software</arg>
<arg>--outputPath</arg><arg>${outputPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--inputPath</arg><arg>${inputPath}/software</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
</action>
<join name="wait_clean" to="End"/>
<end name="End"/>
</workflow-app>

View File

@@ -0,0 +1,232 @@
<workflow-app name="clean_orcid" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>inputPath</name>
<description>the input path to read graph content</description>
</property>
<property>
<name>outputPath</name>
<description>the target path to store cleaned graph</description>
</property>
<property>
<name>orcidInputPath</name>
<description>the input path where the orcid authoritative information is stored</description>
</property>
<property>
<name>inputPreparedInfoPath</name>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<start to="reset_outputpath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="reset_outputpath">
<fs>
<delete path="${outputPath}"/>
<mkdir path="${outputPath}"/>
</fs>
<ok to="clean_orcid_copy"/>
<error to="Kill"/>
</action>
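<!-- entities not affected by the cleaning (relation, organization, project, datasource) are copied to the output graph as they are -->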
<fork name="clean_orcid_copy">
<path start="copy_relation"/>
<path start="copy_organization"/>
<path start="copy_projects"/>
<path start="copy_datasources"/>
</fork>
<action name="copy_relation">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${inputPath}/relation</arg>
<arg>${nameNode}/${outputPath}/relation</arg>
</distcp>
<ok to="wait_copy"/>
<error to="Kill"/>
</action>
<action name="copy_organization">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${inputPath}/organization</arg>
<arg>${nameNode}/${outputPath}/organization</arg>
</distcp>
<ok to="wait_copy"/>
<error to="Kill"/>
</action>
<action name="copy_projects">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${inputPath}/project</arg>
<arg>${nameNode}/${outputPath}/project</arg>
</distcp>
<ok to="wait_copy"/>
<error to="Kill"/>
</action>
<action name="copy_datasources">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${inputPath}/datasource</arg>
<arg>${nameNode}/${outputPath}/datasource</arg>
</distcp>
<ok to="wait_copy"/>
<error to="Kill"/>
</action>
<join name="wait_copy" to="clean_orcid"/>
<fork name="clean_orcid">
<path start="clean_publication_orcid"/>
<path start="clean_dataset_orcid"/>
<path start="clean_orp_orcid"/>
<path start="clean_software_orcid"/>
</fork>
<action name="clean_publication_orcid">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean ORCID for Publications</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.CleanAuthorPidsSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/publication</arg>
<arg>--outputPath</arg><arg>${outputPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--preparedInfoPath</arg><arg>${inputPreparedInfoPath}/publication</arg>
<arg>--orcidInputPath</arg><arg>${orcidInputPath}</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
</action>
<action name="clean_dataset_orcid">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean ORCID for Datasets</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.CleanAuthorPidsSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--preparedInfoPath</arg><arg>${inputPreparedInfoPath}/dataset</arg>
<arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--inputPath</arg><arg>${inputPath}/dataset</arg>
<arg>--orcidInputPath</arg><arg>${orcidInputPath}</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
</action>
<action name="clean_orp_orcid">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean ORCID for ORP</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.CleanAuthorPidsSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--preparedInfoPath</arg><arg>${inputPreparedInfoPath}/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--inputPath</arg><arg>${inputPath}/otherresearchproduct</arg>
<arg>--orcidInputPath</arg><arg>${orcidInputPath}</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
</action>
<action name="clean_software_orcid">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean ORCID for Software</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.CleanAuthorPidsSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--preparedInfoPath</arg><arg>${inputPreparedInfoPath}/software</arg>
<arg>--outputPath</arg><arg>${outputPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--inputPath</arg><arg>${inputPath}/software</arg>
<arg>--orcidInputPath</arg><arg>${orcidInputPath}</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
</action>
<join name="wait_clean" to="End"/>
<end name="End"/>
</workflow-app>

View File

@@ -0,0 +1,18 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@@ -0,0 +1,3 @@
## This is a classpath-based import file (this header is required)
make_report classpath eu/dnetlib/dhp/oa/graph/clean_orcid/wf/report/oozie_app
clean_orcid classpath eu/dnetlib/dhp/oa/graph/clean_orcid/wf/clean/oozie_app

View File

@@ -0,0 +1,231 @@
<workflow-app name="orcid_status" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>inputPath</name>
<description>the input path to read graph content</description>
</property>
<property>
<name>outputPath</name>
<description>the target path to store cleaned graph</description>
</property>
<property>
<name>orcidInputPath</name>
<description>the input path where the orcid authoritative information is stored</description>
</property>
<property>
<name>clean</name>
<value>false</value>
<description>determines whether the orcid should be cleaned in the graph (true) or the report should be produced (false)
</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<start to="reset_outputpath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="reset_outputpath">
<fs>
<delete path="${outputPath}"/>
<mkdir path="${outputPath}"/>
</fs>
<ok to="perpare_result"/>
<error to="Kill"/>
</action>
<fork name="perpare_result">
<path start="prepare_publication"/>
<path start="prepare_dataset"/>
<path start="prepare_software"/>
<path start="prepare_orp"/>
</fork>
<action name="prepare_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>orcid prepare publication</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.PrepareResultsSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
</spark>
<ok to="wait_prepare"/>
<error to="Kill"/>
</action>
<action name="prepare_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>orcid prepare dataset</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.PrepareResultsSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
</spark>
<ok to="wait_prepare"/>
<error to="Kill"/>
</action>
<action name="prepare_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>orcid prepare software</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.PrepareResultsSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/software</arg>
<arg>--outputPath</arg><arg>${workingDir}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
</spark>
<ok to="wait_prepare"/>
<error to="Kill"/>
</action>
<action name="prepare_orp">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>orcid prepare orp</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.PrepareResultsSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
</spark>
<ok to="wait_prepare"/>
<error to="Kill"/>
</action>
<join name="wait_prepare" to="cleanorreport"/>
<decision name="cleanorreport">
<switch>
<case to="make_report">${wf:conf('clean') eq false}</case>
<case to="clean_orcid">${wf:conf('clean') eq true}</case>
<default to="make_report"/>
</switch>
</decision>
<action name="make_report">
<sub-workflow>
<app-path>${wf:appPath()}/make_report</app-path>
<propagate-configuration/>
<configuration>
<property>
<name>inputPreparedInfoPath</name>
<value>${workingDir}</value>
</property>
<property>
<name>orcidInputPath</name>
<value>${orcidInputPath}</value>
</property>
</configuration>
</sub-workflow>
<ok to="End"/>
<error to="Kill"/>
</action>
<action name="clean_orcid">
<sub-workflow>
<app-path>${wf:appPath()}/clean_orcid</app-path>
<propagate-configuration/>
<configuration>
<property>
<name>inputPreparedInfoPath</name>
<value>${workingDir}</value>
</property>
<property>
<name>orcidInputPath</name>
<value>${orcidInputPath}</value>
</property>
</configuration>
</sub-workflow>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@@ -0,0 +1,189 @@
<workflow-app name="orcid_report" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>inputPath</name>
<description>the input path to read graph content</description>
</property>
<property>
<name>outputPath</name>
<description>the target path to store cleaned graph</description>
</property>
<property>
<name>orcidInputPath</name>
<description>the input path where the orcid authoritative information is stored</description>
</property>
<property>
<name>inputPreparedInfoPath</name>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<start to="reset_outputpath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="reset_outputpath">
<fs>
<delete path="${outputPath}"/>
<mkdir path="${outputPath}"/>
</fs>
<ok to="make_report"/>
<error to="Kill"/>
</action>
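<!-- each report_* action runs MakeReportSparkJob on the prepared information and the authoritative ORCID records for one result type -->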
<fork name="make_report">
<path start="report_publication"/>
<path start="report_dataset"/>
<path start="report_software"/>
<path start="report_orp"/>
</fork>
<action name="report_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Report ORCID on Publication</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.MakeReportSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/publication</arg>
<arg>--outputPath</arg><arg>${outputPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--preparedInfoPath</arg><arg>${inputPreparedInfoPath}/publication</arg>
<arg>--orcidInputPath</arg><arg>${orcidInputPath}</arg>
</spark>
<ok to="wait_report"/>
<error to="Kill"/>
</action>
<action name="report_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Report ORCID on Dataset</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.MakeReportSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--preparedInfoPath</arg><arg>${inputPreparedInfoPath}/dataset</arg>
<arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--inputPath</arg><arg>${inputPath}/dataset</arg>
<arg>--orcidInputPath</arg><arg>${orcidInputPath}</arg>
</spark>
<ok to="wait_report"/>
<error to="Kill"/>
</action>
<action name="report_orp">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Report ORCID on ORP</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.MakeReportSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--preparedInfoPath</arg><arg>${inputPreparedInfoPath}/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--inputPath</arg><arg>${inputPath}/otherresearchproduct</arg>
<arg>--orcidInputPath</arg><arg>${orcidInputPath}</arg>
</spark>
<ok to="wait_report"/>
<error to="Kill"/>
</action>
<action name="report_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Report ORCID on Software</name>
<class>eu.dnetlib.dhp.oa.graph.clean.authorpids.MakeReportSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--preparedInfoPath</arg><arg>${inputPreparedInfoPath}/software</arg>
<arg>--outputPath</arg><arg>${outputPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--inputPath</arg><arg>${inputPath}/software</arg>
<arg>--orcidInputPath</arg><arg>${orcidInputPath}</arg>
</spark>
<ok to="wait_report"/>
<error to="Kill"/>
</action>
<join name="wait_report" to="End"/>
<end name="End"/>
</workflow-app>

View File

@@ -0,0 +1,262 @@
package eu.dnetlib.dhp.oa.graph.clean;
import java.io.*;
import java.lang.reflect.InvocationTargetException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.similarity.CosineDistance;
import org.apache.commons.text.similarity.FuzzyScore;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.WhitelistBasedResolver;
import org.apache.neethi.Assertion;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.intuit.fuzzymatcher.component.MatchService;
import com.intuit.fuzzymatcher.domain.Document;
import com.intuit.fuzzymatcher.domain.Element;
import com.intuit.fuzzymatcher.domain.ElementType;
import com.intuit.fuzzymatcher.domain.Match;
import com.wcohen.ss.Levenstein;
import eu.dnetlib.dhp.oa.graph.clean.authorpids.*;
import eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints.ConstraintResolver;
import eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints.ConstraintResolverFactory;
import eu.dnetlib.dhp.schema.dump.oaf.graph.Relation;
import eu.dnetlib.dhp.schema.oaf.Publication;
import me.xdrop.fuzzywuzzy.FuzzySearch;
import scala.Tuple2;
public class CleanOrcidTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static Path workingDir;
private static final Logger log = LoggerFactory.getLogger(CleanOrcidTest.class);
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files.createTempDirectory(CleanOrcidTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(CleanOrcidTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(CleanOrcidTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
public void cleanTest() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/clean/orcid/part-00000.gz")
.getPath();
final String reportPath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/clean/orcid/part-00000-wrong.json.gz")
.getPath();
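// run the cleaning job locally: it reads the publications from sourcePath and the report of the wrong ORCID associations from reportPath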
CleanAuthorPidsSparkJob.main(new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-outputPath", workingDir.toString() + "/publication",
"-inputPath", sourcePath,
"-graphTableClassName", "eu.dnetlib.dhp.schema.oaf.Publication",
"-reportPath", reportPath
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<Publication> tmp = sc
.textFile(workingDir.toString() + "/publication")
.map(item -> OBJECT_MAPPER.readValue(item, Publication.class));
org.apache.spark.sql.Dataset<Publication> verificationDataset = spark
.createDataset(tmp.rdd(), Encoders.bean(Publication.class));
verificationDataset.createOrReplaceTempView("publication");
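// explode the authors and their pids so that each row holds (result id, author name, author surname, pid value)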
Dataset<Row> checkds = spark
.sql(
"select id, auth.name, auth.surname, authPid.value " +
"from publication " +
"lateral view explode(author) a as auth " +
"lateral view explode(auth.pid) p as authPid ");
Dataset<Row> verify = checkds.filter("id='50|datacite____::ca9a732b18157bc002cbd55663fc656e'");
Assertions.assertEquals(1, verify.count());
Assertions
.assertTrue(
MakeReportSparkJob
.handleNameSurname(
verify
.select("name")
.collectAsList()
.get(0)
.getString(0))
.equals("aysegul") &&
MakeReportSparkJob
.handleNameSurname(
verify
.select("surname")
.collectAsList()
.get(0)
.getString(0))
.equals("donmez turan")
&&
verify
.select("value")
.collectAsList()
.get(0)
.getString(0)
.equals("0000-0003-4186-2339"));
verify = checkds.filter("id = '50|doiboost____::e8b80c60bc3ff984e797863f87565a52'");
Assertions.assertEquals(2, verify.count());
Assertions
.assertTrue(
MakeReportSparkJob
.handleNameSurname(
verify
.filter("name = 'Bircan'")
.select("surname")
.collectAsList()
.get(0)
.getString(0))
.equals("dinc") &&
verify
.filter("name = 'Bircan'")
.select("value")
.collectAsList()
.get(0)
.getString(0)
.equals("0000-0002-9717-6410"));
Assertions
.assertTrue(
MakeReportSparkJob
.handleNameSurname(
verify
.filter("name = 'Ayhan'")
.select("surname")
.collectAsList()
.get(0)
.getString(0))
.equals("unlu") &&
verify
.filter("name = 'Ayhan'")
.select("value")
.collectAsList()
.get(0)
.getString(0)
.equals("0000-0001-6033-7148"));
verify = checkds.filter("id = '50|datacite____::392a7e811373bf649300cb868d1c38c6'");
Assertions.assertEquals(5, verify.count());
Assertions
.assertEquals(
"0000-0002-2587-1932",
verify
.filter("name = 'Petra'")
.select("value")
.collectAsList()
.get(0)
.getString(0));
Assertions
.assertEquals(
"0000-0003-0084-3465",
verify
.filter("name = 'Petar'")
.select("value")
.collectAsList()
.get(0)
.getString(0));
Assertions
.assertEquals(
"0000-0001-8502-7816",
verify
.filter("name = 'Dario'")
.select("value")
.collectAsList()
.get(0)
.getString(0));
Assertions
.assertEquals(
"0000-0003-4488-0559",
verify
.filter("name = 'Jozica'")
.select("value")
.collectAsList()
.get(0)
.getString(0));
Assertions
.assertEquals(
"0000-0003-3189-8661",
verify
.filter("name = 'Tea'")
.select("value")
.collectAsList()
.get(0)
.getString(0));
Assertions
.assertEquals(
"0000-0003-0084-3465",
verify
.filter("name = 'Petar'")
.select("value")
.collectAsList()
.get(0)
.getString(0));
}
}

View File

@@ -0,0 +1,287 @@
package eu.dnetlib.dhp.oa.graph.clean;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.Serializable;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.text.similarity.CosineDistance;
import org.apache.commons.text.similarity.FuzzyScore;
import org.junit.jupiter.api.Test;
import com.google.gson.Gson;
import com.intuit.fuzzymatcher.component.MatchService;
import com.intuit.fuzzymatcher.domain.Document;
import com.intuit.fuzzymatcher.domain.Element;
import com.intuit.fuzzymatcher.domain.ElementType;
import com.intuit.fuzzymatcher.domain.Match;
import eu.dnetlib.dhp.oa.graph.clean.authorpids.MakeReportSparkJob;
import me.xdrop.fuzzywuzzy.FuzzySearch;
public class FuzzyLogicTest {
// needed for fuzzywuzzy to get a lower bound ratio under which the authors are most probably different
String[][] wrong = {
{
"1", MakeReportSparkJob.handleNameSurname("Alex Bullock")
},
{
"2", MakeReportSparkJob.handleNameSurname("Gillian Farnie")
},
{
"3", MakeReportSparkJob.handleNameSurname("Luís Rocha")
},
{
"4", MakeReportSparkJob.handleNameSurname("Pedro Relvas")
},
{
"9", MakeReportSparkJob.handleNameSurname("Prasanth Manohar")
},
{
"10", MakeReportSparkJob.handleNameSurname("Nachimuthu Ramesh")
}
};
String[][] input = {
{
"1", MakeReportSparkJob.handleNameSurname("Dr. Ulrike Elsdoerfer Ph.D.")
},
{
"2", MakeReportSparkJob.handleNameSurname("Ulrike Elsdörfer")
},
{
"3", MakeReportSparkJob.handleNameSurname("Steven Ossont")
},
{
"4", MakeReportSparkJob.handleNameSurname("Steven J. Johnston")
},
{
"5", MakeReportSparkJob.handleNameSurname("Joanna Molyn")
},
{
"6", MakeReportSparkJob.handleNameSurname("Joanna Molyn-Blanchfield")
},
{
"7", MakeReportSparkJob.handleNameSurname("Zhang Tian-Tuo")
},
{
"8", MakeReportSparkJob.handleNameSurname("Zhang Tian tuo")
},
{
"9", MakeReportSparkJob.handleNameSurname("Prasanth Manohar")
},
{
"10", MakeReportSparkJob.handleNameSurname("Nachimuthu Ramesh")
},
{
"9", MakeReportSparkJob.handleNameSurname("Hassan Ahmed")
},
{
"10", MakeReportSparkJob.handleNameSurname("Hassan Mohamed")
},
{
"11", MakeReportSparkJob.handleNameSurname("Jonathan ODonnell")
},
{
"12", MakeReportSparkJob.handleNameSurname("Jonathon A. O Dannell")
},
{
"11", MakeReportSparkJob.handleNameSurname("Amilcar António Teiga Teixeira")
},
{
"12", MakeReportSparkJob.handleNameSurname("Amílcar Teixeira")
},
{
"13", MakeReportSparkJob.handleNameSurname("Bruno Rossion")
},
{
"14", MakeReportSparkJob.handleNameSurname("B. Rossion")
},
{
"15", MakeReportSparkJob.handleNameSurname("TINGYOU WANG")
},
{
"16", MakeReportSparkJob.handleNameSurname("Wang Ting-You")
},
{
"17", MakeReportSparkJob.handleNameSurname("Jacob Moran-Gilad")
},
{
"18", MakeReportSparkJob.handleNameSurname("Moran-Gilad Jacon")
},
{
"19", MakeReportSparkJob.handleNameSurname("Adelle Semmler")
},
{
"20", MakeReportSparkJob.handleNameSurname("Adelle Craig")
},
{
"21", MakeReportSparkJob.handleNameSurname("Ramziddin M")
},
{
"20", MakeReportSparkJob.handleNameSurname("R. Mansurov")
},
{
"21", MakeReportSparkJob.handleNameSurname("Matthew Jones")
},
{
"22", MakeReportSparkJob.handleNameSurname("Martin Fenner")
},
{
"23", MakeReportSparkJob.handleNameSurname("Surachai Karnjanakom")
},
{
"24", MakeReportSparkJob.handleNameSurname("Panya Maneechakr")
},
{
"25", MakeReportSparkJob.handleNameSurname("Surachai Karnjanakom")
},
{
"26", MakeReportSparkJob.handleNameSurname("Panya Maneechakr")
},
{
"27", MakeReportSparkJob.handleNameSurname("Dönmez Turan Aygül")
},
{
"28", MakeReportSparkJob.handleNameSurname("Mendeş Pekdemir Işık")
},
{
"29", MakeReportSparkJob.handleNameSurname("ık Mendeş Pekdemir")
},
{
"30", MakeReportSparkJob.handleNameSurname("Aygül Dönmez Turan")
}
};
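// applies commons-text CosineDistance to each consecutive pair of the input array and prints how (dis)similar the two names are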
@Test
public void cosineDistanceTest() {
for (int i = 0; i < input.length; i += 2) {
double cosineDistance = new CosineDistance().apply(input[i][1], input[i + 1][1]);
System.out
.println(
"CosineDistance of '" + input[i][1] + "' & '" + input[i + 1][1] + "' | Words in strings are "
+ Math.round(cosineDistance * 100) + "% dis-similar or "
+ Math.round((1 - cosineDistance) * 100) + "% similar.");
}
}
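// wraps each name in a fuzzy-matcher Document (NAME element, preprocessed with handleNameSurname) and prints the matches found by MatchService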
@Test
public void testAuthorFuzzyMatch() {
Function<String, String> clean = s -> MakeReportSparkJob.handleNameSurname(s);
List<Document> documentList = Arrays.asList(input).stream().map(contact -> {
return new Document.Builder(contact[0])
.addElement(
new Element.Builder<String>()
.setValue(contact[1])
.setType(ElementType.NAME)
.setPreProcessingFunction(clean)
.createElement())
.createDocument();
}).collect(Collectors.toList());
MatchService matchService = new MatchService();
Map<String, List<Match<Document>>> result = matchService.applyMatchByDocId(documentList);
result.entrySet().forEach(entry -> {
entry.getValue().forEach(match -> {
System.out
.println(
"Data: " + match.getData() + " Matched With: " + match.getMatchedWith() + " Score: "
+ match.getScore().getResult());
});
});
}
// FuzzyScore is not reliable here: the score depends on the order in which the two author names are compared
// e.g. FuzzyScore of 'donmez turan aygul' & 'mendes pekdemir is k' | Similarity ratio 5
// FuzzyScore of 'is k mendes pekdemir' & 'aygul donmez turan' | Similarity ratio 0
@Test
public void testAuthorFuzzyApache() {
for (int i = 0; i < input.length; i += 2) {
System.out
.println(
"FuzzyScore of '" + input[i][1] + "' & '" + input[i + 1][1] + "' | Similarity ratio "
+ new FuzzyScore(Locale.getDefault()).fuzzyScore(input[i][1], input[i + 1][1]));
}
}
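// applies fuzzywuzzy FuzzySearch.ratio to the same consecutive pairs of the input array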
@Test
public void FuzzyWuzzyTest() {
applyFuzzyWuzzy(input);
}
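// same check on the orcid/result name pairs listed in wrongassociation.json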
@Test
public void FuzzyWuzzyWrongTest() throws IOException {
final String inputPath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/clean/wrongassociation.json")
.getPath();
List<OrcidAuthor> orcidAuthorList = new ArrayList<>();
// try-with-resources so the reader is closed once the file has been consumed
try (BufferedReader reader = new BufferedReader(new FileReader(inputPath))) {
String line;
while (null != (line = reader.readLine())) {
orcidAuthorList.add(new Gson().fromJson(line, OrcidAuthor.class));
}
}
applyFuzzyWuzzy(orcidAuthorList);
}
private void applyFuzzyWuzzy(List<OrcidAuthor> orcidAuthorList) {
orcidAuthorList.forEach(entry -> {
String orcid = MakeReportSparkJob.handleNameSurname(entry.getOrcid());
String result = MakeReportSparkJob.handleNameSurname(entry.getResult());
System.out
.println(
"FuzzyWuzzy of '" + orcid + "' & '" + result + "' | Similarity ratio "
+ FuzzySearch.ratio(orcid, result));
});
}
private void applyFuzzyWuzzy(String[][] input) {
for (int i = 0; i < input.length; i += 2) {
System.out
.println(
"FuzzyWuzzy of '" + input[i][1] + "' & '" + input[i + 1][1] + "' | Similarity ratio "
+ FuzzySearch.ratio(input[i][1], input[i + 1][1]));
}
}
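// minimal bean deserialized from wrongassociation.json in FuzzyWuzzyWrongTest: holds the name taken from the ORCID record and the author name found on the result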
class OrcidAuthor implements Serializable {
private String orcid;
private String result;
public String getOrcid() {
return orcid;
}
public void setOrcid(String orcid) {
this.orcid = orcid;
}
public String getResult() {
return result;
}
public void setResult(String result) {
this.result = result;
}
}
}

View File

@ -0,0 +1,223 @@
package eu.dnetlib.dhp.oa.graph.clean;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.similarity.CosineDistance;
import org.apache.commons.text.similarity.FuzzyScore;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.intuit.fuzzymatcher.component.MatchService;
import com.intuit.fuzzymatcher.domain.Document;
import com.intuit.fuzzymatcher.domain.Element;
import com.intuit.fuzzymatcher.domain.ElementType;
import com.intuit.fuzzymatcher.domain.Match;
import com.wcohen.ss.Levenstein;
import eu.dnetlib.dhp.oa.graph.clean.authorpids.*;
import eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints.ConstraintResolver;
import eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints.ConstraintResolverFactory;
import me.xdrop.fuzzywuzzy.FuzzySearch;
import scala.Tuple2;
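// Integration tests for the orcid cleaning report: whitelist constraint resolution, report generation and
// preparation of the result info, run against a local Spark session.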
public class MakeReportTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static Path workingDir;
private static final Logger log = LoggerFactory.getLogger(MakeReportTest.class);
final String whitelist = "{\"whitelist\":[{\"criteria\":[{\"verb\":\"shorterorequal\",\"field\":\"oname\",\"value\":\"2\"},{\"verb\":\"shorterorequal\",\"field\":\"osurname\",\"value\":\"2\"}]},{\"criteria\":[{\"verb\":\"shorterorequal\", \"field\":\"name\", \"value\":\"2\"},{\"verb\":\"shorterorequal\", \"field\":\"surname\", \"value\":\"2\"}]}, {\"criteria\":[{\"verb\":\"equals\", \"field\":\"oname\", \"value\":\"Given Names Deactivated\"},{\"verb\":\"equals\", \"field\":\"osurname\", \"value\":\"Family Name Deactivated\"}]}]}";
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files.createTempDirectory(MakeReportTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(MakeReportTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(MakeReportTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
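// verifies the whitelist JSON above: within a criteria block the single constraints are combined with AND,
// and the whitelist matches when at least one of its criteria blocks is satisfied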
@Test
public void loadWhiteList() {
WhiteList loadedWhiteList = new Gson().fromJson(whitelist, WhiteList.class);
ConstraintResolver resolver = ConstraintResolverFactory.newInstance();
loadedWhiteList.getWhitelist().forEach(c -> {
try {
c.setSelection(resolver);
} catch (InvocationTargetException | NoSuchMethodException | InstantiationException
| IllegalAccessException e) {
e.printStackTrace();
}
});
Map<String, String> param = new HashMap<>();
param.put("oname", "Miriam");
param.put("name", "Miriam");
param.put("osurname", "Miriam");
param.put("surname", "Miriam");
loadedWhiteList.getWhitelist().forEach(c -> Assertions.assertFalse(c.verifyCriteria(param)));
param.put("oname", "P");
param.put("osurname", "tj");
Assertions
.assertEquals(
1, loadedWhiteList
.getWhitelist()
.stream()
.map(c -> c.verifyCriteria(param))
.filter(Boolean::valueOf)
.collect(Collectors.toList())
.size());
param.put("oname", "Given Names Deactivated");
param.put("osurname", "Family Name Deactivated");
Assertions
.assertEquals(
1, loadedWhiteList
.getWhitelist()
.stream()
.map(c -> c.verifyCriteria(param))
.filter(Boolean::valueOf)
.collect(Collectors.toList())
.size());
param.put("name", "P");
param.put("surname", "tj");
Assertions
.assertEquals(
2, loadedWhiteList
.getWhitelist()
.stream()
.map(c -> c.verifyCriteria(param))
.filter(Boolean::valueOf)
.collect(Collectors.toList())
.size());
param.put("oname", "Given Names Deactivated");
param.put("osurname", "Family Name Deactivated");
Assertions
.assertEquals(
2, loadedWhiteList
.getWhitelist()
.stream()
.map(c -> c.verifyCriteria(param))
.filter(Boolean::valueOf)
.collect(Collectors.toList())
.size());
}
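// runs MakeReportSparkJob on the test resources and checks the number of wrong orcid-author associations written under /reports/wrong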
@Test
public void makeReportTest() throws Exception {
final String inputPath = "";
final String preparedInfoPath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/clean/orcid/preparedInfo")
.getPath();
final String orcidInputPath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/clean/orcid/authors.seq")
.getPath();
MakeReportSparkJob.main(new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-outputPath", workingDir + "/reports",
"-inputPath", inputPath,
"-preparedInfoPath", preparedInfoPath,
"-orcidInputPath", orcidInputPath,
"-graphTableClassName", "eu.dnetlib.dhp.schema.oaf.Publication",
"-whitelist", whitelist
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<ReportInfo> tmp = sc
.textFile(workingDir.toString() + "/reports/wrong")
.map(item -> OBJECT_MAPPER.readValue(item, ReportInfo.class));
Assertions.assertEquals(28, tmp.count());
}
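// runs PrepareResultsSparkJob on a publication dump and prints the prepared ResultInfo records for manual inspection (no assertions yet)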
@Test
public void prepareInfoTest() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/clean/orcid/part-00000.gz")
.getPath();
PrepareResultsSparkJob.main(new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-outputPath", workingDir.toString() + "/preparedInfo",
"-inputPath", sourcePath,
"-graphTableClassName", "eu.dnetlib.dhp.schema.oaf.Publication"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<ResultInfo> tmp = sc
.textFile(workingDir.toString() + "/preparedInfo")
.map(item -> OBJECT_MAPPER.readValue(item, ResultInfo.class));
tmp.foreach(resultInfo -> System.out.println(OBJECT_MAPPER.writeValueAsString(resultInfo)));
}
}