added workflow for generating authors with dois data sequence file

This commit is contained in:
Enrico Ottonello 2020-04-24 15:50:40 +02:00
parent c03ac6e5bb
commit 941e94af06
12 changed files with 473 additions and 9 deletions

View File

@ -0,0 +1,150 @@
package eu.dnetlib.doiboost.orcid;
import eu.dnetlib.doiboost.orcid.json.JsonWriter;
import eu.dnetlib.doiboost.orcid.model.WorkData;
import eu.dnetlib.doiboost.orcid.xml.XMLRecordParser;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.mortbay.log.Log;
public class ActivitiesDecompressor {
private static final int MAX_XML_WORKS_PARSED = -1;
public static void parseGzActivities(Configuration conf, String inputUri, Path outputPath)
throws Exception {
String uri = inputUri;
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path inputPath = new Path(uri);
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
CompressionCodec codec = factory.getCodec(inputPath);
if (codec == null) {
System.err.println("No codec found for " + uri);
System.exit(1);
}
CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());
InputStream gzipInputStream = null;
try {
gzipInputStream = codec.createInputStream(fs.open(inputPath));
parseTarActivities(fs, conf, gzipInputStream, outputPath);
} finally {
Log.debug("Closing gzip stream");
IOUtils.closeStream(gzipInputStream);
}
}
private static void parseTarActivities(
FileSystem fs, Configuration conf, InputStream gzipInputStream, Path outputPath) {
int counter = 0;
int doiFound = 0;
int errorFromOrcidFound = 0;
int xmlParserErrorFound = 0;
try (TarArchiveInputStream tais = new TarArchiveInputStream(gzipInputStream)) {
TarArchiveEntry entry = null;
try (SequenceFile.Writer writer =
SequenceFile.createWriter(
conf,
SequenceFile.Writer.file(outputPath),
SequenceFile.Writer.keyClass(Text.class),
SequenceFile.Writer.valueClass(Text.class))) {
while ((entry = tais.getNextTarEntry()) != null) {
String filename = entry.getName();
try {
if (entry.isDirectory() || !filename.contains("works")) {
} else {
Log.debug("XML work entry name: " + entry.getName());
counter++;
BufferedReader br =
new BufferedReader(
new InputStreamReader(
tais)); // Read directly from tarInput
String line;
StringBuffer buffer = new StringBuffer();
while ((line = br.readLine()) != null) {
buffer.append(line);
}
WorkData workData =
XMLRecordParser.VTDParseWorkData(buffer.toString().getBytes());
if (workData != null) {
if (workData.getErrorCode() != null) {
errorFromOrcidFound += 1;
Log.debug(
"error from Orcid with code "
+ workData.getErrorCode()
+ " for entry "
+ entry.getName());
continue;
}
if (workData.isDoiFound()) {
String jsonData = JsonWriter.create(workData);
Log.debug("oid: " + workData.getOid() + " data: " + jsonData);
final Text key = new Text(workData.getOid());
final Text value = new Text(jsonData);
try {
writer.append(key, value);
} catch (IOException e) {
Log.debug("Writing to sequence file: " + e.getMessage());
Log.debug(e);
throw new RuntimeException(e);
}
doiFound += 1;
}
} else {
Log.warn(
"Data not retrievable ["
+ entry.getName()
+ "] "
+ buffer.toString());
xmlParserErrorFound += 1;
}
}
} catch (Exception e) {
Log.warn(
"Parsing work from tar archive and xml work: "
+ filename
+ " "
+ e.getMessage());
Log.warn(e);
}
if ((counter % 100000) == 0) {
Log.info("Current xml works parsed: " + counter);
}
if ((MAX_XML_WORKS_PARSED > -1) && (counter > MAX_XML_WORKS_PARSED)) {
break;
}
}
}
} catch (IOException e) {
Log.warn("Parsing work from gzip archive: " + e.getMessage());
Log.warn(e);
throw new RuntimeException(e);
}
Log.info("Activities parse completed");
Log.info("Total XML works parsed: " + counter);
Log.info("Total doi found: " + doiFound);
Log.info("Error from Orcid found: " + errorFromOrcidFound);
Log.info("Error parsing xml work found: " + xmlParserErrorFound);
}
}

View File

@ -0,0 +1,53 @@
package eu.dnetlib.doiboost.orcid;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import java.io.IOException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.mortbay.log.Log;
public class OrcidAuthorsDOIsDataGen extends OrcidDSManager {
private String activitiesFileNameTarGz;
private String outputAuthorsDOIsPath;
public static void main(String[] args) throws IOException, Exception {
OrcidAuthorsDOIsDataGen orcidAuthorsDOIsDataGen = new OrcidAuthorsDOIsDataGen();
orcidAuthorsDOIsDataGen.loadArgs(args);
orcidAuthorsDOIsDataGen.generateAuthorsDOIsData();
}
public void generateAuthorsDOIsData() throws Exception {
Configuration conf = initConfigurationObject();
FileSystem fs = initFileSystemObject(conf);
String tarGzUri =
hdfsServerUri.concat(hdfsOrcidDefaultPath).concat(activitiesFileNameTarGz);
Path outputPath =
new Path(
hdfsServerUri
.concat(hdfsOrcidDefaultPath)
.concat(outputAuthorsDOIsPath)
.concat("authors_dois.seq"));
ActivitiesDecompressor.parseGzActivities(conf, tarGzUri, outputPath);
}
private void loadArgs(String[] args) throws IOException, Exception {
final ArgumentApplicationParser parser =
new ArgumentApplicationParser(
IOUtils.toString(
OrcidAuthorsDOIsDataGen.class.getResourceAsStream(
"/eu/dnetlib/dhp/doiboost/create_orcid_authors_dois_data.json")));
parser.parseArgument(args);
hdfsServerUri = parser.get("hdfsServerUri");
Log.info("HDFS URI: " + hdfsServerUri);
hdfsOrcidDefaultPath = parser.get("hdfsOrcidDefaultPath");
Log.info("Default Path: " + hdfsOrcidDefaultPath);
activitiesFileNameTarGz = parser.get("activitiesFileNameTarGz");
Log.info("Activities File Name: " + activitiesFileNameTarGz);
outputAuthorsDOIsPath = parser.get("outputAuthorsDOIsPath");
Log.info("Output Authors DOIs Data: " + outputAuthorsDOIsPath);
}
}

View File

