Implemented method to download delta updates in EBI Links

This commit is contained in:
Sandro La Bruzzo 2021-08-30 09:32:21 +02:00
parent ccf4103a25
commit e8b3cb9147
6 changed files with 269 additions and 17 deletions

View File

@ -8,14 +8,15 @@ import org.apache.http.impl.client.{HttpClientBuilder, HttpClients}
import java.io.IOException import java.io.IOException
abstract class AbstractRestClient extends Iterator[String]{
abstract class AbstractRestClient extends Iterator[String] {
var buffer: List[String] = List() var buffer: List[String] = List()
var current_index:Int = 0 var current_index: Int = 0
var scroll_value: Option[String] = None var scroll_value: Option[String] = None
var complete:Boolean = false var complete: Boolean = false
def extractInfo(input: String): Unit def extractInfo(input: String): Unit
@ -23,13 +24,13 @@ abstract class AbstractRestClient extends Iterator[String]{
protected def getBufferData(): Unit protected def getBufferData(): Unit
def doHTTPGETRequest(url:String): String = { def doHTTPGETRequest(url: String): String = {
val httpGet = new HttpGet(url) val httpGet = new HttpGet(url)
doHTTPRequest(httpGet) doHTTPRequest(httpGet)
} }
def doHTTPPOSTRequest(url:String, json:String): String = { def doHTTPPOSTRequest(url: String, json: String): String = {
val httpPost = new HttpPost(url) val httpPost = new HttpPost(url)
if (json != null) { if (json != null) {
val entity = new StringEntity(json) val entity = new StringEntity(json)
@ -46,7 +47,7 @@ abstract class AbstractRestClient extends Iterator[String]{
override def next(): String = { override def next(): String = {
val next_item:String = buffer(current_index) val next_item: String = buffer(current_index)
current_index = current_index + 1 current_index = current_index + 1
if (current_index == buffer.size) if (current_index == buffer.size)
getBufferData() getBufferData()
@ -54,17 +55,16 @@ abstract class AbstractRestClient extends Iterator[String]{
} }
private def doHTTPRequest[A <: HttpUriRequest](r: A): String = {
private def doHTTPRequest[A <: HttpUriRequest](r: A) :String ={
val timeout = 60; // seconds val timeout = 60; // seconds
val config = RequestConfig.custom() val config = RequestConfig.custom()
.setConnectTimeout(timeout * 1000) .setConnectTimeout(timeout * 1000)
.setConnectionRequestTimeout(timeout * 1000) .setConnectionRequestTimeout(timeout * 1000)
.setSocketTimeout(timeout * 1000).build() .setSocketTimeout(timeout * 1000).build()
val client =HttpClientBuilder.create().setDefaultRequestConfig(config).build() val client = HttpClientBuilder.create().setDefaultRequestConfig(config).build()
var tries = 4 try {
while (tries > 0) { var tries = 4
while (tries > 0) {
println(s"requesting ${r.getURI}") println(s"requesting ${r.getURI}")
try { try {
val response = client.execute(r) val response = client.execute(r)
@ -78,10 +78,15 @@ abstract class AbstractRestClient extends Iterator[String]{
case e: Throwable => case e: Throwable =>
println(s"Error on requesting ${r.getURI}") println(s"Error on requesting ${r.getURI}")
e.printStackTrace() e.printStackTrace()
tries-=1 tries -= 1
} }
} }
"" ""
} } finally {
if (client != null)
client.close()
}
}
getBufferData() getBufferData()
} }

View File

@ -1,10 +1,10 @@
package eu.dnetlib.dhp.sx.graph.bio.pubmed package eu.dnetlib.dhp.sx.graph.bio.pubmed
import java.util.regex.Pattern
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
import eu.dnetlib.dhp.schema.common.ModelConstants import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.utils.{CleaningFunctions, GraphCleaningFunctions, IdentifierFactory, OafMapperUtils, PidType}
import eu.dnetlib.dhp.schema.oaf._ import eu.dnetlib.dhp.schema.oaf._
import eu.dnetlib.dhp.schema.oaf.utils.{GraphCleaningFunctions, IdentifierFactory, OafMapperUtils, PidType}
import java.util.regex.Pattern
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
object PubMedToOaf { object PubMedToOaf {
@ -17,7 +17,7 @@ object PubMedToOaf {
def cleanDoi(doi:String):String = { def cleanDoi(doi:String):String = {
val regex = "10.\\d{4,9}\\/[-._;()\\/:A-Z0-9]+$" val regex = "^10.\\d{4,9}\\/[\\[\\]\\-\\<\\>._;()\\/:A-Z0-9]+$"
val pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE) val pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE)

View File

@ -0,0 +1,115 @@
package eu.dnetlib.dhp.sx.graph.ebi
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.sx.graph.bio.BioDBToOAF.EBILinkItem
import eu.dnetlib.dhp.sx.graph.bio.pubmed.{PMArticle, PMAuthor, PMJournal}
import org.apache.commons.io.IOUtils
import org.apache.http.client.config.RequestConfig
import org.apache.http.client.methods.{HttpGet, HttpUriRequest}
import org.apache.http.impl.client.HttpClientBuilder
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.max
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
object SparkDownloadEBILinks {
def createEBILinks(pmid:Long):EBILinkItem = {
val res = requestLinks(pmid)
if (res!=null)
return EBILinkItem(pmid, res)
null
}
def requestLinks(PMID:Long):String = {
val r = new HttpGet(s"https://www.ebi.ac.uk/europepmc/webservices/rest/MED/$PMID/datalinks?format=json")
val timeout = 60; // seconds
val config = RequestConfig.custom()
.setConnectTimeout(timeout * 1000)
.setConnectionRequestTimeout(timeout * 1000)
.setSocketTimeout(timeout * 1000).build()
val client = HttpClientBuilder.create().setDefaultRequestConfig(config).build()
try {
var tries = 4
while (tries > 0) {
println(s"requesting ${r.getURI}")
try {
val response = client.execute(r)
println(s"get response with status${response.getStatusLine.getStatusCode}")
if (response.getStatusLine.getStatusCode > 400) {
tries -= 1
}
else
return IOUtils.toString(response.getEntity.getContent)
} catch {
case e: Throwable =>
println(s"Error on requesting ${r.getURI}")
e.printStackTrace()
tries -= 1
}
}
""
} finally {
if (client != null)
client.close()
}
}
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(getClass)
val MAX_ITEM_PER_PARTITION = 20000
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/ebi/ebi_download_update.json")))
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(SparkEBILinksToOaf.getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
import spark.implicits._
implicit val PMEncoder: Encoder[PMArticle] = Encoders.kryo(classOf[PMArticle])
implicit val PMJEncoder: Encoder[PMJournal] = Encoders.kryo(classOf[PMJournal])
implicit val PMAEncoder: Encoder[PMAuthor] = Encoders.kryo(classOf[PMAuthor])
val sourcePath = parser.get("sourcePath")
log.info(s"sourcePath -> $sourcePath")
val workingPath = parser.get("workingPath")
log.info(s"workingPath -> $workingPath")
log.info("Getting max pubmedId where the links have been requested")
val links:Dataset[EBILinkItem] = spark.read.load(s"$sourcePath/ebi_links_dataset").as[EBILinkItem]
val lastPMIDRequested =links.map(l => l.id).select(max("value")).first.getLong(0)
log.info("Retrieving PMID to request links")
val pubmed = spark.read.load(s"$sourcePath/baseline_dataset").as[PMArticle]
pubmed.map(p => p.getPmid.toLong).where(s"value > $lastPMIDRequested").write.mode(SaveMode.Overwrite).save(s"$workingPath/id_to_request")
val pmidToReq:Dataset[Long] = spark.read.load(s"$workingPath/id_to_request").as[Long]
val total = pmidToReq.count()
spark.createDataset(pmidToReq.rdd.repartition((total/MAX_ITEM_PER_PARTITION).toInt).map(pmid =>createEBILinks(pmid)).filter(l => l!= null)).write.mode(SaveMode.Overwrite).save(s"$workingPath/links_update")
val updates:Dataset[EBILinkItem] =spark.read.load(s"$workingPath/links_update").as[EBILinkItem]
links.union(updates).groupByKey(_.id)
.reduceGroups{(x,y) =>
if (x == null || x.links ==null)
y
if (y ==null || y.links ==null)
x
if (x.links.length > y.links.length)
x
else
y
}.map(_._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/links_final")
}
}

View File

@ -0,0 +1,5 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"s", "paramLongName":"sourcePath","paramDescription": "the source Path", "paramRequired": true},
{"paramName":"w", "paramLongName":"workingPath","paramDescription": "the working path ", "paramRequired": true}
]

View File

@ -0,0 +1,68 @@
<configuration>
<!-- OCEAN -->
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>hive_metastore_uris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
</property>
<!-- GARR -->
<!-- <property>-->
<!-- <name>jobTracker</name>-->
<!-- <value>yarn</value>-->
<!-- </property>-->
<!-- <property>-->
<!-- <name>nameNode</name>-->
<!-- <value>hdfs://hadoop-rm1.garr-pa1.d4science.org:8020</value>-->
<!-- </property>-->
<!-- <property>-->
<!-- <name>hive_metastore_uris</name>-->
<!-- <value>thrift://hadoop-edge3.garr-pa1.d4science.org:9083</value>-->
<!-- </property>-->
<!-- <property>-->
<!-- <name>spark2YarnHistoryServerAddress</name>-->
<!-- <value>http://hadoop-rm2.garr-pa1.d4science.org:19888</value>-->
<!-- </property>-->
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>spark2EventLogDir</name>
<value>/user/spark/spark2ApplicationHistory</value>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>"com.cloudera.spark.lineage.NavigatorAppListener"</value>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>"com.cloudera.spark.lineage.NavigatorQueryListener"</value>
</property>
</configuration>

View File

@ -0,0 +1,59 @@
<workflow-app name="Create EBI Dataset" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the Working Path</description>
</property>
<property>
<name>workingPath</name>
<description>the Working Path</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
</parameters>
<start to="DownloadEBILinks"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="DownloadEBILinks">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Incremental Download EBI Links</name>
<class>eu.dnetlib.dhp.sx.graph.ebi.SparkDownloadEBILinks</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.shuffle.partitions=2000
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--master</arg><arg>yarn</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>