diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala index 92a870e37..bae41b218 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala @@ -8,14 +8,15 @@ import org.apache.http.impl.client.{HttpClientBuilder, HttpClients} import java.io.IOException -abstract class AbstractRestClient extends Iterator[String]{ + +abstract class AbstractRestClient extends Iterator[String] { var buffer: List[String] = List() - var current_index:Int = 0 + var current_index: Int = 0 var scroll_value: Option[String] = None - var complete:Boolean = false + var complete: Boolean = false def extractInfo(input: String): Unit @@ -23,13 +24,13 @@ abstract class AbstractRestClient extends Iterator[String]{ protected def getBufferData(): Unit - def doHTTPGETRequest(url:String): String = { + def doHTTPGETRequest(url: String): String = { val httpGet = new HttpGet(url) doHTTPRequest(httpGet) } - def doHTTPPOSTRequest(url:String, json:String): String = { + def doHTTPPOSTRequest(url: String, json: String): String = { val httpPost = new HttpPost(url) if (json != null) { val entity = new StringEntity(json) @@ -46,7 +47,7 @@ abstract class AbstractRestClient extends Iterator[String]{ override def next(): String = { - val next_item:String = buffer(current_index) + val next_item: String = buffer(current_index) current_index = current_index + 1 if (current_index == buffer.size) getBufferData() @@ -54,17 +55,16 @@ abstract class AbstractRestClient extends Iterator[String]{ } - - - private def doHTTPRequest[A <: HttpUriRequest](r: A) :String ={ + private def doHTTPRequest[A <: HttpUriRequest](r: A): String = { val timeout = 60; // seconds val config = RequestConfig.custom() .setConnectTimeout(timeout * 1000) .setConnectionRequestTimeout(timeout * 1000) .setSocketTimeout(timeout * 1000).build() - val client =HttpClientBuilder.create().setDefaultRequestConfig(config).build() - var tries = 4 - while (tries > 0) { + val client = HttpClientBuilder.create().setDefaultRequestConfig(config).build() + try { + var tries = 4 + while (tries > 0) { println(s"requesting ${r.getURI}") try { val response = client.execute(r) @@ -78,10 +78,15 @@ abstract class AbstractRestClient extends Iterator[String]{ case e: Throwable => println(s"Error on requesting ${r.getURI}") e.printStackTrace() - tries-=1 + tries -= 1 } } "" - } + } finally { + if (client != null) + client.close() + } + } + getBufferData() } \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PubMedToOaf.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PubMedToOaf.scala index 377bd902d..202eb7b14 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PubMedToOaf.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/bio/pubmed/PubMedToOaf.scala @@ -1,10 +1,10 @@ package eu.dnetlib.dhp.sx.graph.bio.pubmed -import java.util.regex.Pattern import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup import eu.dnetlib.dhp.schema.common.ModelConstants -import eu.dnetlib.dhp.schema.oaf.utils.{CleaningFunctions, GraphCleaningFunctions, IdentifierFactory, OafMapperUtils, PidType} import eu.dnetlib.dhp.schema.oaf._ +import eu.dnetlib.dhp.schema.oaf.utils.{GraphCleaningFunctions, IdentifierFactory, OafMapperUtils, PidType} +import java.util.regex.Pattern import scala.collection.JavaConverters._ object PubMedToOaf { @@ -17,7 +17,7 @@ object PubMedToOaf { def cleanDoi(doi:String):String = { - val regex = "10.\\d{4,9}\\/[-._;()\\/:A-Z0-9]+$" + val regex = "^10.\\d{4,9}\\/[\\[\\]\\-\\<\\>._;()\\/:A-Z0-9]+$" val pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkDownloadEBILinks.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkDownloadEBILinks.scala new file mode 100644 index 000000000..08e060459 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkDownloadEBILinks.scala @@ -0,0 +1,115 @@ +package eu.dnetlib.dhp.sx.graph.ebi + +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.sx.graph.bio.BioDBToOAF.EBILinkItem +import eu.dnetlib.dhp.sx.graph.bio.pubmed.{PMArticle, PMAuthor, PMJournal} +import org.apache.commons.io.IOUtils +import org.apache.http.client.config.RequestConfig +import org.apache.http.client.methods.{HttpGet, HttpUriRequest} +import org.apache.http.impl.client.HttpClientBuilder +import org.apache.spark.SparkConf +import org.apache.spark.sql.expressions.Aggregator +import org.apache.spark.sql.functions.max +import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.slf4j.{Logger, LoggerFactory} + +object SparkDownloadEBILinks { + + + def createEBILinks(pmid:Long):EBILinkItem = { + + val res = requestLinks(pmid) + if (res!=null) + return EBILinkItem(pmid, res) + null + } + + + def requestLinks(PMID:Long):String = { + val r = new HttpGet(s"https://www.ebi.ac.uk/europepmc/webservices/rest/MED/$PMID/datalinks?format=json") + val timeout = 60; // seconds + val config = RequestConfig.custom() + .setConnectTimeout(timeout * 1000) + .setConnectionRequestTimeout(timeout * 1000) + .setSocketTimeout(timeout * 1000).build() + val client = HttpClientBuilder.create().setDefaultRequestConfig(config).build() + try { + var tries = 4 + while (tries > 0) { + println(s"requesting ${r.getURI}") + try { + val response = client.execute(r) + println(s"get response with status${response.getStatusLine.getStatusCode}") + if (response.getStatusLine.getStatusCode > 400) { + tries -= 1 + } + else + return IOUtils.toString(response.getEntity.getContent) + } catch { + case e: Throwable => + println(s"Error on requesting ${r.getURI}") + e.printStackTrace() + tries -= 1 + } + } + "" + } finally { + if (client != null) + client.close() + } + + } + def main(args: Array[String]): Unit = { + + val log: Logger = LoggerFactory.getLogger(getClass) + val MAX_ITEM_PER_PARTITION = 20000 + val conf: SparkConf = new SparkConf() + val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/ebi/ebi_download_update.json"))) + parser.parseArgument(args) + val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .appName(SparkEBILinksToOaf.getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + + import spark.implicits._ + + implicit val PMEncoder: Encoder[PMArticle] = Encoders.kryo(classOf[PMArticle]) + implicit val PMJEncoder: Encoder[PMJournal] = Encoders.kryo(classOf[PMJournal]) + implicit val PMAEncoder: Encoder[PMAuthor] = Encoders.kryo(classOf[PMAuthor]) + + val sourcePath = parser.get("sourcePath") + log.info(s"sourcePath -> $sourcePath") + val workingPath = parser.get("workingPath") + log.info(s"workingPath -> $workingPath") + + log.info("Getting max pubmedId where the links have been requested") + val links:Dataset[EBILinkItem] = spark.read.load(s"$sourcePath/ebi_links_dataset").as[EBILinkItem] + val lastPMIDRequested =links.map(l => l.id).select(max("value")).first.getLong(0) + + log.info("Retrieving PMID to request links") + val pubmed = spark.read.load(s"$sourcePath/baseline_dataset").as[PMArticle] + pubmed.map(p => p.getPmid.toLong).where(s"value > $lastPMIDRequested").write.mode(SaveMode.Overwrite).save(s"$workingPath/id_to_request") + + val pmidToReq:Dataset[Long] = spark.read.load(s"$workingPath/id_to_request").as[Long] + + val total = pmidToReq.count() + + spark.createDataset(pmidToReq.rdd.repartition((total/MAX_ITEM_PER_PARTITION).toInt).map(pmid =>createEBILinks(pmid)).filter(l => l!= null)).write.mode(SaveMode.Overwrite).save(s"$workingPath/links_update") + + val updates:Dataset[EBILinkItem] =spark.read.load(s"$workingPath/links_update").as[EBILinkItem] + + links.union(updates).groupByKey(_.id) + .reduceGroups{(x,y) => + if (x == null || x.links ==null) + y + if (y ==null || y.links ==null) + x + if (x.links.length > y.links.length) + x + else + y + }.map(_._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/links_final") + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/ebi_download_update.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/ebi_download_update.json new file mode 100644 index 000000000..0ae19234a --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/ebi_download_update.json @@ -0,0 +1,5 @@ +[ + {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, + {"paramName":"s", "paramLongName":"sourcePath","paramDescription": "the source Path", "paramRequired": true}, + {"paramName":"w", "paramLongName":"workingPath","paramDescription": "the working path ", "paramRequired": true} +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/update/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/update/oozie_app/config-default.xml new file mode 100644 index 000000000..17cd6c9a3 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/update/oozie_app/config-default.xml @@ -0,0 +1,68 @@ + + + + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + hive_metastore_uris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + spark2YarnHistoryServerAddress + http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 + + + + + + + + + + + + + + + + + + + + + + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + spark2EventLogDir + /user/spark/spark2ApplicationHistory + + + spark2ExtraListeners + "com.cloudera.spark.lineage.NavigatorAppListener" + + + spark2SqlQueryExecutionListeners + "com.cloudera.spark.lineage.NavigatorQueryListener" + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/update/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/update/oozie_app/workflow.xml new file mode 100644 index 000000000..1b738caed --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/update/oozie_app/workflow.xml @@ -0,0 +1,59 @@ + + + + sourcePath + the Working Path + + + workingPath + the Working Path + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + yarn-cluster + cluster + Incremental Download EBI Links + eu.dnetlib.dhp.sx.graph.ebi.SparkDownloadEBILinks + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.shuffle.partitions=2000 + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePath${sourcePath} + --workingPath${workingPath} + --masteryarn + + + + + + + \ No newline at end of file