forked from D-Net/dnet-hadoop
parser of orcid summaries from tar gz file on hdfs, that creates a sequence file with authors informations (oid, name, surname, credit name)
This commit is contained in:
parent
a329ea5575
commit
7011d4203e
|
@ -0,0 +1,99 @@
|
|||
package orciddsmanager;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
public class OrcidDSManager {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(OrcidDSManager.class);
|
||||
|
||||
private String hdfsServerUri;
|
||||
private String hadoopUsername;
|
||||
private String hdfsOrcidDefaultPath;
|
||||
private String summariesFileNameTarGz;
|
||||
private String outputAuthorsPath;
|
||||
|
||||
public static void main(String[] args) {
|
||||
logger.info("OrcidDSManager started");
|
||||
OrcidDSManager orcidDSManager = new OrcidDSManager();
|
||||
try {
|
||||
orcidDSManager.initGARRProperties();
|
||||
orcidDSManager.generateAuthors();
|
||||
} catch (Exception e) {
|
||||
logger.error("Generating authors data: "+e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public void generateAuthors() throws Exception {
|
||||
Configuration conf = initConfigurationObject();
|
||||
FileSystem fs = initFileSystemObject(conf);
|
||||
String tarGzUri = hdfsServerUri.concat(hdfsOrcidDefaultPath).concat(summariesFileNameTarGz);
|
||||
logger.info("Started parsing "+tarGzUri);
|
||||
Path outputPath = new Path(hdfsServerUri.concat(hdfsOrcidDefaultPath).concat(outputAuthorsPath).concat(Long.toString(System.currentTimeMillis())).concat("/authors_part"));
|
||||
SummariesDecompressor.parseGzSummaries(conf, tarGzUri, outputPath);
|
||||
}
|
||||
|
||||
private Configuration initConfigurationObject() {
|
||||
// ====== Init HDFS File System Object
|
||||
Configuration conf = new Configuration();
|
||||
// Set FileSystem URI
|
||||
conf.set("fs.defaultFS", hdfsServerUri.concat(hdfsOrcidDefaultPath));
|
||||
// Because of Maven
|
||||
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
|
||||
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
|
||||
// Set HADOOP user
|
||||
System.setProperty("HADOOP_USER_NAME", hadoopUsername);
|
||||
System.setProperty("hadoop.home.dir", "/");
|
||||
return conf;
|
||||
}
|
||||
|
||||
private FileSystem initFileSystemObject(Configuration conf) {
|
||||
//Get the filesystem - HDFS
|
||||
FileSystem fs = null;
|
||||
try {
|
||||
fs = FileSystem.get(URI.create(hdfsServerUri.concat(hdfsOrcidDefaultPath)), conf);
|
||||
} catch (IOException e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
}
|
||||
return fs;
|
||||
}
|
||||
|
||||
private void loadProperties() throws FileNotFoundException, IOException {
|
||||
|
||||
Properties appProps = new Properties();
|
||||
ClassLoader classLoader = ClassLoader.getSystemClassLoader();
|
||||
appProps.load(classLoader.getResourceAsStream("orciddsmanager/props/app.properties"));
|
||||
hdfsServerUri = appProps.getProperty("hdfs.server.uri");
|
||||
hadoopUsername = appProps.getProperty("hdfs.hadoopusername");
|
||||
hdfsOrcidDefaultPath = appProps.getProperty("hdfs.orcid.defaultpath");
|
||||
summariesFileNameTarGz = appProps.getProperty("hdfs.orcid.summariesfilename.tar.gz");
|
||||
outputAuthorsPath = appProps.getProperty("hdfs.orcid.output.authorspath");
|
||||
}
|
||||
|
||||
private void initDefaultProperties() throws FileNotFoundException, IOException {
|
||||
|
||||
hdfsServerUri = "hdfs://localhost:9000";
|
||||
hadoopUsername = "enrico.ottonello";
|
||||
hdfsOrcidDefaultPath = "/user/enrico.ottonello/orcid/";
|
||||
summariesFileNameTarGz = "ORCID_2019_summaries.tar.gz";
|
||||
outputAuthorsPath = "output/";
|
||||
}
|
||||
|
||||
private void initGARRProperties() throws FileNotFoundException, IOException {
|
||||
|
||||
hdfsServerUri = "hdfs://hadoop-rm1.garr-pa1.d4science.org:8020";
|
||||
hadoopUsername = "root";
|
||||
hdfsOrcidDefaultPath = "/data/orcid_summaries/";
|
||||
summariesFileNameTarGz = "ORCID_2019_summaries.tar.gz";
|
||||
outputAuthorsPath = "output/";
|
||||
}
|
||||
}
|
|
@ -0,0 +1,143 @@
|
|||
package orciddsmanager;
|
||||
|
||||
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
|
||||
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.SequenceFile;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||
import org.apache.hadoop.io.compress.CompressionCodecFactory;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.xml.sax.SAXException;
|
||||
|
||||
import orciddsmanager.json.JsonWriter;
|
||||
import orciddsmanager.model.AuthorData;
|
||||
import orciddsmanager.xml.XMLRecordParser;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.net.URI;
|
||||
import javax.xml.parsers.ParserConfigurationException;
|
||||
import javax.xml.xpath.XPathExpressionException;
|
||||
|
||||
public class SummariesDecompressor {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(SummariesDecompressor.class);
|
||||
|
||||
public static void parseGzSummaries(Configuration conf, String inputUri, Path outputPath) throws Exception {
|
||||
String uri = inputUri;
|
||||
FileSystem fs = FileSystem.get(URI.create(uri), conf);
|
||||
Path inputPath = new Path(uri);
|
||||
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
|
||||
CompressionCodec codec = factory.getCodec(inputPath);
|
||||
if (codec == null) {
|
||||
System.err.println("No codec found for " + uri);
|
||||
System.exit(1);
|
||||
}
|
||||
CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());
|
||||
InputStream gzipInputStream = null;
|
||||
try {
|
||||
gzipInputStream = codec.createInputStream(fs.open(inputPath));
|
||||
parseTarSummaries(fs, conf, gzipInputStream, outputPath);
|
||||
|
||||
} finally {
|
||||
logger.debug("Closing gzip stream");
|
||||
IOUtils.closeStream(gzipInputStream);
|
||||
}
|
||||
}
|
||||
|
||||
private static void parseTarSummaries(FileSystem fs, Configuration conf, InputStream gzipInputStream, Path outputPath) {
|
||||
int counter = 0;
|
||||
int nameFound = 0;
|
||||
int surnameFound = 0;
|
||||
int creditNameFound = 0;
|
||||
int errorFromOrcidFound = 0;
|
||||
int xmlParserErrorFound = 0;
|
||||
try (TarArchiveInputStream tais = new TarArchiveInputStream(gzipInputStream)) {
|
||||
TarArchiveEntry entry = null;
|
||||
|
||||
try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
|
||||
SequenceFile.Writer.file(outputPath), SequenceFile.Writer.keyClass(Text.class),
|
||||
SequenceFile.Writer.valueClass(Text.class))) {
|
||||
|
||||
while ((entry = tais.getNextTarEntry()) != null) {
|
||||
String filename = entry.getName();
|
||||
if (entry.isDirectory()) {
|
||||
logger.debug("Directory entry name: "+entry.getName());
|
||||
} else {
|
||||
logger.debug("XML record entry name: "+entry.getName());
|
||||
counter++;
|
||||
BufferedReader br = new BufferedReader(new InputStreamReader(tais)); // Read directly from tarInput
|
||||
String line;
|
||||
StringBuffer buffer = new StringBuffer();
|
||||
while ((line = br.readLine()) != null) {
|
||||
buffer.append(line);
|
||||
}
|
||||
try (ByteArrayInputStream bais = new ByteArrayInputStream(buffer.toString().getBytes())) {
|
||||
AuthorData authorData = XMLRecordParser.parse(bais);
|
||||
if (authorData!=null) {
|
||||
if (authorData.getErrorCode()!=null) {
|
||||
errorFromOrcidFound+=1;
|
||||
logger.debug("error from Orcid with code "+authorData.getErrorCode()+" for oid "+entry.getName());
|
||||
continue;
|
||||
}
|
||||
String jsonData = JsonWriter.create(authorData);
|
||||
logger.debug("oid: "+authorData.getOid() + " data: "+jsonData);
|
||||
|
||||
final Text key = new Text(authorData.getOid());
|
||||
final Text value = new Text(jsonData);
|
||||
|
||||
try {
|
||||
writer.append(key, value);
|
||||
} catch (IOException e) {
|
||||
logger.error("Writing to sequence file: "+e.getMessage());
|
||||
e.printStackTrace();
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
if (authorData.getName()!=null) {
|
||||
nameFound+=1;
|
||||
}
|
||||
if (authorData.getSurname()!=null) {
|
||||
surnameFound+=1;
|
||||
}
|
||||
if (authorData.getCreditName()!=null) {
|
||||
creditNameFound+=1;
|
||||
}
|
||||
|
||||
}
|
||||
else {
|
||||
logger.error("Data not retrievable ["+entry.getName()+"] "+buffer.toString());
|
||||
xmlParserErrorFound+=1;
|
||||
}
|
||||
|
||||
} catch (XPathExpressionException | ParserConfigurationException | SAXException e) {
|
||||
logger.error("Parsing record from tar archive: "+e.getMessage());
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
if ((counter % 1000) == 0) {
|
||||
logger.info("Current xml records parsed: "+counter);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.error("Parsing record from gzip archive: "+e.getMessage());
|
||||
e.printStackTrace();
|
||||
}
|
||||
logger.info("Summaries parse completed");
|
||||
logger.info("Total XML records parsed: "+counter);
|
||||
logger.info("Name found: "+nameFound);
|
||||
logger.info("Surname found: "+surnameFound);
|
||||
logger.info("Credit name found: "+creditNameFound);
|
||||
logger.info("Error from Orcid found: "+errorFromOrcidFound);
|
||||
logger.info("Error parsing xml record found: "+xmlParserErrorFound);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,19 @@
|
|||
package orciddsmanager.json;
|
||||
|
||||
import com.google.gson.JsonObject;
|
||||
|
||||
import orciddsmanager.model.AuthorData;
|
||||
|
||||
public class JsonWriter {
|
||||
|
||||
public static String create(AuthorData authorData) {
|
||||
JsonObject author = new JsonObject();
|
||||
author.addProperty("oid", authorData.getOid());
|
||||
author.addProperty("name", authorData.getName());
|
||||
author.addProperty("surname", authorData.getSurname());
|
||||
if (authorData.getCreditName()!=null) {
|
||||
author.addProperty("creditname", authorData.getCreditName());
|
||||
}
|
||||
return author.toString();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
package orciddsmanager.model;
|
||||
|
||||
public class AuthorData {
|
||||
|
||||
private String oid;
|
||||
private String name;
|
||||
private String surname;
|
||||
private String creditName;
|
||||
private String errorCode;
|
||||
|
||||
public String getErrorCode() {
|
||||
return errorCode;
|
||||
}
|
||||
public void setErrorCode(String errorCode) {
|
||||
this.errorCode = errorCode;
|
||||
}
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
public String getSurname() {
|
||||
return surname;
|
||||
}
|
||||
public void setSurname(String surname) {
|
||||
this.surname = surname;
|
||||
}
|
||||
public String getCreditName() {
|
||||
return creditName;
|
||||
}
|
||||
public void setCreditName(String creditName) {
|
||||
this.creditName = creditName;
|
||||
}
|
||||
public String getOid() {
|
||||
return oid;
|
||||
}
|
||||
public void setOid(String oid) {
|
||||
this.oid = oid;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,98 @@
|
|||
package orciddsmanager.xml;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
|
||||
import javax.xml.namespace.NamespaceContext;
|
||||
import javax.xml.parsers.DocumentBuilder;
|
||||
import javax.xml.parsers.DocumentBuilderFactory;
|
||||
import javax.xml.parsers.ParserConfigurationException;
|
||||
import javax.xml.xpath.XPath;
|
||||
import javax.xml.xpath.XPathConstants;
|
||||
import javax.xml.xpath.XPathExpressionException;
|
||||
import javax.xml.xpath.XPathFactory;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.w3c.dom.Document;
|
||||
import org.xml.sax.SAXException;
|
||||
|
||||
import orciddsmanager.model.AuthorData;
|
||||
|
||||
public class XMLRecordParser {
|
||||
|
||||
public static AuthorData parse(ByteArrayInputStream bytesStream) throws ParserConfigurationException, SAXException, IOException, XPathExpressionException {
|
||||
bytesStream.reset();
|
||||
DocumentBuilderFactory builderFactory = DocumentBuilderFactory.newInstance();
|
||||
builderFactory.setNamespaceAware(true);
|
||||
DocumentBuilder builder = builderFactory.newDocumentBuilder();
|
||||
|
||||
Document xmlDocument = builder.parse(bytesStream);
|
||||
XPath xPath = XPathFactory.newInstance().newXPath();
|
||||
xPath.setNamespaceContext(new NamespaceContext() {
|
||||
@Override
|
||||
public Iterator getPrefixes(String arg0) {
|
||||
return null;
|
||||
}
|
||||
@Override
|
||||
public String getPrefix(String arg0) {
|
||||
return null;
|
||||
}
|
||||
@Override
|
||||
public String getNamespaceURI(String arg0) {
|
||||
if ("common".equals(arg0)) {
|
||||
return "http://www.orcid.org/ns/common";
|
||||
}
|
||||
else if ("person".equals(arg0)) {
|
||||
return "http://www.orcid.org/ns/person";
|
||||
}
|
||||
else if ("personal-details".equals(arg0)) {
|
||||
return "http://www.orcid.org/ns/personal-details";
|
||||
}
|
||||
else if ("other-name".equals(arg0)) {
|
||||
return "http://www.orcid.org/ns/other-name";
|
||||
}
|
||||
else if ("record".equals(arg0)) {
|
||||
return "http://www.orcid.org/ns/record";
|
||||
}
|
||||
else if ("error".equals(arg0)) {
|
||||
return "http://www.orcid.org/ns/error";
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
AuthorData authorData = new AuthorData();
|
||||
String errorPath = "//error:response-code";
|
||||
String error = (String)xPath.compile(errorPath).evaluate(xmlDocument, XPathConstants.STRING);
|
||||
if (!StringUtils.isBlank(error)) {
|
||||
authorData.setErrorCode(error);
|
||||
return authorData;
|
||||
}
|
||||
String oidPath = "//record:record/@path";
|
||||
String oid = (String)xPath.compile(oidPath).evaluate(xmlDocument, XPathConstants.STRING);
|
||||
if (!StringUtils.isBlank(oid)) {
|
||||
oid = oid.substring(1);
|
||||
authorData.setOid(oid);
|
||||
}
|
||||
else {
|
||||
return null;
|
||||
}
|
||||
String namePath = "//personal-details:given-names";
|
||||
String name = (String)xPath.compile(namePath).evaluate(xmlDocument, XPathConstants.STRING);
|
||||
if (!StringUtils.isBlank(name)) {
|
||||
authorData.setName(name);
|
||||
}
|
||||
String surnamePath = "//personal-details:family-name";
|
||||
String surname = (String)xPath.compile(surnamePath).evaluate(xmlDocument, XPathConstants.STRING);
|
||||
if (!StringUtils.isBlank(surname)) {
|
||||
authorData.setSurname(surname);
|
||||
}
|
||||
String creditnamePath = "//personal-details:credit-name";
|
||||
String creditName = (String)xPath.compile(creditnamePath).evaluate(xmlDocument, XPathConstants.STRING);
|
||||
if (!StringUtils.isBlank(creditName)) {
|
||||
authorData.setCreditName(creditName);
|
||||
}
|
||||
return authorData;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue