forked from D-Net/dnet-hadoop

commit fb930b84d3
Merge branch 'stable_ids' of https://code-repo.d4science.org/D-Net/dnet-hadoop into stable_ids
File: SparkCreateBaselineDataFrame.scala
@@ -33,7 +33,6 @@ object SparkCreateBaselineDataFrame {
     implicit val PMJEncoder: Encoder[PMJournal] = Encoders.kryo(classOf[PMJournal])
     implicit val PMAEncoder: Encoder[PMAuthor] = Encoders.kryo(classOf[PMAuthor])
     val k: RDD[(String, String)] = sc.wholeTextFiles(s"$workingPath/baseline", 2000)
-
     val ds: Dataset[PMArticle] = spark.createDataset(k.filter(i => i._1.endsWith(".gz")).flatMap(i => {
       val xml = new XMLEventReader(Source.fromBytes(i._2.getBytes()))
       new PMParser(xml)
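For orientation: wholeTextFiles returns (path, content) pairs, so the job filters on the file name, and PMParser implements Iterator[PMArticle], so flatMap can stream records out of each baseline archive lazily. Below is a minimal, self-contained sketch of the same read path; the object name, local path, and partition hint are hypothetical, not part of this commit.

    import eu.dnetlib.dhp.sx.ebi.model.{PMArticle, PMParser}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

    import scala.io.Source
    import scala.xml.pull.XMLEventReader

    // Hypothetical sketch mirroring SparkCreateBaselineDataFrame's read path.
    object BaselineReadSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("BaselineReadSketch").getOrCreate()
        // The job serializes PMArticle with a kryo encoder, as in the hunk above.
        implicit val articleEncoder: Encoder[PMArticle] = Encoders.kryo(classOf[PMArticle])
        val sc = spark.sparkContext

        // (path, fileContent) pairs; the filter keys on the *path* suffix.
        val k: RDD[(String, String)] = sc.wholeTextFiles("/tmp/baseline", 8) // hypothetical path and partition hint

        // Each archive holds many <PubmedArticle> records; PMParser streams them out.
        val ds: Dataset[PMArticle] = spark.createDataset(
          k.filter(_._1.endsWith(".gz")).flatMap { case (_, content) =>
            new PMParser(new XMLEventReader(Source.fromBytes(content.getBytes())))
          })

        println(ds.count())
        spark.stop()
      }
    }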
File: PMArticle.java
@@ -8,10 +8,33 @@ import java.util.List;
 public class PMArticle implements Serializable {

     private String pmid;
+    private String doi;
     private String date;
     private PMJournal journal;
     private String title;
     private String description;
+    private String language;
+    private final List<PMSubject> subjects = new ArrayList<>();
+    private final List<PMSubject> publicationTypes = new ArrayList<>();
+
+    public List<PMSubject> getPublicationTypes() {
+        return publicationTypes;
+    }
+
+    private final List<PMGrant> grants = new ArrayList<>();
+
+    public List<PMGrant> getGrants() {
+        return grants;
+    }
+
+    public String getDoi() {
+        return doi;
+    }
+
+    public void setDoi(String doi) {
+        this.doi = doi;
+    }
+
     private List<PMAuthor> authors = new ArrayList<>();

     public String getPmid() {
@@ -61,4 +84,16 @@ public class PMArticle implements Serializable {
     public void setAuthors(List<PMAuthor> authors) {
         this.authors = authors;
     }
+
+    public List<PMSubject> getSubjects() {
+        return subjects;
+    }
+
+    public String getLanguage() {
+        return language;
+    }
+
+    public void setLanguage(String language) {
+        this.language = language;
+    }
 }
New file: PMGrant.java
@@ -0,0 +1,42 @@
+
+package eu.dnetlib.dhp.sx.ebi.model;
+
+public class PMGrant {
+
+    private String grantID;
+    private String agency;
+    private String country;
+
+    public PMGrant() {
+    }
+
+    public PMGrant(String grantID, String agency, String country) {
+        this.grantID = grantID;
+        this.agency = agency;
+        this.country = country;
+    }
+
+    public String getGrantID() {
+        return grantID;
+    }
+
+    public void setGrantID(String grantID) {
+        this.grantID = grantID;
+    }
+
+    public String getAgency() {
+        return agency;
+    }
+
+    public void setAgency(String agency) {
+        this.agency = agency;
+    }
+
+    public String getCountry() {
+        return country;
+    }
+
+    public void setCountry(String country) {
+        this.country = country;
+    }
+}
File: PMParser.scala
@@ -1,4 +1,6 @@
 package eu.dnetlib.dhp.sx.ebi.model
+
+import scala.xml.MetaData
 import scala.xml.pull.{EvElemEnd, EvElemStart, EvText, XMLEventReader}
 class PMParser(xml:XMLEventReader) extends Iterator[PMArticle] {

@@ -12,24 +14,61 @@ class PMParser(xml:XMLEventReader) extends Iterator[PMArticle] {
     tmp
   }

+  def extractAttributes(attrs: MetaData, key: String): String = {
+    val res = attrs.get(key)
+    if (res.isDefined) {
+      val s = res.get
+      if (s != null && s.nonEmpty)
+        s.head.text
+      else
+        null
+    }
+    else null
+  }
+
+  def validate_Date(year: String, month: String, day: String): String = {
+    try {
+      f"${year.toInt}-${month.toInt}%02d-${day.toInt}%02d"
+    } catch {
+      case _: Throwable => null
+    }
+  }
+
   def generateNextArticle(): PMArticle = {

+    var currentSubject: PMSubject = null
     var currentAuthor: PMAuthor = null
     var currentJournal: PMJournal = null
+    var currentGrant: PMGrant = null
     var currNode: String = null
     var currentYear = "0"
     var currentMonth = "01"
     var currentDay = "01"
+    var currentArticleType: String = null

     while (xml.hasNext) {
       xml.next match {
-        case EvElemStart(_, label, _, _) =>
+        case EvElemStart(_, label, attrs, _) =>
           currNode = label

           label match {
             case "PubmedArticle" => currentArticle = new PMArticle
             case "Author" => currentAuthor = new PMAuthor
             case "Journal" => currentJournal = new PMJournal
+            case "Grant" => currentGrant = new PMGrant
+            case "PublicationType" | "DescriptorName" =>
+              currentSubject = new PMSubject
+              currentSubject.setMeshId(extractAttributes(attrs, "UI"))
+            case "ArticleId" => currentArticleType = extractAttributes(attrs, "IdType")
             case _ =>
           }
         case EvElemEnd(_, label) =>
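A quick illustration of validate_Date (illustrative values, not from the commit): the f interpolator zero-pads month and day, and a non-numeric component makes .toInt throw, which the catch-all converts to null.

    validate_Date("2021", "3", "7")    // returns "2021-03-07" (month and day zero-padded)
    validate_Date("2021", "12", "31")  // returns "2021-12-31"
    validate_Date("2021", "Mar", "7")  // returns null ("Mar".toInt throws NumberFormatException)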
@@ -37,8 +76,12 @@ class PMParser(xml:XMLEventReader) extends Iterator[PMArticle] {
             case "PubmedArticle" => return currentArticle
             case "Author" => currentArticle.getAuthors.add(currentAuthor)
             case "Journal" => currentArticle.setJournal(currentJournal)
-            case "DateCompleted" => currentArticle.setDate(s"$currentYear-$currentMonth-$currentDay")
+            case "Grant" => currentArticle.getGrants.add(currentGrant)
+            case "PubMedPubDate" => if (currentArticle.getDate == null)
+              currentArticle.setDate(validate_Date(currentYear, currentMonth, currentDay))
             case "PubDate" => currentJournal.setDate(s"$currentYear-$currentMonth-$currentDay")
+            case "DescriptorName" => currentArticle.getSubjects.add(currentSubject)
+            case "PublicationType" => currentArticle.getPublicationTypes.add(currentSubject)
             case _ =>
           }
         case EvText(text) =>
@@ -57,12 +100,18 @@ class PMParser(xml:XMLEventReader) extends Iterator[PMArticle] {
               currentArticle.setDescription(currentArticle.getDescription + text.trim)
             }
           case "PMID" => currentArticle.setPmid(text.trim)
+          case "ArticleId" => if ("doi".equalsIgnoreCase(currentArticleType)) currentArticle.setDoi(text.trim)
+          case "Language" => currentArticle.setLanguage(text.trim)
           case "ISSN" => currentJournal.setIssn(text.trim)
+          case "GrantID" => currentGrant.setGrantID(text.trim)
+          case "Agency" => currentGrant.setAgency(text.trim)
+          case "Country" => if (currentGrant != null) currentGrant.setCountry(text.trim)
           case "Year" => currentYear = text.trim
           case "Month" => currentMonth = text.trim
           case "Day" => currentDay = text.trim
           case "Volume" => currentJournal.setVolume(text.trim)
           case "Issue" => currentJournal.setIssue(text.trim)
+          case "PublicationType" | "DescriptorName" => currentSubject.setValue(text.trim)
           case "LastName" => {
             if (currentAuthor != null)
               currentAuthor.setLastName(text.trim)
New file: PMSubject.java
@@ -0,0 +1,41 @@
+
+package eu.dnetlib.dhp.sx.ebi.model;
+
+public class PMSubject {
+    private String value;
+    private String meshId;
+    private String registryNumber;
+
+    public PMSubject() {
+    }
+
+    public PMSubject(String value, String meshId, String registryNumber) {
+        this.value = value;
+        this.meshId = meshId;
+        this.registryNumber = registryNumber;
+    }
+
+    public String getValue() {
+        return value;
+    }
+
+    public void setValue(String value) {
+        this.value = value;
+    }
+
+    public String getMeshId() {
+        return meshId;
+    }
+
+    public void setMeshId(String meshId) {
+        this.meshId = meshId;
+    }
+
+    public String getRegistryNumber() {
+        return registryNumber;
+    }
+
+    public void setRegistryNumber(String registryNumber) {
+        this.registryNumber = registryNumber;
+    }
+}
File: SparkGeneratePanagaeaDataset.scala
@@ -5,9 +5,6 @@ import org.apache.spark.sql.{Encoder, Encoders}
 import org.json4s
 import org.json4s.DefaultFormats
 import org.json4s.jackson.JsonMethods.parse
-
-import java.text.SimpleDateFormat
-import java.util.Date
 import java.util.regex.Pattern
 import scala.language.postfixOps
 import scala.xml.{Elem, Node, XML}
@@ -24,7 +24,7 @@ object SparkGeneratePanagaeaDataset {
     SparkSession
       .builder()
       .config(conf)
-      .appName(SparkCreateEBIDataFrame.getClass.getSimpleName)
+      .appName(SparkGeneratePanagaeaDataset.getClass.getSimpleName)
       .master(parser.get("master")).getOrCreate()

     parser.getObjectMap.asScala.foreach(s => logger.info(s"${s._1} -> ${s._2}"))
New file: Oozie job configuration (XML)
@@ -0,0 +1,19 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>yarnRM</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>hdfs://nameservice1</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+
+</configuration>
New file: Oozie workflow definition (XML)
@@ -0,0 +1,40 @@
+<workflow-app name="Transform_Pubmed_Workflow" xmlns="uri:oozie:workflow:0.5">
+    <parameters>
+        <property>
+            <name>baselineWorkingPath</name>
+            <description>the Baseline Working Path</description>
+        </property>
+    </parameters>
+
+    <start to="ConvertDataset"/>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <action name="ConvertDataset">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>Convert Baseline to Dataset</name>
+            <class>eu.dnetlib.dhp.sx.ebi.SparkCreateBaselineDataFrame</class>
+            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+            <arg>--workingPath</arg><arg>${baselineWorkingPath}</arg>
+            <arg>--master</arg><arg>yarn</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+
+</workflow-app>
New file: job argument descriptors (JSON)
@@ -0,0 +1,4 @@
+[
+  {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
+  {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}
+]
File: TestEBI.scala
@@ -1,16 +1,24 @@
 package eu.dnetlib.dhp.sx.ebi

+import com.fasterxml.jackson.databind.ObjectMapper
+import eu.dnetlib.dhp.sx.ebi.model.PMParser
 import org.junit.jupiter.api.Test
+
+import scala.io.Source
+import scala.xml.pull.XMLEventReader

 class TestEBI {


-  // @Test
+  @Test
   def testEBIData() = {
-    SparkAddLinkUpdates.main("-mt local[*] -w /home/sandro/Downloads".split(" "))
+    val inputXML = Source.fromInputStream(getClass.getResourceAsStream("pubmed.xml")).mkString
+    val xml = new XMLEventReader(Source.fromBytes(inputXML.getBytes()))
+
+    val mapper = new ObjectMapper()
+
+    new PMParser(xml).foreach(s => println(mapper.writeValueAsString(s)))
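The same round trip also works against a hand-rolled record that uses only element names the parser matches on; a hypothetical sketch, not part of this commit (the values are made up, and real baseline records carry many more elements):

    import eu.dnetlib.dhp.sx.ebi.model.PMParser

    import scala.io.Source
    import scala.xml.pull.XMLEventReader

    object PMParserSketch {
      def main(args: Array[String]): Unit = {
        // Kept free of inter-element whitespace so no whitespace-only text
        // events reach the parser's EvText cases.
        val sample =
          "<PubmedArticleSet><PubmedArticle><PMID>12345</PMID><Language>eng</Language>" +
            "<Grant><GrantID>G-1</GrantID><Agency>ExampleAgency</Agency><Country>Italy</Country></Grant>" +
            "<ArticleId IdType=\"doi\">10.1000/example</ArticleId></PubmedArticle></PubmedArticleSet>"

        val xml = new XMLEventReader(Source.fromString(sample))
        // Exercises the new doi, language, and grant handling added in this commit.
        new PMParser(xml).foreach(a =>
          println(s"pmid=${a.getPmid} doi=${a.getDoi} grants=${a.getGrants.size()}"))
      }
    }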
(One further file diff suppressed because it is too large.)