Changed the workflow definition so that the cleaning part directly follows the report part. Renamed the properties. Adjusted the test classes.

This commit is contained in:
Miriam Baglioni 2020-11-30 12:00:26 +01:00
parent 44a66ad8a6
commit 02589717b0
8 changed files with 790 additions and 847 deletions

View File

@ -5,9 +5,9 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.*;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.similarity.CosineDistance;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
@ -27,7 +27,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.PacePerson;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.schema.oaf.*;
import net.sf.saxon.trans.Maker;
import scala.Serializable;
import scala.Tuple2;
@ -62,12 +64,6 @@ public class CleanAuthorPidsSparkJob implements Serializable {
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
String preparedInfoPath = parser.get("preparedInfoPath");
log.info("preparedInfoPath: {} ", preparedInfoPath);
String orcidInputPath = parser.get("orcidInputPath");
log.info("orcidInputPath: {}", orcidInputPath);
String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName);
@ -101,187 +97,75 @@ public class CleanAuthorPidsSparkJob implements Serializable {
* @param <R>
*/
private static <R extends Result> void cleanAuthorPid(SparkSession spark, String inputPath, String outputPath,
String reportPath, Class<R> entityClazz) {
String reportPath, Class<R> entityClazz) {
Dataset<R> result = Utils.readPath(spark, inputPath, entityClazz);
Dataset<Tuple2<String, ResultInfo>> wrong = Utils
.readPath(spark, reportPath, ReportInfo.class)
.flatMap((FlatMapFunction<ReportInfo, Tuple2<String, ResultInfo>>) ri -> {
List<Tuple2<String, ResultInfo>> ret = new ArrayList<>();
ri.getAssociatedAuthors().forEach(aa -> ret.add(new Tuple2<>(aa.getId(), aa)));
return ret.iterator();
}, Encoders.tuple(Encoders.STRING(), Encoders.bean(ResultInfo.class)));
Dataset<Tuple2<String, ReportInfo>> wrong = Utils.readPath(spark, reportPath, ReportInfo.class)
.flatMap((FlatMapFunction<ReportInfo, Tuple2<String, ReportInfo>>) ri -> {
List<Tuple2<String, ReportInfo>> ret = new ArrayList<>();
ri.getAssociatedAuthors().forEach(aa -> ret.add(new Tuple2<>(aa.getId(), ri)));
return ret.iterator();
}, Encoders.tuple(Encoders.STRING(), Encoders.bean(ReportInfo.class)));
result
.joinWith(wrong, result.col("id").equalTo(wrong.col("_1")), "left")
.groupByKey((MapFunction<Tuple2<R, Tuple2<String, ResultInfo>>, String>) value -> {
return value._1().getId();
}, Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Tuple2<R, Tuple2<String, ResultInfo>>, R>) (key, it) -> {
Tuple2<R, Tuple2<String, ResultInfo>> first = it.next();
R ret = first._1();
if (Optional.ofNullable(first._2()).isPresent()) {
emendOrcidIfNeeded(first._2()._2(), ret);
result.joinWith(wrong, result.col("id").equalTo(wrong.col("_1")), "left")
.map((MapFunction<Tuple2<R, Tuple2<String, ReportInfo>>, Result>) value ->{
Result r = value._1();
if(Optional.ofNullable(value._2()).isPresent()){
ReportInfo ri = value._2()._2();
String orcid = ri.getOid();
r.getAuthor().forEach(author -> {
List<StructuredProperty> pids = author.getPid();
for(StructuredProperty pid: pids){
if(pid.getQualifier().getClassid().equals("orcid")){
if(pid.getValue().equals(orcid)){
pids.remove(pid);
break;
}
}
}
});
}
it.forEachRemaining(t2 -> emendOrcidIfNeeded(t2._2()._2(), ret));
}
return r;
}, Encoders.bean(Result.class))
.write()
.option("compression","gzip")
.mode(SaveMode.Overwrite)
.json(outputPath);
return ret;
}, Encoders.bean(entityClazz))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath);
}
// /**
// * Loads the sequence file containing the information about the orcid id, name and surname of the author.
// * It returns a dataset whose model maps to the one defined in the class OrcidAuthoritative
// * @param spark the sparkSession
// * @param orcidInputPath the path where to read the sequence file
// * @return the dataset
// */
// private static Dataset<OrcidAuthotitative> loadAuthoritativeOrcid(SparkSession spark, String orcidInputPath) {
//
// JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
//
// return spark
// .createDataset(
// JavaRDD
// .toRDD(
// sc
// .sequenceFile(orcidInputPath, Text.class, Text.class)
// .map(pair -> OBJECT_MAPPER.readValue(pair._2().toString(), OrcidAuthotitative.class))),
// Encoders.bean(OrcidAuthotitative.class));
//
// }
//
// /**
// * It cleans the association with the orcid for authors whose name and surname do not match the ones in the orcid sequence file.
// * First step: join the authoritative information with the prepared info on the orcid id. If the name and surname of the authors do not match
// * (or the surname and the first letter of the name), null is returned. If they match, the ResultInfo instance is returned.
// * Second step: the returned result infos are grouped by key (the key is the id of the result) and a new result with as many authors as those
// * having a correct association with the orcid is returned.
// * Third step: a join between the result and the authoritative authors result is done on the result id.
// * Each author in the original result with a pid of type orcid is matched against the list of the authoritative authors. If the author
// * in the original result has its pid among those in the authoritative result then the pid is retained, otherwise it is removed.
// * @param spark the spark session
// * @param inputPath the path where to find the result to emend
// * @param entityClazz the class of the result considered
// * @param outputPath the path where to write the emended result
// * @param preparedInfoPath the path where to find the selected information for the result
// * @param authoritative the authoritative association orcid name surname
// * @param <I>
// */
//
// private static <I extends Result> void cleanAuthorPid(SparkSession spark, String inputPath, Class<I> entityClazz,
// String outputPath, String preparedInfoPath,
// Dataset<OrcidAuthotitative> authoritative) {
//
// Dataset<ResultInfo> resultInfo = readPath(spark, preparedInfoPath, ResultInfo.class);
//
// Dataset<Result> checkedResult = resultInfo
// .joinWith(
// authoritative, authoritative
// .col("oid")
// .equalTo(resultInfo.col("orcid")),
// "left")
// .map((MapFunction<Tuple2<ResultInfo, OrcidAuthotitative>, ResultInfo>) pair -> {
// OrcidAuthotitative oa = pair._2();
//
// ResultInfo ri = pair._1();
//
// if (StringUtils.isBlank(ri.getSurname())) {
// PacePerson pp = new PacePerson(ri.getFullname(), false);
// ri.setSurname(pp.getNormalisedSurname());
// ri.setName(pp.getNormalisedFirstName());
// }
//
// if (!StringUtils
// .stripAccents(oa.getSurname().trim())
// .equalsIgnoreCase(StringUtils.stripAccents(ri.getSurname().trim()))) {
//
// return null;
// }
// if (!StringUtils
// .stripAccents(oa.getName().trim())
// .equalsIgnoreCase(StringUtils.stripAccents(ri.getName().trim()))) {
// if (StringUtils.stripAccents(oa.getName().trim()).charAt(0) != StringUtils
// .stripAccents(ri.getName().trim())
// .charAt(0)) {
//
// return null;
// }
// }
// return ri;
//
// }, Encoders.bean(ResultInfo.class))
// .filter(Objects::nonNull)
// .groupByKey((MapFunction<ResultInfo, String>) ri -> ri.getId(), Encoders.STRING())
// .mapGroups((MapGroupsFunction<String, ResultInfo, Result>) (key, group) -> {
// Result r = new Result();
// r.setId(key);
// List<Author> aList = new ArrayList<>();
// while (group.hasNext()) {
// ResultInfo cri = group.next();
// Author a = new Author();
// a.setName(cri.getName());
// a.setSurname(cri.getSurname());
// StructuredProperty sp = new StructuredProperty();
// sp.setValue(cri.getOrcid());
// a.setPid(Arrays.asList(sp));
// aList.add(a);
// }
// r.setAuthor(aList);
// return r;
// }, Encoders.bean(Result.class));
//
// Dataset<I> result = readPath(spark, inputPath, entityClazz);
//
// result
// .joinWith(checkedResult, result.col("id").equalTo(checkedResult.col("id")), "left")
// .map((MapFunction<Tuple2<I, Result>, I>) value -> {
// if (!Optional.ofNullable(value._2()).isPresent()) {
// return value._1();
// }
// I tocheck = value._1();
//
// for (Author author : tocheck.getAuthor()) {
// author.setPid(checkAndEmend(author, value._2()));
// }
// return tocheck;
// }, Encoders.bean(entityClazz))
// .write()
// .option("compression", "gzip")
// .mode(SaveMode.Overwrite)
// .json(outputPath);
//
// }
//
// private static List<StructuredProperty> checkAndEmend(Author author, Result authoritative) {
// List<StructuredProperty> tocheck = author.getPid();
// for (StructuredProperty tc : tocheck) {
// if (tc.getQualifier().getClassid().equals("orcid")) {
// for (Author a : authoritative.getAuthor()) {
// List<StructuredProperty> aPid = a.getPid();
// if (aPid.get(0).getValue().equals(tc.getValue())) {
// return tocheck;
// }
// }
// tocheck.remove(tc);
// }
// }
//
// return tocheck;
//
// }
private static <R extends Result> void emendOrcidIfNeeded(ResultInfo ri, R ret) {
String orcid = ri.getOrcid();
ret.getAuthor().forEach(author -> {
List<StructuredProperty> pids = author.getPid();
for (StructuredProperty pid : pids) {
if (pid.getQualifier().getClassid().equals("orcid")) {
if (pid.getValue().equals(orcid) && sameAuthor(author, ri)) {
pids.remove(pid);
break;
}
}
}
});
}
private static boolean sameAuthor(Author author, ResultInfo associatedAuthors) {
String aname = author.getName();
String asurname = author.getSurname();
if (StringUtils.isBlank(asurname)) {
PacePerson pp = new PacePerson(author.getFullname(), false);
asurname = pp.getNormalisedSurname();
aname = pp.getNormalisedFirstName();
}
Double cosineDistance = new CosineDistance()
.apply(
MakeReportSparkJob.handleNameSurname(aname + " " + asurname),
MakeReportSparkJob
.handleNameSurname(associatedAuthors.getName() + " " + associatedAuthors.getSurname()));
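// CosineDistance returns 1 - cosine similarity of the word vectors: a value that rounds to 0 (i.e. below 0.5) means the two normalised full names mostly overlap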
if (Math.round(cosineDistance) == 0) {
return true;
}
return false;
}
private static <R> Dataset<R> readPath(
SparkSession spark, String inputPath, Class<R> clazz) {

View File

@ -88,7 +88,7 @@ public class MakeReportSparkJob implements Serializable {
.toString(
MakeReportSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/clean_orcid/input_clean_author_pids.json"));
"/eu/dnetlib/dhp/oa/graph/clean_orcid/input_report_author_pids.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
@ -98,10 +98,7 @@ public class MakeReportSparkJob implements Serializable {
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("inputPath");
log.info("inputPath: {}", inputPath);
String outputPath = parser.get("outputPath");
String outputPath = parser.get("reportOutputPath");
log.info("outputPath: {}", outputPath);
String preparedInfoPath = parser.get("preparedInfoPath");
@ -110,14 +107,9 @@ public class MakeReportSparkJob implements Serializable {
String orcidInputPath = parser.get("orcidInputPath");
log.info("orcidInputPath: {}", orcidInputPath);
String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName);
String whiteListString = parser.get("whitelist");
log.info("whitelist: {}", whiteListString);
Class<? extends Result> entityClazz = (Class<? extends Result>) Class.forName(graphTableClassName);
SparkConf conf = new SparkConf();
runWithSparkHiveSession(
conf,
@ -131,8 +123,6 @@ public class MakeReportSparkJob implements Serializable {
});
}
/**
* Loads the sequence file containing the information about the orcid id, name and surname of the author.
* It returns a dataset whose model maps to the one defined in the class OrcidAuthoritative
@ -316,7 +306,7 @@ public class MakeReportSparkJob implements Serializable {
"2", handledSurname + " " + handledName
}
};
//exact match word by word
// exact match word by word
Double cosineDistance = new CosineDistance().apply(input[0][1], input[1][1]);
if (Math.round((1 - cosineDistance) * 100) == 100) {
reportInfo.setLevel("cosine similarity equals to 1");
@ -379,7 +369,7 @@ public class MakeReportSparkJob implements Serializable {
}
}
// TODO match size is not equal to zero. Verify the match value and then decide how to proceed
} else {
// MatchService matchService = new MatchService();
//
@ -393,13 +383,13 @@ public class MakeReportSparkJob implements Serializable {
// .createDocument();
// }).collect(Collectors.toList());
// if (matchService.applyMatchByDocId(documentList).entrySet().size() == 1) {
if (FuzzySearch.ratio(input[0][1], input[1][1]) > 90) {
reportInfo.setLevel("more than 90 in fuzzywuzzy");
return new Tuple2<>("right", reportInfo);
} else {
reportInfo.setLevel("less than 90 in fuzzywuzzy");
return new Tuple2<>("check", reportInfo);
}
if (FuzzySearch.ratio(input[0][1], input[1][1]) > 90) {
reportInfo.setLevel("more than 90 in fuzzywuzzy");
return new Tuple2<>("right", reportInfo);
} else {
reportInfo.setLevel("less than 90 in fuzzywuzzy");
return new Tuple2<>("check", reportInfo);
}
// }else{
// reportInfo.setLevel("not found a match in name matching");

View File

@ -24,22 +24,10 @@
"paramRequired": true
},
{
"paramName": "pip",
"paramLongName": "preparedInfoPath",
"paramDescription": "The path of the prepared result informaiton",
"paramRequired": true
},
{
"paramName": "oip",
"paramLongName": "orcidInputPath",
"paramDescription": "the path to the authoritative orcid information",
"paramName": "rp",
"paramLongName": "reportPath",
"paramDescription": "The path of the prepared result informaiton",
"paramRequired": true
},
{
"paramName":"wl",
"paramLongName":"whitelist",
"paramDescription": "the whitelist",
"paramRequired": true
}
}
]

View File

@ -1,13 +1,7 @@
[
{
"paramName":"i",
"paramLongName":"inputPath",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramName": "rop",
"paramLongName": "reportOutputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
},
@ -17,12 +11,6 @@
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
},
{
"paramName": "gtn",
"paramLongName": "graphTableClassName",
"paramDescription": "the table name of the result currently considering",
"paramRequired": true
},
{
"paramName": "pip",
"paramLongName": "preparedInfoPath",

View File

@ -13,6 +13,9 @@
<name>orcidInputPath</name>
<description>the input path where to find the orcid authoritative information</description>
</property>
<property>
<name>orcidReportPath</name>
</property>
<property>
<name>clean</name>
<value>false</value>
@ -185,7 +188,7 @@
<error to="Kill"/>
</action>
<join name="wait_prepare" to="cleanorreport"/>
<join name="wait_prepare" to="make_report"/>
<decision name="cleanorreport">
<switch>
@ -219,9 +222,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/publication</arg>
<arg>--outputPath</arg><arg>${outputPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--reportOutputPath</arg><arg>${reportOutputPath}/publication</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/publication</arg>
<arg>--orcidInputPath</arg><arg>${orcidInputPath}</arg>
<arg>--whitelist</arg><arg>${whitelist}</arg>
@ -248,9 +249,7 @@
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--preparedInfoPath</arg><arg>${workingDir}/dataset</arg>
<arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--inputPath</arg><arg>${inputPath}/dataset</arg>
<arg>--reportOutputPath</arg><arg>${reportOutputPath}/dataset</arg>
<arg>--orcidInputPath</arg><arg>${orcidInputPath}</arg>
<arg>--whitelist</arg><arg>${whitelist}</arg>
</spark>
@ -276,9 +275,7 @@
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--preparedInfoPath</arg><arg>${workingDir}/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--inputPath</arg><arg>${inputPath}/otherresearchproduct</arg>
<arg>--reportOutputPath</arg><arg>${reportOutputPath}/otherresearchproduct</arg>
<arg>--orcidInputPath</arg><arg>${orcidInputPath}</arg>
<arg>--whitelist</arg><arg>${whitelist}</arg>
</spark>
@ -304,9 +301,7 @@
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--preparedInfoPath</arg><arg>${workingDir}/software</arg>
<arg>--outputPath</arg><arg>${outputPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--inputPath</arg><arg>${inputPath}/software</arg>
<arg>--reportOutputPath</arg><arg>${reportOutputPath}/software</arg>
<arg>--orcidInputPath</arg><arg>${orcidInputPath}</arg>
<arg>--whitelist</arg><arg>${whitelist}</arg>
</spark>
@ -314,51 +309,7 @@
<error to="Kill"/>
</action>
<join name="wait_report" to="End"/>
<fork name="clean_orcid_copy">
<path start="copy_relation"/>
<path start="copy_organization"/>
<path start="copy_projects"/>
<path start="copy_datasources"/>
</fork>
<action name="copy_relation">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${inputPath}/relation</arg>
<arg>${nameNode}/${outputPath}/relation</arg>
</distcp>
<ok to="wait_copy"/>
<error to="Kill"/>
</action>
<action name="copy_organization">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${inputPath}/organization</arg>
<arg>${nameNode}/${outputPath}/organization</arg>
</distcp>
<ok to="wait_copy"/>
<error to="Kill"/>
</action>
<action name="copy_projects">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${inputPath}/project</arg>
<arg>${nameNode}/${outputPath}/project</arg>
</distcp>
<ok to="wait_copy"/>
<error to="Kill"/>
</action>
<action name="copy_datasources">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${sourcePath}/datasource</arg>
<arg>${nameNode}/${outputPath}/datasource</arg>
</distcp>
<ok to="wait_copy"/>
<error to="Kill"/>
</action>
<join name="wait_copy" to="clean_orcid"/>
<join name="wait_report" to="clean_orcid"/>
<fork name="clean_orcid">
<path start="clean_publication_orcid"/>
@ -387,8 +338,7 @@
<arg>--inputPath</arg><arg>${inputPath}/publication</arg>
<arg>--outputPath</arg><arg>${outputPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/publication</arg>
<arg>--orcidInputPath</arg><arg>${orcidInputPath}</arg>
<arg>--reportPath</arg><arg>${reportOutputPath}/publication</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
@ -411,11 +361,10 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--preparedInfoPath</arg><arg>${workingDir}/dataset</arg>
<arg>--reportPath</arg><arg>${reportOutputPath}/dataset</arg>
<arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--inputPath</arg><arg>${inputPath}/dataset</arg>
<arg>--orcidInputPath</arg><arg>${orcidInputPath}</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
@ -438,11 +387,10 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--preparedInfoPath</arg><arg>${workingDir}/otherresearchproduct</arg>
<arg>--reportPath</arg><arg>${reportOutputPath}/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--inputPath</arg><arg>${inputPath}/otherresearchproduct</arg>
<arg>--orcidInputPath</arg><arg>${orcidInputPath}</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
@ -465,11 +413,10 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--preparedInfoPath</arg><arg>${workingDir}/software</arg>
<arg>--reportPath</arg><arg>${reportOutputPath}/software</arg>
<arg>--outputPath</arg><arg>${outputPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--inputPath</arg><arg>${inputPath}/software</arg>
<arg>--orcidInputPath</arg><arg>${orcidInputPath}</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>

View File

@ -19,6 +19,10 @@ import org.apache.neethi.Assertion;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@ -40,11 +44,219 @@ import com.wcohen.ss.Levenstein;
import eu.dnetlib.dhp.oa.graph.clean.authorpids.*;
import eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints.ConstraintResolver;
import eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints.ConstraintResolverFactory;
import eu.dnetlib.dhp.schema.dump.oaf.graph.Relation;
import eu.dnetlib.dhp.schema.oaf.Publication;
import me.xdrop.fuzzywuzzy.FuzzySearch;
import scala.Tuple2;
public class CleanOrcidTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static Path workingDir;
private static final Logger log = LoggerFactory.getLogger(CleanOrcidTest.class);
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files.createTempDirectory(CleanOrcidTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(CleanOrcidTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(CleanOrcidTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
public void cleanTest() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/clean/orcid/part-00000.gz")
.getPath();
final String reportPath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/clean/orcid/part-00000-wrong.json.gz")
.getPath();
CleanAuthorPidsSparkJob.main(new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-outputPath", workingDir.toString() + "/publication",
"-inputPath", sourcePath,
"-graphTableClassName", "eu.dnetlib.dhp.schema.oaf.Publication",
"-reportPath", reportPath
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<Publication> tmp = sc
.textFile(workingDir.toString() + "/publication")
.map(item -> OBJECT_MAPPER.readValue(item, Publication.class));
org.apache.spark.sql.Dataset<Publication> verificationDataset = spark
.createDataset(tmp.rdd(), Encoders.bean(Publication.class));
verificationDataset.createOrReplaceTempView("publication");
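// flatten the nested structure: one row per (result id, author name, author surname, author pid value)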
Dataset<Row> checkds = spark
.sql(
"select id, auth.name, auth.surname, authPid.value " +
"from publication " +
"lateral view explode(author) a as auth " +
"lateral view explode(auth.pid) p as authPid ");
Dataset<Row> verify = checkds.filter("id='50|datacite____::ca9a732b18157bc002cbd55663fc656e'");
Assertions.assertEquals(1, verify.count());
Assertions
.assertTrue(
MakeReportSparkJob
.handleNameSurname(
verify
.select("name")
.collectAsList()
.get(0)
.getString(0))
.equals("aysegul") &&
MakeReportSparkJob
.handleNameSurname(
verify
.select("surname")
.collectAsList()
.get(0)
.getString(0))
.equals("donmez turan")
&&
verify
.select("value")
.collectAsList()
.get(0)
.getString(0)
.equals("0000-0003-4186-2339"));
verify = checkds.filter("id = '50|doiboost____::e8b80c60bc3ff984e797863f87565a52'");
Assertions.assertEquals(2, verify.count());
Assertions
.assertTrue(
MakeReportSparkJob
.handleNameSurname(
verify
.filter("name = 'Bircan'")
.select("surname")
.collectAsList()
.get(0)
.getString(0))
.equals("dinc") &&
verify
.filter("name = 'Bircan'")
.select("value")
.collectAsList()
.get(0)
.getString(0)
.equals("0000-0002-9717-6410"));
Assertions
.assertTrue(
MakeReportSparkJob
.handleNameSurname(
verify
.filter("name = 'Ayhan'")
.select("surname")
.collectAsList()
.get(0)
.getString(0))
.equals("unlu") &&
verify
.filter("name = 'Ayhan'")
.select("value")
.collectAsList()
.get(0)
.getString(0)
.equals("0000-0001-6033-7148"));
verify = checkds.filter("id = '50|datacite____::392a7e811373bf649300cb868d1c38c6'");
Assertions.assertEquals(5, verify.count());
Assertions
.assertEquals(
"0000-0002-2587-1932",
verify
.filter("name = 'Petra'")
.select("value")
.collectAsList()
.get(0)
.getString(0));
Assertions
.assertEquals(
"0000-0003-0084-3465",
verify
.filter("name = 'Petar'")
.select("value")
.collectAsList()
.get(0)
.getString(0));
Assertions
.assertEquals(
"0000-0001-8502-7816",
verify
.filter("name = 'Dario'")
.select("value")
.collectAsList()
.get(0)
.getString(0));
Assertions
.assertEquals(
"0000-0003-4488-0559",
verify
.filter("name = 'Jozica'")
.select("value")
.collectAsList()
.get(0)
.getString(0));
Assertions
.assertEquals(
"0000-0003-3189-8661",
verify
.filter("name = 'Tea'")
.select("value")
.collectAsList()
.get(0)
.getString(0));
Assertions
.assertEquals(
"0000-0003-0084-3465",
verify
.filter("name = 'Petar'")
.select("value")
.collectAsList()
.get(0)
.getString(0));
}
}

View File

@ -1,4 +1,287 @@
package eu.dnetlib.dhp.oa.graph.clean;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.Serializable;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.text.similarity.CosineDistance;
import org.apache.commons.text.similarity.FuzzyScore;
import org.junit.jupiter.api.Test;
import com.google.gson.Gson;
import com.intuit.fuzzymatcher.component.MatchService;
import com.intuit.fuzzymatcher.domain.Document;
import com.intuit.fuzzymatcher.domain.Element;
import com.intuit.fuzzymatcher.domain.ElementType;
import com.intuit.fuzzymatcher.domain.Match;
import eu.dnetlib.dhp.oa.graph.clean.authorpids.MakeReportSparkJob;
import me.xdrop.fuzzywuzzy.FuzzySearch;
public class FuzzyLogicTest {
// needed for fuzzywuzzy to get a lower bound ratio under which the authors are most probably different
String[][] wrong = {
{
"1", MakeReportSparkJob.handleNameSurname("Alex Bullock")
},
{
"2", MakeReportSparkJob.handleNameSurname("Gillian Farnie")
},
{
"3", MakeReportSparkJob.handleNameSurname("Luís Rocha")
},
{
"4", MakeReportSparkJob.handleNameSurname("Pedro Relvas")
},
{
"9", MakeReportSparkJob.handleNameSurname("Prasanth Manohar")
},
{
"10", MakeReportSparkJob.handleNameSurname("Nachimuthu Ramesh")
}
};
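// the FuzzyWuzzy ratios for these known-different pairs can be inspected via applyFuzzyWuzzy(wrong) below to choose the threshold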
String[][] input = {
{
"1", MakeReportSparkJob.handleNameSurname("Dr. Ulrike Elsdoerfer Ph.D.")
},
{
"2", MakeReportSparkJob.handleNameSurname("Ulrike Elsdörfer")
},
{
"3", MakeReportSparkJob.handleNameSurname("Steven Ossont")
},
{
"4", MakeReportSparkJob.handleNameSurname("Steven J. Johnston")
},
{
"5", MakeReportSparkJob.handleNameSurname("Joanna Molyn")
},
{
"6", MakeReportSparkJob.handleNameSurname("Joanna Molyn-Blanchfield")
},
{
"7", MakeReportSparkJob.handleNameSurname("Zhang Tian-Tuo")
},
{
"8", MakeReportSparkJob.handleNameSurname("Zhang Tian tuo")
},
{
"9", MakeReportSparkJob.handleNameSurname("Prasanth Manohar")
},
{
"10", MakeReportSparkJob.handleNameSurname("Nachimuthu Ramesh")
},
{
"9", MakeReportSparkJob.handleNameSurname("Hassan Ahmed")
},
{
"10", MakeReportSparkJob.handleNameSurname("Hassan Mohamed")
},
{
"11", MakeReportSparkJob.handleNameSurname("Jonathan ODonnell")
},
{
"12", MakeReportSparkJob.handleNameSurname("Jonathon A. O Dannell")
},
{
"11", MakeReportSparkJob.handleNameSurname("Amilcar António Teiga Teixeira")
},
{
"12", MakeReportSparkJob.handleNameSurname("Amílcar Teixeira")
},
{
"13", MakeReportSparkJob.handleNameSurname("Bruno Rossion")
},
{
"14", MakeReportSparkJob.handleNameSurname("B. Rossion")
},
{
"15", MakeReportSparkJob.handleNameSurname("TINGYOU WANG")
},
{
"16", MakeReportSparkJob.handleNameSurname("Wang Ting-You")
},
{
"17", MakeReportSparkJob.handleNameSurname("Jacob Moran-Gilad")
},
{
"18", MakeReportSparkJob.handleNameSurname("Moran-Gilad Jacon")
},
{
"19", MakeReportSparkJob.handleNameSurname("Adelle Semmler")
},
{
"20", MakeReportSparkJob.handleNameSurname("Adelle Craig")
},
{
"21", MakeReportSparkJob.handleNameSurname("Ramziddin M")
},
{
"20", MakeReportSparkJob.handleNameSurname("R. Mansurov")
},
{
"21", MakeReportSparkJob.handleNameSurname("Matthew Jones")
},
{
"22", MakeReportSparkJob.handleNameSurname("Martin Fenner")
},
{
"23", MakeReportSparkJob.handleNameSurname("Surachai Karnjanakom")
},
{
"24", MakeReportSparkJob.handleNameSurname("Panya Maneechakr")
},
{
"25", MakeReportSparkJob.handleNameSurname("Surachai Karnjanakom")
},
{
"26", MakeReportSparkJob.handleNameSurname("Panya Maneechakr")
},
{
"27", MakeReportSparkJob.handleNameSurname("Dönmez Turan Aygül")
},
{
"28", MakeReportSparkJob.handleNameSurname("Mendeş Pekdemir Işık")
},
{
"29", MakeReportSparkJob.handleNameSurname("ık Mendeş Pekdemir")
},
{
"30", MakeReportSparkJob.handleNameSurname("Aygül Dönmez Turan")
}
};
@Test
public void cosineDistanceTest() {
for (int i = 0; i < input.length; i += 2) {
double cosineDistance = new CosineDistance().apply(input[i][1], input[i + 1][1]);
System.out
.println(
"CosineDistance of '" + input[i][1] + "' & '" + input[i + 1][1] + "' | Words in strings are "
+ Math.round(cosineDistance * 100) + "% dis-similar or "
+ Math.round((1 - cosineDistance) * 100) + "% similar.");
}
}
@Test
public void testAuthorFuzzyMatch() {
Function<String, String> clean = s -> MakeReportSparkJob.handleNameSurname(s);
List<Document> documentList = Arrays.asList(input).stream().map(contact -> {
return new Document.Builder(contact[0])
.addElement(
new Element.Builder<String>()
.setValue(contact[1])
.setType(ElementType.NAME)
.setPreProcessingFunction(clean)
.createElement())
.createDocument();
}).collect(Collectors.toList());
MatchService matchService = new MatchService();
Map<String, List<Match<Document>>> result = matchService.applyMatchByDocId(documentList);
result.entrySet().forEach(entry -> {
entry.getValue().forEach(match -> {
System.out
.println(
"Data: " + match.getData() + " Matched With: " + match.getMatchedWith() + " Score: "
+ match.getScore().getResult());
});
});
}
// this is not good since it depends on the order of the authors
// e.g. FuzzyScore of 'donmez turan aygul' & 'mendes pekdemir is k' | Similarity ratio 5
// FuzzyScore of 'is k mendes pekdemir' & 'aygul donmez turan' | Similarity ratio 0
@Test
public void testAuthorFuzzyApache() {
for (int i = 0; i < input.length; i += 2) {
System.out
.println(
"FuzzyScore of '" + input[i][1] + "' & '" + input[i + 1][1] + "' | Similarity ratio "
+ new FuzzyScore(Locale.getDefault()).fuzzyScore(input[i][1], input[i + 1][1]));
}
}
@Test
public void FuzzyWuzzyTest() {
applyFuzzyWuzzy(input);
}
@Test
public void FuzzyWuzzyWrongTest() throws IOException {
final String inputPath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/clean/wrongassociation.json")
.getPath();
BufferedReader reader = new BufferedReader(new FileReader(inputPath));
String line;
List<OrcidAuthor> orcidAuthorList = new ArrayList<>();
while (null != (line = reader.readLine())) {
orcidAuthorList.add(new Gson().fromJson(line, OrcidAuthor.class));
}
applyFuzzyWuzzy(orcidAuthorList);
}
private void applyFuzzyWuzzy(List<OrcidAuthor> orcidAuthorList) {
orcidAuthorList.forEach(entry -> {
String orcid = MakeReportSparkJob.handleNameSurname(entry.getOrcid());
String result = MakeReportSparkJob.handleNameSurname(entry.getResult());
System.out
.println(
"FuzzyWuzzy of '" + orcid + "' & '" + result + "' | Similarity ratio "
+ FuzzySearch.ratio(orcid, result));
});
}
private void applyFuzzyWuzzy(String[][] input) {
for (int i = 0; i < input.length; i += 2) {
System.out
.println(
"FuzzyWuzzy of '" + input[i][1] + "' & '" + input[i + 1][1] + "' | Similarity ratio "
+ FuzzySearch.ratio(input[i][1], input[i + 1][1]));
}
}
class OrcidAuthor implements Serializable {
private String orcid;
private String result;
public String getOrcid() {
return orcid;
}
public void setOrcid(String orcid) {
this.orcid = orcid;
}
public String getResult() {
return result;
}
public void setResult(String result) {
this.result = result;
}
}
}

View File

@ -1,18 +1,17 @@
package eu.dnetlib.dhp.oa.graph.clean;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.intuit.fuzzymatcher.component.MatchService;
import com.intuit.fuzzymatcher.domain.Document;
import com.intuit.fuzzymatcher.domain.Element;
import com.intuit.fuzzymatcher.domain.ElementType;
import com.intuit.fuzzymatcher.domain.Match;
import com.wcohen.ss.Levenstein;
import eu.dnetlib.dhp.oa.graph.clean.authorpids.*;
import eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints.ConstraintResolver;
import eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints.ConstraintResolverFactory;
import me.xdrop.fuzzywuzzy.FuzzySearch;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.similarity.CosineDistance;
@ -28,545 +27,197 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.intuit.fuzzymatcher.component.MatchService;
import com.intuit.fuzzymatcher.domain.Document;
import com.intuit.fuzzymatcher.domain.Element;
import com.intuit.fuzzymatcher.domain.ElementType;
import com.intuit.fuzzymatcher.domain.Match;
import com.wcohen.ss.Levenstein;
import eu.dnetlib.dhp.oa.graph.clean.authorpids.*;
import eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints.ConstraintResolver;
import eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints.ConstraintResolverFactory;
import me.xdrop.fuzzywuzzy.FuzzySearch;
import scala.Tuple2;
public class MakeReportTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static Path workingDir;
private static final Logger log = LoggerFactory.getLogger(MakeReportTest.class);
final String whitelist = "{\"whitelist\":[{\"criteria\":[{\"verb\":\"shorterorequal\",\"field\":\"oname\",\"value\":\"2\"},{\"verb\":\"shorterorequal\",\"field\":\"osurname\",\"value\":\"2\"}]},{\"criteria\":[{\"verb\":\"shorterorequal\", \"field\":\"name\", \"value\":\"2\"},{\"verb\":\"shorterorequal\", \"field\":\"surname\", \"value\":\"2\"}]}, {\"criteria\":[{\"verb\":\"equals\", \"field\":\"oname\", \"value\":\"Given Names Deactivated\"},{\"verb\":\"equals\", \"field\":\"osurname\", \"value\":\"Family Name Deactivated\"}]}]}";
// needed for fuzzywuzzy to get a lower bound ratio under which the authors are most probably different
String[][] wrong = {
{
"1", MakeReportSparkJob.handleNameSurname("Alex Bullock")
},
{
"2", MakeReportSparkJob.handleNameSurname("Gillian Farnie")
},
{
"3", MakeReportSparkJob.handleNameSurname("Luís Rocha")
},
{
"4", MakeReportSparkJob.handleNameSurname("Pedro Relvas")
},
{
"9", MakeReportSparkJob.handleNameSurname("Prasanth Manohar")
},
{
"10", MakeReportSparkJob.handleNameSurname("Nachimuthu Ramesh")
}
};
String[][] input = {
{
"1", MakeReportSparkJob.handleNameSurname("Dr. Ulrike Elsdoerfer Ph.D.")
},
{
"2", MakeReportSparkJob.handleNameSurname("Ulrike Elsdörfer")
},
{
"3", MakeReportSparkJob.handleNameSurname("Steven Ossont")
},
{
"4", MakeReportSparkJob.handleNameSurname("Steven J. Johnston")
},
{
"5", MakeReportSparkJob.handleNameSurname("Joanna Molyn")
},
{
"6", MakeReportSparkJob.handleNameSurname("Joanna Molyn-Blanchfield")
},
{
"7", MakeReportSparkJob.handleNameSurname("Zhang Tian-Tuo")
},
{
"8", MakeReportSparkJob.handleNameSurname("Zhang Tiantuo")
},
{
"9", MakeReportSparkJob.handleNameSurname("Prasanth Manohar")
},
{
"10", MakeReportSparkJob.handleNameSurname("Nachimuthu Ramesh")
},
{
"9", MakeReportSparkJob.handleNameSurname("Hassan Ahmed")
},
{
"10", MakeReportSparkJob.handleNameSurname("Hassan Mohamed")
},
{
"11", MakeReportSparkJob.handleNameSurname("Jonathan ODonnell")
},
{
"12", MakeReportSparkJob.handleNameSurname("Jonathon A. O Dannell")
},
{
"11", MakeReportSparkJob.handleNameSurname("Amilcar António Teiga Teixeira")
},
{
"12", MakeReportSparkJob.handleNameSurname("Amílcar Teixeira")
},
{
"13", MakeReportSparkJob.handleNameSurname("Bruno Rossion")
},
{
"14", MakeReportSparkJob.handleNameSurname("B. Rossion")
},
{
"15", MakeReportSparkJob.handleNameSurname("TINGYOU WANG")
},
{
"16", MakeReportSparkJob.handleNameSurname("Wang Ting-You")
},
{
"17", MakeReportSparkJob.handleNameSurname("Jacob Moran-Gilad")
},
{
"18", MakeReportSparkJob.handleNameSurname("Moran-Gilad Jacon")
},
{
"19", MakeReportSparkJob.handleNameSurname("Adelle Semmler")
},
{
"20", MakeReportSparkJob.handleNameSurname("Adelle Craig")
},
{
"21", MakeReportSparkJob.handleNameSurname("Ramziddin M")
},
{
"20", MakeReportSparkJob.handleNameSurname("R. Mansurov")
},
{
"21", MakeReportSparkJob.handleNameSurname("Matthew Jones")
},
{
"22", MakeReportSparkJob.handleNameSurname("Martin Fenner")
},
{
"23", MakeReportSparkJob.handleNameSurname("Surachai Karnjanakom")
},
{
"24", MakeReportSparkJob.handleNameSurname("Panya Maneechakr")
},
{
"25", MakeReportSparkJob.handleNameSurname("Surachai Karnjanakom")
},
{
"26", MakeReportSparkJob.handleNameSurname("Panya Maneechakr")
}
};
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files.createTempDirectory(MakeReportTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(MakeReportTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(MakeReportTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
public void loadBlackList() {
WhiteList loadedWhiteList = new Gson().fromJson(whitelist, WhiteList.class);
ConstraintResolver resolver = ConstraintResolverFactory.newInstance();
loadedWhiteList.getWhitelist().forEach(c -> {
try {
c.setSelection(resolver);
} catch (InvocationTargetException e) {
e.printStackTrace();
} catch (NoSuchMethodException e) {
e.printStackTrace();
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
});
Map<String, String> param = new HashMap<>();
param.put("oname", "Miriam");
param.put("name", "Miriam");
param.put("osurname", "Miriam");
param.put("surname", "Miriam");
loadedWhiteList.getWhitelist().forEach(c -> Assertions.assertFalse(c.verifyCriteria(param)));
param.put("oname", "P");
param.put("osurname", "tj");
Assertions
.assertEquals(
1, loadedWhiteList
.getWhitelist()
.stream()
.map(c -> c.verifyCriteria(param))
.filter(Boolean::valueOf)
.collect(Collectors.toList())
.size());
param.put("oname", "Given Names Deactivated");
param.put("osurname", "Family Name Deactivated");
Assertions
.assertEquals(
1, loadedWhiteList
.getWhitelist()
.stream()
.map(c -> c.verifyCriteria(param))
.filter(Boolean::valueOf)
.collect(Collectors.toList())
.size());
param.put("name", "P");
param.put("surname", "tj");
Assertions
.assertEquals(
2, loadedWhiteList
.getWhitelist()
.stream()
.map(c -> c.verifyCriteria(param))
.filter(Boolean::valueOf)
.collect(Collectors.toList())
.size());
param.put("oname", "Given Names Deactivated");
param.put("osurname", "Family Name Deactivated");
Assertions
.assertEquals(
2, loadedWhiteList
.getWhitelist()
.stream()
.map(c -> c.verifyCriteria(param))
.filter(Boolean::valueOf)
.collect(Collectors.toList())
.size());
}
@Test
public void loadOrcid() {
final String orcidInputPath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/clean/authors.seq")
.getPath();
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
sc
.sequenceFile(orcidInputPath, Text.class, Text.class)
.foreach(pair -> {
OrcidAuthotitative tmp = OBJECT_MAPPER.readValue(pair._2().toString(), OrcidAuthotitative.class);
if (tmp.getOid().equalsIgnoreCase("0000-0001-6689-5079")) {
System.out.println(pair._2().toString());
}
});
}
// @Test
// public void serializeConstraint() throws JsonProcessingException {
// WhiteList whiteList = new WhiteList();
//
// SelectionConstraints sc = new SelectionConstraints();
//
// Constraints c = new Constraints();
// c.setVerb("verb");
// c.setValue("value");
// c.setField("field");
//
// sc.setCriteria(Arrays.asList(c, c));
//
// whiteList.setWhitelist(Arrays.asList(sc));
//
// System.out.println(OBJECT_MAPPER.writeValueAsString(whiteList));
// }
@Test
public void makeReportTest() throws Exception {
final String inputPath = "";
final String preparedInfoPath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/clean/part-00000-prepared_list.json.gz")
.getPath();
final String orcidInputPath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/clean/authors.seq")
.getPath();
MakeReportSparkJob.main(new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-outputPath", workingDir + "/reports",
"-inputPath", inputPath,
"-preparedInfoPath", preparedInfoPath,
"-orcidInputPath", orcidInputPath,
"-graphTableClassName", "eu.dnetlib.dhp.schema.oaf.Publication",
"-whitelist", whitelist
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<ReportInfo> tmp = sc
.textFile(workingDir.toString() + "/reports/wrong")
.map(item -> OBJECT_MAPPER.readValue(item, ReportInfo.class));
Assertions.assertEquals(1, tmp.count());
}
@Test
public void prepareInfoTest() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/clean/part-00000.gz")
.getPath();
PrepareResultsSparkJob.main(new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-outputPath", workingDir.toString() + "/preparedInfo",
"-inputPath", sourcePath,
"-graphTableClassName", "eu.dnetlib.dhp.schema.oaf.Publication"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<ResultInfo> tmp = sc
.textFile(workingDir.toString() + "/preparedInfo")
.map(item -> OBJECT_MAPPER.readValue(item, ResultInfo.class));
tmp.foreach(rdd -> System.out.println(OBJECT_MAPPER.writeValueAsString(rdd)));
}
@Test
public void cleanNameSurname() {
String name = "Hübner";
String surname = "Hubenr";
name = StringUtils
.stripAccents(name.toLowerCase().trim())
.replaceAll("[^a-z\\s]+", " ");
surname = StringUtils
.stripAccents(surname.toLowerCase().trim())
.replace(".", "")
.replaceAll("[^a-z\\s]+", " ")
.replace("'", " ")
.trim();
Levenstein l = new Levenstein();
double score = Math.abs(l.score(name, surname));
System.out.println(score);
}
@Test
public void testMakeReport() throws IOException {
ResultInfo ri = new ResultInfo();
ri.setName("Prasanth");
ri.setSurname("Manohar");
OrcidAuthotitative oa = new OrcidAuthotitative();
oa.setName("Nachimuthu");
oa.setSurname("Ramesh");
Tuple2<ResultInfo, OrcidAuthotitative> t2 = new Tuple2<ResultInfo, OrcidAuthotitative>(ri, oa);
Tuple2<String, ReportInfo> tmp = MakeReportSparkJob.getStringReportInfoTuple2(t2);
System.out.println(new Gson().toJson(tmp._2(), ReportInfo.class));
ri.setName("Sophia");
ri.setSurname("Hooper");
oa.setName("Man");
oa.setSurname("Yang");
oa.setCreditName("Man Yang (previous known as Sophia Yang Hooper)");
WhiteList wl = new Gson().fromJson(whitelist, WhiteList.class);
ConstraintResolver resolver = ConstraintResolverFactory.newInstance();
wl.getWhitelist().forEach(c -> {
try {
c.setSelection(resolver);
} catch (InvocationTargetException e) {
e.printStackTrace();
} catch (NoSuchMethodException e) {
e.printStackTrace();
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
});
System.out
.println(
OBJECT_MAPPER
.writeValueAsString(
MakeReportSparkJob.getStringReportInfoFuzzyTuple2(new Tuple2<>(ri, oa), wl)._2()));
}
@Test
public void cosineDistanceTest() {
for (int i = 0; i < input.length; i += 2) {
double cosineDistance = new CosineDistance().apply(input[i][1], input[i + 1][1]);
System.out
.println(
"CosineDistance of '" + input[i][1] + "' & '" + input[i + 1][1] + "' | Words in strings are "
+ Math.round(cosineDistance * 100) + "% dis-similar or "
+ Math.round((1 - cosineDistance) * 100) + "% similar.");
}
}
@Test
public void testAuthorFuzzyMatch() {
Function<String, String> clean = s -> MakeReportSparkJob.handleNameSurname(s);
List<Document> documentList = Arrays.asList(input).stream().map(contact -> {
return new Document.Builder(contact[0])
.addElement(
new Element.Builder<String>()
.setValue(contact[1])
.setType(ElementType.NAME)
.setPreProcessingFunction(clean)
.createElement())
.createDocument();
}).collect(Collectors.toList());
MatchService matchService = new MatchService();
Map<String, List<Match<Document>>> result = matchService.applyMatchByDocId(documentList);
result.entrySet().forEach(entry -> {
entry.getValue().forEach(match -> {
System.out
.println(
"Data: " + match.getData() + " Matched With: " + match.getMatchedWith() + " Score: "
+ match.getScore().getResult());
});
});
}
@Test
public void testAuthorFuzzyApache(){
for (int i = 0; i < input.length; i += 2) {
System.out
.println(
"FuzzyScore of '" + input[i][1] + "' & '" + input[i + 1][1] + "' | Similarity ratio "
+ new FuzzyScore(Locale.getDefault()).fuzzyScore(input[i][1], input[i + 1][1]));
}
}
@Test
public void FuzzyWuzzyTest() {
applyFuzzyWuzzy(input);
}
@Test
public void FuzzyWuzzyWrongTest() throws IOException {
final String inputPath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/clean/wrongassociation.json")
.getPath();
BufferedReader reader = new BufferedReader(new FileReader(inputPath));
String line;
List<OrcidAuthor> orcidAuthorList = new ArrayList<>();
while (null != (line = reader.readLine())) {
orcidAuthorList.add(new Gson().fromJson(line, OrcidAuthor.class));
}
applyFuzzyWuzzy(orcidAuthorList);
}
private void applyFuzzyWuzzy(List<OrcidAuthor> orcidAuthorList) {
orcidAuthorList.forEach(entry -> {
String orcid = MakeReportSparkJob.handleNameSurname(entry.getOrcid());
String result = MakeReportSparkJob.handleNameSurname(entry.getResult());
System.out
.println(
"FuzzyWuzzy of '" + orcid + "' & '" + result + "' | Similarity ratio "
+ FuzzySearch.ratio(orcid, result));
});
}
private void applyFuzzyWuzzy(String[][] input) {
for (int i = 0; i < input.length; i += 2) {
System.out
.println(
"FuzzyWuzzy of '" + input[i][1] + "' & '" + input[i + 1][1] + "' | Similarity ratio "
+ FuzzySearch.ratio(input[i][1], input[i + 1][1]));
}
}
class OrcidAuthor implements Serializable {
private String orcid;
private String result;
public String getOrcid() {
return orcid;
}
public void setOrcid(String orcid) {
this.orcid = orcid;
}
public String getResult() {
return result;
}
public void setResult(String result) {
this.result = result;
}
}
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static Path workingDir;
private static final Logger log = LoggerFactory.getLogger(MakeReportTest.class);
final String whitelist = "{\"whitelist\":[{\"criteria\":[{\"verb\":\"shorterorequal\",\"field\":\"oname\",\"value\":\"2\"},{\"verb\":\"shorterorequal\",\"field\":\"osurname\",\"value\":\"2\"}]},{\"criteria\":[{\"verb\":\"shorterorequal\", \"field\":\"name\", \"value\":\"2\"},{\"verb\":\"shorterorequal\", \"field\":\"surname\", \"value\":\"2\"}]}, {\"criteria\":[{\"verb\":\"equals\", \"field\":\"oname\", \"value\":\"Given Names Deactivated\"},{\"verb\":\"equals\", \"field\":\"osurname\", \"value\":\"Family Name Deactivated\"}]}]}";
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files.createTempDirectory(MakeReportTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(MakeReportTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(MakeReportTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
public void loadBlackList() {
WhiteList loadedWhiteList = new Gson().fromJson(whitelist, WhiteList.class);
ConstraintResolver resolver = ConstraintResolverFactory.newInstance();
loadedWhiteList.getWhitelist().forEach(c -> {
try {
c.setSelection(resolver);
} catch (InvocationTargetException e) {
e.printStackTrace();
} catch (NoSuchMethodException e) {
e.printStackTrace();
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
});
Map<String, String> param = new HashMap<>();
param.put("oname", "Miriam");
param.put("name", "Miriam");
param.put("osurname", "Miriam");
param.put("surname", "Miriam");
loadedWhiteList.getWhitelist().forEach(c -> Assertions.assertFalse(c.verifyCriteria(param)));
param.put("oname", "P");
param.put("osurname", "tj");
Assertions
.assertEquals(
1, loadedWhiteList
.getWhitelist()
.stream()
.map(c -> c.verifyCriteria(param))
.filter(Boolean::valueOf)
.collect(Collectors.toList())
.size());
param.put("oname", "Given Names Deactivated");
param.put("osurname", "Family Name Deactivated");
Assertions
.assertEquals(
1, loadedWhiteList
.getWhitelist()
.stream()
.map(c -> c.verifyCriteria(param))
.filter(Boolean::valueOf)
.collect(Collectors.toList())
.size());
param.put("name", "P");
param.put("surname", "tj");
Assertions
.assertEquals(
2, loadedWhiteList
.getWhitelist()
.stream()
.map(c -> c.verifyCriteria(param))
.filter(Boolean::valueOf)
.collect(Collectors.toList())
.size());
param.put("oname", "Given Names Deactivated");
param.put("osurname", "Family Name Deactivated");
Assertions
.assertEquals(
2, loadedWhiteList
.getWhitelist()
.stream()
.map(c -> c.verifyCriteria(param))
.filter(Boolean::valueOf)
.collect(Collectors.toList())
.size());
}
@Test
public void makeReportTest() throws Exception {
final String inputPath = "";
final String preparedInfoPath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/clean/orcid/preparedInfo")
.getPath();
final String orcidInputPath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/clean/orcid/authors.seq")
.getPath();
MakeReportSparkJob.main(new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-outputPath", workingDir + "/reports",
"-inputPath", inputPath,
"-preparedInfoPath", preparedInfoPath,
"-orcidInputPath", orcidInputPath,
"-graphTableClassName", "eu.dnetlib.dhp.schema.oaf.Publication",
"-whitelist", whitelist
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<ReportInfo> tmp = sc
.textFile(workingDir.toString() + "/reports/wrong")
.map(item -> OBJECT_MAPPER.readValue(item, ReportInfo.class));
Assertions.assertEquals(28, tmp.count());
}
@Test
public void prepareInfoTest() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/clean/orcid/part-00000.gz")
.getPath();
PrepareResultsSparkJob.main(new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-outputPath", workingDir.toString() + "/preparedInfo",
"-inputPath", sourcePath,
"-graphTableClassName", "eu.dnetlib.dhp.schema.oaf.Publication"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<ResultInfo> tmp = sc
.textFile(workingDir.toString() + "/preparedInfo")
.map(item -> OBJECT_MAPPER.readValue(item, ResultInfo.class));
tmp.foreach(rdd -> System.out.println(OBJECT_MAPPER.writeValueAsString(rdd)));
}
}