@ -11,8 +11,8 @@ import org.mortbay.log.Log;
public class OrcidDSManager {
private String hdfsServerUri;
private String hdfsOrcidDefaultPath;
protected String hdfsServerUri;
protected String hdfsOrcidDefaultPath;
private String summariesFileNameTarGz;
private String outputAuthorsPath;
@ -35,7 +35,7 @@ public class OrcidDSManager {
SummariesDecompressor.parseGzSummaries(conf, tarGzUri, outputPath);
}
private Configuration initConfigurationObject() {
protected Configuration initConfigurationObject() {
// ====== Init HDFS File System Object
Configuration conf = new Configuration();
// Set FileSystem URI
@ -46,7 +46,7 @@ public class OrcidDSManager {
return conf;
}
private FileSystem initFileSystemObject(Configuration conf) {
protected FileSystem initFileSystemObject(Configuration conf) {
// Get the filesystem - HDFS
FileSystem fs = null;
try {

View File

@ -82,7 +82,8 @@ public class SummariesDecompressor {
buffer.append(line);
}
AuthorData authorData =
XMLRecordParser.VTDParse(buffer.toString().getBytes());
XMLRecordParser.VTDParseAuthorData(
buffer.toString().getBytes());
if (authorData != null) {
if (authorData.getErrorCode() != null) {
errorFromOrcidFound += 1;

View File

@ -2,6 +2,7 @@ package eu.dnetlib.doiboost.orcid.json;
import com.google.gson.JsonObject;
import eu.dnetlib.doiboost.orcid.model.AuthorData;
import eu.dnetlib.doiboost.orcid.model.WorkData;
public class JsonWriter {
@ -15,4 +16,11 @@ public class JsonWriter {
}
return author.toString();
}
public static String create(WorkData workData) {
JsonObject work = new JsonObject();
work.addProperty("oid", workData.getOid());
work.addProperty("doi", workData.getDoi());
return work.toString();
}
}

View File

@ -0,0 +1,42 @@
package eu.dnetlib.doiboost.orcid.model;
public class WorkData {
private String oid;
private String doi;
private boolean doiFound = false;
public boolean isDoiFound() {
return doiFound;
}
public void setDoiFound(boolean doiFound) {
this.doiFound = doiFound;
}
public String getOid() {
return oid;
}
public void setOid(String oid) {
this.oid = oid;
}
public String getDoi() {
return doi;
}
public void setDoi(String doi) {
this.doi = doi;
}
public String getErrorCode() {
return errorCode;
}
public void setErrorCode(String errorCode) {
this.errorCode = errorCode;
}
private String errorCode;
}

View File

@ -10,6 +10,7 @@ import com.ximpleware.VTDNav;
import eu.dnetlib.dhp.parser.utility.VtdException;
import eu.dnetlib.dhp.parser.utility.VtdUtilityParser;
import eu.dnetlib.doiboost.orcid.model.AuthorData;
import eu.dnetlib.doiboost.orcid.model.WorkData;
import java.util.Arrays;
import java.util.List;
@ -26,9 +27,13 @@ public class XMLRecordParser {
private static final String NS_RECORD_URL = "http://www.orcid.org/ns/record";
private static final String NS_RECORD = "record";
private static final String NS_ERROR_URL = "http://www.orcid.org/ns/error";
private static final String NS_WORK = "work";
private static final String NS_WORK_URL = "http://www.orcid.org/ns/work";
private static final String NS_ERROR = "error";
public static AuthorData VTDParse(byte[] bytes)
public static AuthorData VTDParseAuthorData(byte[] bytes)
throws VtdException, EncodingException, EOFException, EntityException, ParseException {
final VTDGen vg = new VTDGen();
vg.setDoc(bytes);
@ -78,4 +83,44 @@ public class XMLRecordParser {
}
return authorData;
}
public static WorkData VTDParseWorkData(byte[] bytes)
throws VtdException, EncodingException, EOFException, EntityException, ParseException {
final VTDGen vg = new VTDGen();
vg.setDoc(bytes);
vg.parse(true);
final VTDNav vn = vg.getNav();
final AutoPilot ap = new AutoPilot(vn);
ap.declareXPathNameSpace(NS_COMMON, NS_COMMON_URL);
ap.declareXPathNameSpace(NS_WORK, NS_WORK_URL);
ap.declareXPathNameSpace(NS_ERROR, NS_ERROR_URL);
WorkData workData = new WorkData();
final List<String> errors = VtdUtilityParser.getTextValue(ap, vn, "//error:response-code");
if (!errors.isEmpty()) {
workData.setErrorCode(errors.get(0));
return workData;
}
List<VtdUtilityParser.Node> workNodes =
VtdUtilityParser.getTextValuesWithAttributes(
ap, vn, "//work:work", Arrays.asList("path"));
if (!workNodes.isEmpty()) {
final String oid = (workNodes.get(0).getAttributes().get("path")).split("/")[1];
workData.setOid(oid);
} else {
return null;
}
final List<String> dois =
VtdUtilityParser.getTextValue(
ap,
vn,
"//common:external-id-type[text()=\"doi\"]/../common:external-id-value");
if (!dois.isEmpty()) {
workData.setDoi(dois.get(0));
workData.setDoiFound(true);
}
return workData;
}
}

View File

@ -0,0 +1,6 @@
[
{"paramName":"n", "paramLongName":"hdfsServerUri", "paramDescription": "the server uri", "paramRequired": true},
{"paramName":"d", "paramLongName":"hdfsOrcidDefaultPath", "paramDescription": "the default work path", "paramRequired": true},
{"paramName":"f", "paramLongName":"activitiesFileNameTarGz", "paramDescription": "the name of the activities orcid file", "paramRequired": true},
{"paramName":"o", "paramLongName":"outputAuthorsDOIsPath", "paramDescription": "the relative folder of the sequencial file to write", "paramRequired": true}
]

View File

@ -0,0 +1,22 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.action.sharelib.for.java</name>
<value>spark2</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
<property>
<name>oozie.launcher.mapreduce.map.java.opts</name>
<value>-Xmx4g</value>
</property>
</configuration>

View File

@ -0,0 +1,39 @@
<workflow-app name="Generate Orcid Authors DOIs Data" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>workingPath_activities</name>
<description>the working dir base path</description>
</property>
</parameters>
<start to="ResetWorkingPath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ResetWorkingPath">
<fs>
<delete path='${workingPath_activities}/output'/>
<mkdir path='${workingPath_activities}/output'/>
</fs>
<ok to="GenerateOrcidAuthorsDOIsData"/>
<error to="Kill"/>
</action>
<action name="GenerateOrcidAuthorsDOIsData">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.doiboost.orcid.OrcidAuthorsDOIsDataGen</main-class>
<arg>-d</arg><arg>${workingPath_activities}/</arg>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-f</arg><arg>ORCID_2019_activites_0.tar.gz</arg>
<arg>-o</arg><arg>output/</arg>
</java>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -3,13 +3,14 @@ package eu.dnetlib.doiboost.orcid.xml;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import eu.dnetlib.doiboost.orcid.model.AuthorData;
import eu.dnetlib.doiboost.orcid.model.WorkData;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.Test;
public class XMLRecordParserTest {
@Test
public void testOrcidXMLRecordParser() throws Exception {
public void testOrcidAuthorDataXMLParser() throws Exception {
String xml =
IOUtils.toString(
@ -17,7 +18,7 @@ public class XMLRecordParserTest {
XMLRecordParser p = new XMLRecordParser();
AuthorData authorData = p.VTDParse(xml.getBytes());
AuthorData authorData = p.VTDParseAuthorData(xml.getBytes());
assertNotNull(authorData);
assertNotNull(authorData.getName());
System.out.println("name: " + authorData.getName());
@ -32,9 +33,27 @@ public class XMLRecordParserTest {
XMLRecordParser p = new XMLRecordParser();
AuthorData authorData = p.VTDParse(xml.getBytes());
AuthorData authorData = p.VTDParseAuthorData(xml.getBytes());
assertNotNull(authorData);
assertNotNull(authorData.getErrorCode());
System.out.println("error: " + authorData.getErrorCode());
}
@Test
public void testOrcidWorkDataXMLParser() throws Exception {
String xml =
IOUtils.toString(
this.getClass()
.getResourceAsStream("activity_work_0000-0002-5982-8983.xml"));
XMLRecordParser p = new XMLRecordParser();
WorkData workData = p.VTDParseWorkData(xml.getBytes());
assertNotNull(workData);
assertNotNull(workData.getOid());
System.out.println("oid: " + workData.getOid());
assertNotNull(workData.getDoi());
System.out.println("doi: " + workData.getDoi());
}
}

View File

@ -0,0 +1,79 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<work:work xmlns:address="http://www.orcid.org/ns/address"
xmlns:email="http://www.orcid.org/ns/email" xmlns:history="http://www.orcid.org/ns/history"
xmlns:employment="http://www.orcid.org/ns/employment"
xmlns:education="http://www.orcid.org/ns/education"
xmlns:other-name="http://www.orcid.org/ns/other-name"
xmlns:deprecated="http://www.orcid.org/ns/deprecated"
xmlns:funding="http://www.orcid.org/ns/funding"
xmlns:research-resource="http://www.orcid.org/ns/research-resource"
xmlns:service="http://www.orcid.org/ns/service"
xmlns:researcher-url="http://www.orcid.org/ns/researcher-url"
xmlns:distinction="http://www.orcid.org/ns/distinction"
xmlns:internal="http://www.orcid.org/ns/internal"
xmlns:membership="http://www.orcid.org/ns/membership"
xmlns:person="http://www.orcid.org/ns/person"
xmlns:personal-details="http://www.orcid.org/ns/personal-details"
xmlns:bulk="http://www.orcid.org/ns/bulk" xmlns:common="http://www.orcid.org/ns/common"
xmlns:record="http://www.orcid.org/ns/record" xmlns:keyword="http://www.orcid.org/ns/keyword"
xmlns:activities="http://www.orcid.org/ns/activities"
xmlns:qualification="http://www.orcid.org/ns/qualification"
xmlns:external-identifier="http://www.orcid.org/ns/external-identifier"
xmlns:error="http://www.orcid.org/ns/error"
xmlns:preferences="http://www.orcid.org/ns/preferences"
xmlns:invited-position="http://www.orcid.org/ns/invited-position"
xmlns:work="http://www.orcid.org/ns/work"
xmlns:peer-review="http://www.orcid.org/ns/peer-review" put-code="50101152"
path="/0000-0001-5349-4030/work/50101152" visibility="public">
<common:created-date>2018-11-01T19:49:45.562Z</common:created-date>
<common:last-modified-date>2018-11-01T19:49:45.562Z</common:last-modified-date>
<common:source>
<common:source-client-id>
<common:uri>https://orcid.org/client/0000-0002-5982-8983</common:uri>
<common:path>0000-0002-5982-8983</common:path>
<common:host>orcid.org</common:host>
</common:source-client-id>
<common:source-name>Scopus - Elsevier</common:source-name>
</common:source>
<work:title>
<common:title>"Calling Out" in class: Degrees of candor in addressing social injustices in
racially homogenous and heterogeneous U.S. history classrooms</common:title>
</work:title>
<work:journal-title>Journal of Social Studies Research</work:journal-title>
<work:citation>
<work:citation-type>bibtex</work:citation-type>
<work:citation-value>@article{Massaro2018,title = {{"}Calling Out{"} in class: Degrees of
candor in addressing social injustices in racially homogenous and heterogeneous U.S.
history classrooms},journal = {Journal of Social Studies Research},year = {2018},author
= {Parkhouse, H. and Massaro, V.R.}}</work:citation-value>
</work:citation>
<work:type>journal-article</work:type>
<common:publication-date>
<common:year>2018</common:year>
</common:publication-date>
<common:external-ids>
<common:external-id>
<common:external-id-type>doi</common:external-id-type>
<common:external-id-value>10.1016/j.jssr.2018.01.004</common:external-id-value>
<common:external-id-normalized transient="true"
>10.1016/j.jssr.2018.01.004</common:external-id-normalized>
<common:external-id-relationship>self</common:external-id-relationship>
</common:external-id>
<common:external-id>
<common:external-id-type>eid</common:external-id-type>
<common:external-id-value>2-s2.0-85041949043</common:external-id-value>
<common:external-id-normalized transient="true"
>2-s2.0-85041949043</common:external-id-normalized>
<common:external-id-relationship>self</common:external-id-relationship>
</common:external-id>
</common:external-ids>
<common:url>http://www.scopus.com/inward/record.url?eid=2-s2.0-85041949043&amp;partnerID=MN8TOARS</common:url>
<work:contributors>
<work:contributor>
<work:credit-name>Parkhouse, H.</work:credit-name>
</work:contributor>
<work:contributor>
<work:credit-name>Massaro, V.R.</work:credit-name>
</work:contributor>
</work:contributors>
</work:work>