Forked from D-Net/dnet-hadoop

Implemented a new method to update the PubMed baseline inside the Scala node.

parent b84e0cabeb
commit 2557bb41f5
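The new requestBaseLineUpdatePage/downloadBaseLineUpdate pair in the first hunk below makes the baseline load incremental: the newest archive already stored on HDFS is compared lexicographically with the names listed at https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/, and only archives that sort after it are fetched. A minimal, self-contained sketch of that selection rule, with a hypothetical sample listing (not part of the commit):

// Editorial sketch, not part of the commit: the selection rule used by
// requestBaseLineUpdatePage, expressed as a pure function. File names in
// the sample listing are hypothetical examples of the NCBI naming scheme.
object BaselineSelectionSketch {
  val baseUrl = "https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/"

  // Keep only .gz archives whose name sorts after the newest file already on HDFS.
  def filesToDownload(remoteNames: List[String], maxFile: String): List[(String, String)] =
    remoteNames
      .filter(_.endsWith(".gz"))
      .filter(_ > maxFile)
      .map(name => (name, s"$baseUrl$name"))

  def main(args: Array[String]): Unit = {
    val listing = List("pubmed21n1015.xml.gz", "pubmed21n1016.xml.gz", "pubmed21n1017.xml.gz")
    // With pubmed21n1016.xml.gz already on HDFS, only the later archive is selected.
    filesToDownload(listing, maxFile = "pubmed21n1016.xml.gz").foreach(println)
  }
}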
@@ -6,18 +6,130 @@ import eu.dnetlib.dhp.schema.oaf.Result
 import eu.dnetlib.dhp.sx.graph.bio.pubmed.{PMArticle, PMAuthor, PMJournal, PMParser, PubMedToOaf}
 import eu.dnetlib.dhp.utils.ISLookupClientFactory
 import org.apache.commons.io.IOUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
+import org.apache.http.client.config.RequestConfig
+import org.apache.http.client.methods.HttpGet
+import org.apache.http.impl.client.HttpClientBuilder
 import org.apache.spark.SparkConf
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.expressions.Aggregator
 import org.apache.spark.sql._
 import org.slf4j.{Logger, LoggerFactory}
 
+import java.io.InputStream
 import scala.io.Source
 import scala.xml.pull.XMLEventReader
 
 object SparkCreateBaselineDataFrame {
+
+  def requestBaseLineUpdatePage(maxFile: String): List[(String, String)] = {
+    val data = requestPage("https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/")
+
+    val result = data.lines.filter(l => l.startsWith("<a href=")).map { l =>
+      val end = l.lastIndexOf("\">")
+      val start = l.indexOf("<a href=\"")
+
+      if (start >= 0 && end > start)
+        l.substring(start + 9, (end - start))
+      else
+        ""
+    }.filter(s => s.endsWith(".gz")).filter(s => s > maxFile).map(s => (s, s"https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/$s")).toList
+
+    result
+  }
+
+  def downloadBaselinePart(url: String): InputStream = {
+    val r = new HttpGet(url)
+    val timeout = 60; // seconds
+    val config = RequestConfig.custom()
+      .setConnectTimeout(timeout * 1000)
+      .setConnectionRequestTimeout(timeout * 1000)
+      .setSocketTimeout(timeout * 1000).build()
+    val client = HttpClientBuilder.create().setDefaultRequestConfig(config).build()
+    val response = client.execute(r)
+    println(s"get response with status${response.getStatusLine.getStatusCode}")
+    response.getEntity.getContent
+  }
+
+  def requestPage(url: String): String = {
+    val r = new HttpGet(url)
+    val timeout = 60; // seconds
+    val config = RequestConfig.custom()
+      .setConnectTimeout(timeout * 1000)
+      .setConnectionRequestTimeout(timeout * 1000)
+      .setSocketTimeout(timeout * 1000).build()
+    val client = HttpClientBuilder.create().setDefaultRequestConfig(config).build()
+    try {
+      var tries = 4
+      while (tries > 0) {
+        println(s"requesting ${r.getURI}")
+        try {
+          val response = client.execute(r)
+          println(s"get response with status${response.getStatusLine.getStatusCode}")
+          if (response.getStatusLine.getStatusCode > 400) {
+            tries -= 1
+          }
+          else
+            return IOUtils.toString(response.getEntity.getContent)
+        } catch {
+          case e: Throwable =>
+            println(s"Error on requesting ${r.getURI}")
+            e.printStackTrace()
+            tries -= 1
+        }
+      }
+      ""
+    } finally {
+      if (client != null)
+        client.close()
+    }
+  }
+
+  def downloadBaseLineUpdate(baselinePath: String, hdfsServerUri: String): Unit = {
+
+    val conf = new Configuration
+    conf.set("fs.defaultFS", hdfsServerUri)
+    val fs = FileSystem.get(conf)
+    val p = new Path(baselinePath)
+    val files = fs.listFiles(p, false)
+    var max_file = ""
+    while (files.hasNext) {
+      val c = files.next()
+      val data = c.getPath.toString
+      val fileName = data.substring(data.lastIndexOf("/") + 1)
+
+      if (fileName > max_file)
+        max_file = fileName
+    }
+
+    val files_to_download = requestBaseLineUpdatePage(max_file)
+
+    files_to_download.foreach { u =>
+      val hdfsWritePath: Path = new Path(s"$baselinePath/${u._1}")
+      val fsDataOutputStream: FSDataOutputStream = fs.create(hdfsWritePath, true)
+      val i = downloadBaselinePart(u._2)
+      val buffer = Array.fill[Byte](1024)(0)
+      while (i.read(buffer) > 0) {
+        fsDataOutputStream.write(buffer)
+      }
+      i.close()
+      println(s"Downloaded ${u._2} into $baselinePath/${u._1}")
+      fsDataOutputStream.close()
+    }
+  }
+
   val pmArticleAggregator: Aggregator[(String, PMArticle), PMArticle, PMArticle] = new Aggregator[(String, PMArticle), PMArticle, PMArticle] with Serializable {
     override def zero: PMArticle = new PMArticle
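The stream copy inside downloadBaseLineUpdate above writes the whole 1024-byte buffer on every iteration, regardless of how many bytes the last read returned. For reference only (not part of the commit), a copy that writes exactly the bytes read can lean on the commons-io IOUtils already imported in this file; the object and method names below are hypothetical:

import java.io.InputStream
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}

// Editorial sketch, not part of the commit: copy an HTTP response stream into
// HDFS, writing only the bytes that were actually read.
object HdfsCopySketch {
  def copyToHdfs(in: InputStream, fs: FileSystem, target: Path): Long = {
    val out: FSDataOutputStream = fs.create(target, true) // overwrite if the file already exists
    try IOUtils.copyLarge(in, out) // commons-io manages the buffer and returns the bytes copied
    finally {
      out.close()
      in.close()
    }
  }
}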
@@ -51,6 +163,10 @@ object SparkCreateBaselineDataFrame {
     val targetPath = parser.get("targetPath")
     log.info("targetPath: {}", targetPath)
 
+    val hdfsServerUri = parser.get("hdfsServerUri")
+    log.info("hdfsServerUri: {}", targetPath)
+
+
     val isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl)
     val vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService)
     val spark: SparkSession =
@@ -61,16 +177,15 @@ object SparkCreateBaselineDataFrame {
       .master(parser.get("master")).getOrCreate()
     import spark.implicits._
 
 
     val sc = spark.sparkContext
-
-
-
     implicit val PMEncoder: Encoder[PMArticle] = Encoders.kryo(classOf[PMArticle])
     implicit val PMJEncoder: Encoder[PMJournal] = Encoders.kryo(classOf[PMJournal])
     implicit val PMAEncoder: Encoder[PMAuthor] = Encoders.kryo(classOf[PMAuthor])
     implicit val resultEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
 
+    downloadBaseLineUpdate(s"$workingPath/baseline", hdfsServerUri)
+
     val k: RDD[(String, String)] = sc.wholeTextFiles(s"$workingPath/baseline", 2000)
     val ds: Dataset[PMArticle] = spark.createDataset(k.filter(i => i._1.endsWith(".gz")).flatMap(i => {
       val xml = new XMLEventReader(Source.fromBytes(i._2.getBytes()))
@@ -87,7 +202,5 @@ object SparkCreateBaselineDataFrame {
       .map(a => PubMedToOaf.convert(a, vocabularies)).as[Result]
       .filter(p => p != null)
       .write.mode(SaveMode.Overwrite).save(targetPath)
-
-    //s"$workingPath/oaf/baseline_oaf"
   }
 }
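downloadBaseLineUpdate depends only on the Hadoop FileSystem API and plain HTTP, so it can also be driven outside the Spark job, for example from a small driver or a test. A hypothetical invocation (both argument values are placeholders, not taken from the commit):

// Editorial usage sketch, not part of the commit. Assumes SparkCreateBaselineDataFrame
// is on the classpath; both argument values below are placeholders.
object BaselineUpdateDriverSketch {
  def main(args: Array[String]): Unit = {
    SparkCreateBaselineDataFrame.downloadBaseLineUpdate(
      baselinePath = "/user/scholix/pubmed/baseline", // hypothetical HDFS folder holding the baseline archives
      hdfsServerUri = "hdfs://nameservice1"           // hypothetical namenode URI, used as fs.defaultFS
    )
  }
}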
@@ -4,20 +4,16 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser
 import eu.dnetlib.dhp.sx.graph.bio.BioDBToOAF.EBILinkItem
 import eu.dnetlib.dhp.sx.graph.bio.pubmed.{PMArticle, PMAuthor, PMJournal}
 import org.apache.commons.io.IOUtils
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.http.client.config.RequestConfig
-import org.apache.http.client.methods.{HttpGet, HttpUriRequest}
+import org.apache.http.client.methods.HttpGet
 import org.apache.http.impl.client.HttpClientBuilder
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.expressions.Aggregator
 import org.apache.spark.sql.functions.max
-import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
+import org.apache.spark.sql._
 import org.slf4j.{Logger, LoggerFactory}
 
 object SparkDownloadEBILinks {
 
   def createEBILinks(pmid: Long): EBILinkItem = {
 
     val res = requestLinks(pmid)
@@ -26,7 +22,6 @@ object SparkDownloadEBILinks {
     null
   }
 
-
   def requestPage(url: String): String = {
     val r = new HttpGet(url)
     val timeout = 60; // seconds
@@ -61,42 +56,6 @@ object SparkDownloadEBILinks {
     }
   }
 
-
-  def requestBaseLineUpdatePage(): List[String] = {
-    val data = requestPage("https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/")
-
-    val result = data.lines.filter(l => l.startsWith("<a href=")).map { l =>
-      val end = l.lastIndexOf("\">")
-      val start = l.indexOf("<a href=\"")
-
-      if (start >= 0 && end > start)
-        l.substring(start + 9, (end - start))
-      else
-        ""
-    }.filter(s => s.endsWith(".gz")).map(s => s"https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/$s").toList
-
-    result
-  }
-
-  def downloadBaseLineUpdate(baselinePath: String, hdfsServerUri: String): Unit = {
-
-    val conf = new Configuration
-    conf.set("fs.defaultFS", hdfsServerUri)
-    val fs = FileSystem.get(conf)
-    val p = new Path((baselinePath))
-    val files = fs.listFiles(p, false)
-
-    while (files.hasNext) {
-      val c = files.next()
-      c.getPath
-
-    }
-
-  }
-
-
   def requestLinks(PMID: Long): String = {
     requestPage(s"https://www.ebi.ac.uk/europepmc/webservices/rest/MED/$PMID/datalinks?format=json")
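For context, the retained requestLinks helper resolves a PubMed identifier against the Europe PMC data-links endpoint shown in the hunk above. A hypothetical call (the PMID is an arbitrary example, and the surrounding object is editorial, not part of the commit):

// Editorial usage sketch, not part of the commit. Assumes SparkDownloadEBILinks
// is on the classpath; the PMID below is an arbitrary example value.
object RequestLinksSketch {
  def main(args: Array[String]): Unit = {
    val json: String = SparkDownloadEBILinks.requestLinks(23486612L)
    println(json.take(200)) // first part of the JSON payload returned by Europe PMC
  }
}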
@@ -1,6 +1,7 @@
 [
   {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
-  {"paramName":"i", "paramLongName":"isLookupUrl","paramDescription": "isLookupUrl", "paramRequired": true},
-  {"paramName":"w", "paramLongName":"workingPath","paramDescription": "the path of the sequencial file to read", "paramRequired": true},
-  {"paramName":"t", "paramLongName":"targetPath","paramDescription": "the oaf path ", "paramRequired": true}
+  {"paramName":"i", "paramLongName":"isLookupUrl", "paramDescription": "isLookupUrl", "paramRequired": true},
+  {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true},
+  {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the oaf path ", "paramRequired": true},
+  {"paramName":"h", "paramLongName":"hdfsServerUri", "paramDescription": "the working path ", "paramRequired": true}
 ]
@@ -1,5 +1,5 @@
 [
   {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
-  {"paramName":"s", "paramLongName":"sourcePath","paramDescription": "the source Path", "paramRequired": true},
-  {"paramName":"w", "paramLongName":"workingPath","paramDescription": "the working path ", "paramRequired": true}
+  {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source Path", "paramRequired": true},
+  {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the working path ", "paramRequired": true}
 ]
@@ -25,7 +25,6 @@
         <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
     </kill>
 
-
     <action name="GenerateBaselineDataset">
         <spark xmlns="uri:oozie:spark-action:0.2">
             <master>yarn-cluster</master>
@@ -43,6 +42,7 @@
             </spark-opts>
             <arg>--workingPath</arg><arg>${workingPath}</arg>
             <arg>--master</arg><arg>yarn</arg>
+            <arg>--hdfsServerUri</arg><arg>${nameNode}</arg>
         </spark>
         <ok to="End"/>
         <error to="Kill"/>
@@ -1,4 +1,4 @@
 <workflow-app name="Create EBI Dataset" xmlns="uri:oozie:workflow:0.5">
     <parameters>
         <property>
             <name>sourcePath</name>
@@ -51,9 +51,17 @@
             <arg>--workingPath</arg><arg>${workingPath}</arg>
             <arg>--master</arg><arg>yarn</arg>
         </spark>
+        <ok to="OverrideFolders"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="OverrideFolders">
+        <fs>
+            <delete path="${sourcePath}/ebi_links_dataset_old"/>
+            <move source="${sourcePath}/ebi_links_dataset" target="${sourcePath}/ebi_links_dataset_old"/>
+            <move source="${workingPath}/links_final" target="${sourcePath}/ebi_links_dataset"/>
+        </fs>
         <ok to="End"/>
         <error to="Kill"/>
     </action>
-
     <end name="End"/>
 </workflow-app>
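The OverrideFolders step added in the hunk above rotates the link datasets purely with Oozie's fs action: delete the previous backup, move the current dataset to the backup name, then move the freshly produced links_final into place. The same rotation expressed with the Hadoop FileSystem API, as an editorial sketch only (object and method names are hypothetical):

import org.apache.hadoop.fs.{FileSystem, Path}

// Editorial sketch, not part of the commit: mirrors the Oozie <fs> action above.
object OverrideFoldersSketch {
  def rotate(fs: FileSystem, sourcePath: String, workingPath: String): Unit = {
    // <delete path=".../ebi_links_dataset_old"/>
    fs.delete(new Path(s"$sourcePath/ebi_links_dataset_old"), true)
    // <move ebi_links_dataset -> ebi_links_dataset_old>
    fs.rename(new Path(s"$sourcePath/ebi_links_dataset"), new Path(s"$sourcePath/ebi_links_dataset_old"))
    // <move links_final -> ebi_links_dataset>
    fs.rename(new Path(s"$workingPath/links_final"), new Path(s"$sourcePath/ebi_links_dataset"))
  }
}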
@@ -51,11 +51,6 @@ class BioScholixTest extends AbstractVocabularyTest {
   }
 
 
-  @Test
-  def testDownloadEBIUpdate() = {
-    val data = SparkDownloadEBILinks.requestBaseLineUpdatePage()
-    println(data)
-  }
 
 
   @Test
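The removed test exercised the requestBaseLineUpdatePage() variant that this commit deletes from SparkDownloadEBILinks. An equivalent manual check against the relocated, parameterised method in SparkCreateBaselineDataFrame could look like the sketch below (editorial only; the maxFile value is a hypothetical example of the newest archive already on HDFS):

// Editorial sketch, not part of the commit. Assumes SparkCreateBaselineDataFrame
// is on the classpath; the maxFile argument is a hypothetical file name.
object BaselineUpdatePageSketch {
  def main(args: Array[String]): Unit = {
    val updates = SparkCreateBaselineDataFrame.requestBaseLineUpdatePage("pubmed21n1016.xml.gz")
    updates.foreach { case (name, url) => println(s"$name -> $url") } // files newer than maxFile and their URLs
  }
}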
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue