fixed enriched works generation

This commit is contained in:
Enrico Ottonello 2020-06-29 18:03:16 +02:00
parent b2213b6435
commit b7b6be12a5
8 changed files with 66 additions and 67 deletions

View File

@ -2,16 +2,12 @@
package eu.dnetlib.doiboost.orcid.json;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import eu.dnetlib.doiboost.orcidnodoi.model.WorkDataNoDoi;
public class JsonHelper {
public static String createOidWork(WorkDataNoDoi workData) {
JsonObject oidWork = new JsonObject();
oidWork.addProperty("oid", workData.getOid());
oidWork.addProperty("work", new Gson().toJson(workData));
return oidWork.toString();
return new Gson().toJson(workData);
}
}

View File

@ -26,8 +26,8 @@ import eu.dnetlib.doiboost.orcidnodoi.xml.XMLRecordParserNoDoi;
public class ActivitiesDumpReader {
private static final int MAX_XML_WORKS_PARSED = -1;
private static final int XML_WORKS_PARSED_COUNTER_LOG_INTERVAL = 100000;
private static final int MAX_XML_WORKS_PARSED = 100;
private static final int XML_WORKS_PARSED_COUNTER_LOG_INTERVAL = 10;
public static void parseGzActivities(Configuration conf, String inputUri, Path outputPath)
throws Exception {

View File

@ -45,6 +45,7 @@ public class GenOrcidAuthorWork extends OrcidDSManager {
Log.info("HDFS URI: " + hdfsServerUri);
workingPath = parser.get("workingPath");
Log.info("Working Path: " + workingPath);
hdfsOrcidDefaultPath = workingPath;
activitiesFileNameTarGz = parser.get("activitiesFileNameTarGz");
Log.info("Activities File Name: " + activitiesFileNameTarGz);
outputWorksPath = parser.get("outputWorksPath");

View File

@ -24,6 +24,7 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.doiboost.orcid.json.JsonHelper;
import eu.dnetlib.doiboost.orcid.model.AuthorData;
import eu.dnetlib.doiboost.orcidnodoi.model.WorkDataNoDoi;
import eu.dnetlib.doiboost.orcidnodoi.similarity.AuthorMatcher;
@ -31,9 +32,9 @@ import scala.Tuple2;
public class SparkGenEnrichedOrcidWorks {
static Logger logger = LoggerFactory.getLogger(SparkGenEnrichedOrcidWorks.class);
public static void main(String[] args) throws IOException, Exception {
Logger logger = LoggerFactory.getLogger(SparkGenEnrichedOrcidWorks.class);
logger.info("[ SparkGenerateDoiAuthorList STARTED]");
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
@ -46,13 +47,9 @@ public class SparkGenEnrichedOrcidWorks {
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
logger.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String workingPath = parser.get("workingPath");
logger.info("workingPath: ", workingPath);
final String outputEnrichedWorksPath = parser.get("outputEnrichedWorksPath");
logger.info("outputEnrichedWorksPath: ", outputEnrichedWorksPath);
final String outputWorksPath = parser.get("outputWorksPath");
logger.info("outputWorksPath: ", outputWorksPath);
SparkConf conf = new SparkConf();
runWithSparkSession(
@ -67,30 +64,33 @@ public class SparkGenEnrichedOrcidWorks {
.createDataset(
summariesRDD.map(seq -> loadAuthorFromJson(seq._1(), seq._2())).rdd(),
Encoders.bean(AuthorData.class));
logger.info("Authors data loaded: " + summariesDataset.count());
JavaPairRDD<Text, Text> activitiesRDD = sc
.sequenceFile(workingPath + outputWorksPath + "works_X.seq", Text.class, Text.class);
.sequenceFile(workingPath + outputWorksPath + "*.seq", Text.class, Text.class);
Dataset<WorkDataNoDoi> activitiesDataset = spark
.createDataset(
activitiesRDD.map(seq -> loadWorkFromJson(seq._1(), seq._2())).rdd(),
Encoders.bean(WorkDataNoDoi.class));
logger.info("Works data loaded: " + activitiesDataset.count());
activitiesDataset
JavaRDD<Tuple2<String, String>> enrichedWorksRDD = activitiesDataset
.joinWith(
summariesDataset,
activitiesDataset.col("oid").equalTo(summariesDataset.col("oid")), "inner")
.map(
(MapFunction<Tuple2<WorkDataNoDoi, AuthorData>, Tuple2<String, WorkDataNoDoi>>) value -> {
(MapFunction<Tuple2<WorkDataNoDoi, AuthorData>, Tuple2<String, String>>) value -> {
WorkDataNoDoi w = value._1;
AuthorData a = value._2;
AuthorMatcher.match(a, w.getContributors());
return new Tuple2<>(a.getOid(), w);
return new Tuple2<>(a.getOid(), JsonHelper.createOidWork(w));
},
Encoders.tuple(Encoders.STRING(), Encoders.bean(WorkDataNoDoi.class)))
Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
.filter(Objects::nonNull)
.toJavaRDD()
.saveAsTextFile(workingPath + outputEnrichedWorksPath);
;
.toJavaRDD();
logger.info("Works enriched data created: " + enrichedWorksRDD.count());
enrichedWorksRDD.repartition(10).saveAsTextFile(workingPath + outputEnrichedWorksPath);
logger.info("Works enriched data saved");
});
}
@ -105,6 +105,7 @@ public class SparkGenEnrichedOrcidWorks {
}
private static WorkDataNoDoi loadWorkFromJson(Text orcidId, Text json) {
WorkDataNoDoi workData = new Gson().fromJson(json.toString(), WorkDataNoDoi.class);
return workData;
}

View File

@ -33,15 +33,13 @@ public class AuthorMatcher {
List<Integer> matchCounters = Arrays.asList(matchCounter);
Contributor contributor = null;
contributors.forEach(c -> {
if (normalize(c.getCreditName()).contains(normalize(author.getName())) ||
normalize(c.getCreditName()).contains(normalize(author.getSurname())) ||
((author.getOtherName() != null)
&& normalize(c.getCreditName()).contains(normalize(author.getOtherName())))) {
if (simpleMatch(c.getCreditName(), author.getName()) ||
simpleMatch(c.getCreditName(), author.getSurname()) ||
simpleMatch(c.getCreditName(), author.getOtherName())) {
matchCounters.set(0, matchCounters.get(0) + 1);
c.setSimpleMatch(true);
}
});
logger.info("match counter: " + Integer.toString(matchCounters.get(0)));
if (matchCounters.get(0) == 1) {
updateAuthorsSimpleMatch(contributors, author);
} else if (matchCounters.get(0) > 1) {
@ -50,7 +48,6 @@ public class AuthorMatcher {
.filter(c -> c.isSimpleMatch())
.map(c -> {
c.setScore(bestMatch(author.getName(), author.getSurname(), c.getCreditName()));
logger.debug("nella map: " + c.getCreditName() + " score: " + c.getScore());
return c;
})
.filter(c -> c.getScore() >= threshold)
@ -59,24 +56,21 @@ public class AuthorMatcher {
if (optCon.isPresent()) {
bestMatchContributor = optCon.get();
bestMatchContributor.setBestMatch(true);
logger.info("best match: " + bestMatchContributor.getCreditName());
updateAuthorsSimilarityMatch(contributors, author);
}
}
logger.info("UPDATED contributors: ");
contributors.forEach(c -> {
logger
.info(
c.getOid() + " - " + c.getCreditName() + " - " +
c.getName() + " - " + c.getSurname() + " - " +
c.getRole() + " - " + c.getSequence());
});
}
private static boolean simpleMatch(String name, String searchValue) {
if (searchValue == null) {
return false;
}
return normalize(name).contains(normalize(searchValue));
}
private static Double bestMatch(String authorSurname, String authorName, String contributor) {
logger.debug(authorSurname + " " + authorName + " vs " + contributor);
String[] contributorSplitted = contributor.split(" ");
if (contributorSplitted.length == 0) {
return 0.0;
@ -90,10 +84,6 @@ public class AuthorMatcher {
}
contributorSurname = joiner.toString();
}
logger
.debug(
"contributorName: " + contributorName +
" contributorSurname: " + contributorSurname);
String authorNameNrm = normalize(authorName);
String authorSurnameNrm = normalize(authorSurname);
String contributorNameNrm = normalize(contributorName);
@ -108,8 +98,6 @@ public class AuthorMatcher {
private static Double similarity(String nameA, String surnameA, String nameB, String surnameB) {
Double score = similarityJaroWinkler(nameA, surnameA, nameB, surnameB);
logger
.debug(nameA + ", " + surnameA + " <> " + nameB + ", " + surnameB + " score: " + Double.toString(score));
return score;
}
@ -118,6 +106,9 @@ public class AuthorMatcher {
}
private static String normalize(final String s) {
if (s == null) {
return new String("");
}
return nfd(s)
.toLowerCase()
// do not compact the regexes in a single expression, would cause StackOverflowError
@ -142,7 +133,6 @@ public class AuthorMatcher {
private static void updateAuthorsSimpleMatch(List<Contributor> contributors, AuthorData author) {
contributors.forEach(c -> {
if (c.isSimpleMatch()) {
logger.info("simple match on : " + c.getCreditName());
c.setName(author.getName());
c.setSurname(author.getSurname());
c.setOid(author.getOid());
@ -152,21 +142,10 @@ public class AuthorMatcher {
}
private static void updateAuthorsSimilarityMatch(List<Contributor> contributors, AuthorData author) {
logger.info("inside updateAuthorsSimilarityMatch ...");
contributors.forEach(c -> {
logger
.info(
c.getOid() + " - " + c.getCreditName() + " - " +
c.getName() + " - " + c.getSurname() + " - " +
c.getRole() + " - " + c.getSequence() + " - best: " + c.isBestMatch() + " - simpe: "
+ c.isSimpleMatch());
});
contributors
.stream()
.filter(c -> c.isBestMatch())
.forEach(c -> {
logger.info("similarity match on : " + c.getCreditName());
c.setName(author.getName());
c.setSurname(author.getSurname());
c.setOid(author.getOid());
@ -184,7 +163,6 @@ public class AuthorMatcher {
c.getSequence().equals("additional")))
.count() > 0) {
seqFound = true;
logger.info("sequence data found");
}
if (!seqFound) {
List<Integer> seqIds = Arrays.asList(0);

View File

@ -41,7 +41,6 @@ public class XMLRecordParserNoDoi {
public static WorkDataNoDoi VTDParseWorkData(byte[] bytes)
throws VtdException, EncodingException, EOFException, EntityException, ParseException, XPathParseException,
NavException, XPathEvalException {
logger.info("parsing xml ...");
final VTDGen vg = new VTDGen();
vg.setDoc(bytes);
vg.parse(true);
@ -191,6 +190,9 @@ public class XMLRecordParserNoDoi {
nameIndex++;
}
}
if (contributors.size() == 0) {
return contributors;
}
int sequenceIndex = 0;
ap.selectXPath("//work:contributor/work:contributor-attributes/work:contributor-sequence");

View File

@ -8,15 +8,24 @@
<value>true</value>
</property>
<property>
<name>oozie.launcher.mapreduce.map.java.opts</name>
<value>-Xmx4g</value>
<name>oozie.launcher.mapreduce.map.java.opts</name>
<value>-Xmx4g</value>
</property>
<property>
<name>jobTracker</name>
<value>hadoop-rm3.garr-pa1.d4science.org:8032</value>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://hadoop-rm1.garr-pa1.d4science.org:8020</value>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@ -71,10 +71,9 @@
<description>the shell command that downloads and puts to hdfs orcid activity file X</description>
</property>
</parameters>
<start to="ResetWorkingPath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
@ -133,6 +132,7 @@
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-f</arg><arg>ORCID_2019_activites_0.tar.gz</arg>
<arg>-ow</arg><arg>no_doi_works/works_0.seq</arg>
<arg>-oew</arg><arg>no_doi_enriched_works/</arg>
</java>
<ok to="join_node"/>
<error to="Kill"/>
@ -169,6 +169,7 @@
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-f</arg><arg>ORCID_2019_activites_1.tar.gz</arg>
<arg>-ow</arg><arg>no_doi_works/works_1.seq</arg>
<arg>-oew</arg><arg>no_doi_enriched_works/</arg>
</java>
<ok to="join_node"/>
<error to="Kill"/>
@ -205,6 +206,7 @@
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-f</arg><arg>ORCID_2019_activites_2.tar.gz</arg>
<arg>-ow</arg><arg>no_doi_works/works_2.seq</arg>
<arg>-oew</arg><arg>no_doi_enriched_works/</arg>
</java>
<ok to="join_node"/>
<error to="Kill"/>
@ -241,6 +243,7 @@
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-f</arg><arg>ORCID_2019_activites_3.tar.gz</arg>
<arg>-ow</arg><arg>no_doi_works/works_3.seq</arg>
<arg>-oew</arg><arg>no_doi_enriched_works/</arg>
</java>
<ok to="join_node"/>
<error to="Kill"/>
@ -277,6 +280,7 @@
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-f</arg><arg>ORCID_2019_activites_4.tar.gz</arg>
<arg>-ow</arg><arg>no_doi_works/works_4.seq</arg>
<arg>-oew</arg><arg>no_doi_enriched_works/</arg>
</java>
<ok to="join_node"/>
<error to="Kill"/>
@ -313,6 +317,7 @@
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-f</arg><arg>ORCID_2019_activites_5.tar.gz</arg>
<arg>-ow</arg><arg>no_doi_works/works_5.seq</arg>
<arg>-oew</arg><arg>no_doi_enriched_works/</arg>
</java>
<ok to="join_node"/>
<error to="Kill"/>
@ -349,6 +354,7 @@
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-f</arg><arg>ORCID_2019_activites_6.tar.gz</arg>
<arg>-ow</arg><arg>no_doi_works/works_6.seq</arg>
<arg>-oew</arg><arg>no_doi_enriched_works/</arg>
</java>
<ok to="join_node"/>
<error to="Kill"/>
@ -386,6 +392,7 @@
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-f</arg><arg>ORCID_2019_activites_7.tar.gz</arg>
<arg>-ow</arg><arg>no_doi_works/works_7.seq</arg>
<arg>-oew</arg><arg>no_doi_enriched_works/</arg>
</java>
<ok to="join_node"/>
<error to="Kill"/>
@ -422,6 +429,7 @@
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-f</arg><arg>ORCID_2019_activites_8.tar.gz</arg>
<arg>-ow</arg><arg>no_doi_works/works_8.seq</arg>
<arg>-oew</arg><arg>no_doi_enriched_works/</arg>
</java>
<ok to="join_node"/>
<error to="Kill"/>
@ -458,6 +466,7 @@
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-f</arg><arg>ORCID_2019_activites_9.tar.gz</arg>
<arg>-ow</arg><arg>no_doi_works/works_9.seq</arg>
<arg>-oew</arg><arg>no_doi_enriched_works/</arg>
</java>
<ok to="join_node"/>
<error to="Kill"/>
@ -494,11 +503,12 @@
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-f</arg><arg>ORCID_2019_activites_X.tar.gz</arg>
<arg>-ow</arg><arg>no_doi_works/works_X.seq</arg>
<arg>-oew</arg><arg>no_doi_enriched_works/</arg>
</java>
<ok to="join_node"/>
<error to="Kill"/>
</action>
<join name = "join_node" to = "Gen_Enriched_Orcid_Works"/>
<action name="Gen_Enriched_Orcid_Works">
@ -509,12 +519,14 @@
<mode>cluster</mode>
<name>Gen_Enriched_Orcid_Works</name>
<class>eu.dnetlib.doiboost.orcidnodoi.SparkGenEnrichedOrcidWorks</class>
<jar>dhp-doiboost-1.2.2-SNAPSHOT.jar</jar>
<jar>dhp-doiboost-1.2.4-SNAPSHOT.jar</jar>
<spark-opts>--num-executors 10 --conf spark.yarn.jars=&quot;hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2&quot; --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory}
</spark-opts>
<arg>-w</arg><arg>${workingPath}/</arg>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-f</arg><arg>-</arg>
<arg>-ow</arg><arg>no_doi_works/</arg>
<arg>-oew</arg><arg>no_doi_enriched_works/</arg>
<arg>-oew</arg><arg>no_doi_enriched_works/output</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>