From 7011d4203ee7d0ee38debbb5aa1958d931f5b62e Mon Sep 17 00:00:00 2001 From: Enrico Ottonello Date: Fri, 17 Apr 2020 18:52:39 +0200 Subject: [PATCH] parser of orcid summaries from tar gz file on hdfs, that creates a sequence file with authors informations (oid, name, surname, credit name) --- .../orciddsmanager/OrcidDSManager.java | 99 ++++++++++++ .../orciddsmanager/SummariesDecompressor.java | 143 ++++++++++++++++++ .../orciddsmanager/json/JsonWriter.java | 19 +++ .../orciddsmanager/model/AuthorData.java | 41 +++++ .../orciddsmanager/xml/XMLRecordParser.java | 98 ++++++++++++ 5 files changed, 400 insertions(+) create mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/orciddsmanager/OrcidDSManager.java create mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/orciddsmanager/SummariesDecompressor.java create mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/orciddsmanager/json/JsonWriter.java create mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/orciddsmanager/model/AuthorData.java create mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/orciddsmanager/xml/XMLRecordParser.java diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/orciddsmanager/OrcidDSManager.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/orciddsmanager/OrcidDSManager.java new file mode 100644 index 000000000..010b76a53 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/orciddsmanager/OrcidDSManager.java @@ -0,0 +1,99 @@ +package orciddsmanager; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.Logger; + +public class OrcidDSManager { + + private static final Logger logger = Logger.getLogger(OrcidDSManager.class); + + private String hdfsServerUri; + private String hadoopUsername; + private String hdfsOrcidDefaultPath; + private String summariesFileNameTarGz; + private String outputAuthorsPath; + + public static void main(String[] args) { + logger.info("OrcidDSManager started"); + OrcidDSManager orcidDSManager = new OrcidDSManager(); + try { + orcidDSManager.initGARRProperties(); + orcidDSManager.generateAuthors(); + } catch (Exception e) { + logger.error("Generating authors data: "+e.getMessage()); + } + } + + public void generateAuthors() throws Exception { + Configuration conf = initConfigurationObject(); + FileSystem fs = initFileSystemObject(conf); + String tarGzUri = hdfsServerUri.concat(hdfsOrcidDefaultPath).concat(summariesFileNameTarGz); + logger.info("Started parsing "+tarGzUri); + Path outputPath = new Path(hdfsServerUri.concat(hdfsOrcidDefaultPath).concat(outputAuthorsPath).concat(Long.toString(System.currentTimeMillis())).concat("/authors_part")); + SummariesDecompressor.parseGzSummaries(conf, tarGzUri, outputPath); + } + + private Configuration initConfigurationObject() { + // ====== Init HDFS File System Object + Configuration conf = new Configuration(); + // Set FileSystem URI + conf.set("fs.defaultFS", hdfsServerUri.concat(hdfsOrcidDefaultPath)); + // Because of Maven + conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); + conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); + // Set HADOOP user + System.setProperty("HADOOP_USER_NAME", hadoopUsername); + System.setProperty("hadoop.home.dir", "/"); + return conf; + } + + private FileSystem initFileSystemObject(Configuration conf) { + //Get the filesystem - HDFS + FileSystem fs = null; + try { + fs = FileSystem.get(URI.create(hdfsServerUri.concat(hdfsOrcidDefaultPath)), conf); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return fs; + } + + private void loadProperties() throws FileNotFoundException, IOException { + + Properties appProps = new Properties(); + ClassLoader classLoader = ClassLoader.getSystemClassLoader(); + appProps.load(classLoader.getResourceAsStream("orciddsmanager/props/app.properties")); + hdfsServerUri = appProps.getProperty("hdfs.server.uri"); + hadoopUsername = appProps.getProperty("hdfs.hadoopusername"); + hdfsOrcidDefaultPath = appProps.getProperty("hdfs.orcid.defaultpath"); + summariesFileNameTarGz = appProps.getProperty("hdfs.orcid.summariesfilename.tar.gz"); + outputAuthorsPath = appProps.getProperty("hdfs.orcid.output.authorspath"); + } + + private void initDefaultProperties() throws FileNotFoundException, IOException { + + hdfsServerUri = "hdfs://localhost:9000"; + hadoopUsername = "enrico.ottonello"; + hdfsOrcidDefaultPath = "/user/enrico.ottonello/orcid/"; + summariesFileNameTarGz = "ORCID_2019_summaries.tar.gz"; + outputAuthorsPath = "output/"; + } + + private void initGARRProperties() throws FileNotFoundException, IOException { + + hdfsServerUri = "hdfs://hadoop-rm1.garr-pa1.d4science.org:8020"; + hadoopUsername = "root"; + hdfsOrcidDefaultPath = "/data/orcid_summaries/"; + summariesFileNameTarGz = "ORCID_2019_summaries.tar.gz"; + outputAuthorsPath = "output/"; + } +} diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/orciddsmanager/SummariesDecompressor.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/orciddsmanager/SummariesDecompressor.java new file mode 100644 index 000000000..3a4caba35 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/orciddsmanager/SummariesDecompressor.java @@ -0,0 +1,143 @@ +package orciddsmanager; + +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.log4j.Logger; +import org.xml.sax.SAXException; + +import orciddsmanager.json.JsonWriter; +import orciddsmanager.model.AuthorData; +import orciddsmanager.xml.XMLRecordParser; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URI; +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.xpath.XPathExpressionException; + +public class SummariesDecompressor { + + private static final Logger logger = Logger.getLogger(SummariesDecompressor.class); + + public static void parseGzSummaries(Configuration conf, String inputUri, Path outputPath) throws Exception { + String uri = inputUri; + FileSystem fs = FileSystem.get(URI.create(uri), conf); + Path inputPath = new Path(uri); + CompressionCodecFactory factory = new CompressionCodecFactory(conf); + CompressionCodec codec = factory.getCodec(inputPath); + if (codec == null) { + System.err.println("No codec found for " + uri); + System.exit(1); + } + CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension()); + InputStream gzipInputStream = null; + try { + gzipInputStream = codec.createInputStream(fs.open(inputPath)); + parseTarSummaries(fs, conf, gzipInputStream, outputPath); + + } finally { + logger.debug("Closing gzip stream"); + IOUtils.closeStream(gzipInputStream); + } + } + + private static void parseTarSummaries(FileSystem fs, Configuration conf, InputStream gzipInputStream, Path outputPath) { + int counter = 0; + int nameFound = 0; + int surnameFound = 0; + int creditNameFound = 0; + int errorFromOrcidFound = 0; + int xmlParserErrorFound = 0; + try (TarArchiveInputStream tais = new TarArchiveInputStream(gzipInputStream)) { + TarArchiveEntry entry = null; + + try (SequenceFile.Writer writer = SequenceFile.createWriter(conf, + SequenceFile.Writer.file(outputPath), SequenceFile.Writer.keyClass(Text.class), + SequenceFile.Writer.valueClass(Text.class))) { + + while ((entry = tais.getNextTarEntry()) != null) { + String filename = entry.getName(); + if (entry.isDirectory()) { + logger.debug("Directory entry name: "+entry.getName()); + } else { + logger.debug("XML record entry name: "+entry.getName()); + counter++; + BufferedReader br = new BufferedReader(new InputStreamReader(tais)); // Read directly from tarInput + String line; + StringBuffer buffer = new StringBuffer(); + while ((line = br.readLine()) != null) { + buffer.append(line); + } + try (ByteArrayInputStream bais = new ByteArrayInputStream(buffer.toString().getBytes())) { + AuthorData authorData = XMLRecordParser.parse(bais); + if (authorData!=null) { + if (authorData.getErrorCode()!=null) { + errorFromOrcidFound+=1; + logger.debug("error from Orcid with code "+authorData.getErrorCode()+" for oid "+entry.getName()); + continue; + } + String jsonData = JsonWriter.create(authorData); + logger.debug("oid: "+authorData.getOid() + " data: "+jsonData); + + final Text key = new Text(authorData.getOid()); + final Text value = new Text(jsonData); + + try { + writer.append(key, value); + } catch (IOException e) { + logger.error("Writing to sequence file: "+e.getMessage()); + e.printStackTrace(); + throw new RuntimeException(e); + } + + if (authorData.getName()!=null) { + nameFound+=1; + } + if (authorData.getSurname()!=null) { + surnameFound+=1; + } + if (authorData.getCreditName()!=null) { + creditNameFound+=1; + } + + } + else { + logger.error("Data not retrievable ["+entry.getName()+"] "+buffer.toString()); + xmlParserErrorFound+=1; + } + + } catch (XPathExpressionException | ParserConfigurationException | SAXException e) { + logger.error("Parsing record from tar archive: "+e.getMessage()); + e.printStackTrace(); + } + } + + if ((counter % 1000) == 0) { + logger.info("Current xml records parsed: "+counter); + } + } + } + } catch (IOException e) { + logger.error("Parsing record from gzip archive: "+e.getMessage()); + e.printStackTrace(); + } + logger.info("Summaries parse completed"); + logger.info("Total XML records parsed: "+counter); + logger.info("Name found: "+nameFound); + logger.info("Surname found: "+surnameFound); + logger.info("Credit name found: "+creditNameFound); + logger.info("Error from Orcid found: "+errorFromOrcidFound); + logger.info("Error parsing xml record found: "+xmlParserErrorFound); + } +} \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/orciddsmanager/json/JsonWriter.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/orciddsmanager/json/JsonWriter.java new file mode 100644 index 000000000..7321e644f --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/orciddsmanager/json/JsonWriter.java @@ -0,0 +1,19 @@ +package orciddsmanager.json; + +import com.google.gson.JsonObject; + +import orciddsmanager.model.AuthorData; + +public class JsonWriter { + + public static String create(AuthorData authorData) { + JsonObject author = new JsonObject(); + author.addProperty("oid", authorData.getOid()); + author.addProperty("name", authorData.getName()); + author.addProperty("surname", authorData.getSurname()); + if (authorData.getCreditName()!=null) { + author.addProperty("creditname", authorData.getCreditName()); + } + return author.toString(); + } +} diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/orciddsmanager/model/AuthorData.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/orciddsmanager/model/AuthorData.java new file mode 100644 index 000000000..0e3ca3609 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/orciddsmanager/model/AuthorData.java @@ -0,0 +1,41 @@ +package orciddsmanager.model; + +public class AuthorData { + + private String oid; + private String name; + private String surname; + private String creditName; + private String errorCode; + + public String getErrorCode() { + return errorCode; + } + public void setErrorCode(String errorCode) { + this.errorCode = errorCode; + } + public String getName() { + return name; + } + public void setName(String name) { + this.name = name; + } + public String getSurname() { + return surname; + } + public void setSurname(String surname) { + this.surname = surname; + } + public String getCreditName() { + return creditName; + } + public void setCreditName(String creditName) { + this.creditName = creditName; + } + public String getOid() { + return oid; + } + public void setOid(String oid) { + this.oid = oid; + } +} diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/orciddsmanager/xml/XMLRecordParser.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/orciddsmanager/xml/XMLRecordParser.java new file mode 100644 index 000000000..1934bf063 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/orciddsmanager/xml/XMLRecordParser.java @@ -0,0 +1,98 @@ +package orciddsmanager.xml; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.Iterator; + +import javax.xml.namespace.NamespaceContext; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.xpath.XPath; +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathExpressionException; +import javax.xml.xpath.XPathFactory; + +import org.apache.commons.lang.StringUtils; +import org.w3c.dom.Document; +import org.xml.sax.SAXException; + +import orciddsmanager.model.AuthorData; + +public class XMLRecordParser { + + public static AuthorData parse(ByteArrayInputStream bytesStream) throws ParserConfigurationException, SAXException, IOException, XPathExpressionException { + bytesStream.reset(); + DocumentBuilderFactory builderFactory = DocumentBuilderFactory.newInstance(); + builderFactory.setNamespaceAware(true); + DocumentBuilder builder = builderFactory.newDocumentBuilder(); + + Document xmlDocument = builder.parse(bytesStream); + XPath xPath = XPathFactory.newInstance().newXPath(); + xPath.setNamespaceContext(new NamespaceContext() { + @Override + public Iterator getPrefixes(String arg0) { + return null; + } + @Override + public String getPrefix(String arg0) { + return null; + } + @Override + public String getNamespaceURI(String arg0) { + if ("common".equals(arg0)) { + return "http://www.orcid.org/ns/common"; + } + else if ("person".equals(arg0)) { + return "http://www.orcid.org/ns/person"; + } + else if ("personal-details".equals(arg0)) { + return "http://www.orcid.org/ns/personal-details"; + } + else if ("other-name".equals(arg0)) { + return "http://www.orcid.org/ns/other-name"; + } + else if ("record".equals(arg0)) { + return "http://www.orcid.org/ns/record"; + } + else if ("error".equals(arg0)) { + return "http://www.orcid.org/ns/error"; + } + return null; + } + }); + + AuthorData authorData = new AuthorData(); + String errorPath = "//error:response-code"; + String error = (String)xPath.compile(errorPath).evaluate(xmlDocument, XPathConstants.STRING); + if (!StringUtils.isBlank(error)) { + authorData.setErrorCode(error); + return authorData; + } + String oidPath = "//record:record/@path"; + String oid = (String)xPath.compile(oidPath).evaluate(xmlDocument, XPathConstants.STRING); + if (!StringUtils.isBlank(oid)) { + oid = oid.substring(1); + authorData.setOid(oid); + } + else { + return null; + } + String namePath = "//personal-details:given-names"; + String name = (String)xPath.compile(namePath).evaluate(xmlDocument, XPathConstants.STRING); + if (!StringUtils.isBlank(name)) { + authorData.setName(name); + } + String surnamePath = "//personal-details:family-name"; + String surname = (String)xPath.compile(surnamePath).evaluate(xmlDocument, XPathConstants.STRING); + if (!StringUtils.isBlank(surname)) { + authorData.setSurname(surname); + } + String creditnamePath = "//personal-details:credit-name"; + String creditName = (String)xPath.compile(creditnamePath).evaluate(xmlDocument, XPathConstants.STRING); + if (!StringUtils.isBlank(creditName)) { + authorData.setCreditName(creditName); + } + return authorData; + } +}