From 16c91203bd434664fdb8b8a3633fb683880039c5 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Wed, 28 Jul 2021 10:30:49 +0200 Subject: [PATCH] implemented workflow of creation action set for scholexplorer --- .../datacite/AbstractRestClient.scala | 13 +-- .../datacite/ImportDatacite.scala | 2 +- .../sx/provision/SparkCreateActionset.scala | 90 +++++++++++++++++++ .../dhp/sx/provision/SparkSaveActionSet.scala | 86 ++++++++++++++++++ .../dhp/sx/actionset/generate_actionset.json | 6 ++ .../sx/actionset/oozie_app/config-default.xml | 23 +++++ .../dhp/sx/actionset/oozie_app/workflow.xml | 76 ++++++++++++++++ .../dhp/sx/actionset/save_actionset.json | 5 ++ 8 files changed, 295 insertions(+), 6 deletions(-) create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkCreateActionset.scala create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkSaveActionSet.scala create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/generate_actionset.json create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/workflow.xml create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/save_actionset.json diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala index 8df2032830..823187afe6 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala @@ -1,9 +1,10 @@ package eu.dnetlib.dhp.actionmanager.datacite import org.apache.commons.io.IOUtils +import org.apache.http.client.config.RequestConfig import org.apache.http.client.methods.{HttpGet, HttpPost, HttpRequestBase, HttpUriRequest} import org.apache.http.entity.StringEntity -import org.apache.http.impl.client.HttpClients +import org.apache.http.impl.client.{HttpClientBuilder, HttpClients} import java.io.IOException @@ -56,11 +57,15 @@ abstract class AbstractRestClient extends Iterator[String]{ private def doHTTPRequest[A <: HttpUriRequest](r: A) :String ={ - val client = HttpClients.createDefault + val timeout = 60; // seconds + val config = RequestConfig.custom() + .setConnectTimeout(timeout * 1000) + .setConnectionRequestTimeout(timeout * 1000) + .setSocketTimeout(timeout * 1000).build() + val client =HttpClientBuilder.create().setDefaultRequestConfig(config).build() var tries = 4 try { while (tries > 0) { - println(s"requesting ${r.getURI}") val response = client.execute(r) println(s"get response with status${response.getStatusLine.getStatusCode}") @@ -80,7 +85,5 @@ abstract class AbstractRestClient extends Iterator[String]{ throw new RuntimeException("Unable to close client ", e) } } - getBufferData() - } \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala index 931ac06f64..2b73d29559 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala @@ -140,7 +140,7 @@ object ImportDatacite { private def writeSequenceFile(hdfsTargetPath: Path, timestamp: Long, conf: Configuration, bs:Int): Long = { var from:Long = timestamp * 1000 - val delta:Long = 50000000L + val delta:Long = 100000000L var client: DataciteAPIImporter = null val now :Long =System.currentTimeMillis() var i = 0 diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkCreateActionset.scala b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkCreateActionset.scala new file mode 100644 index 0000000000..6f0cdcf8aa --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkCreateActionset.scala @@ -0,0 +1,90 @@ +package eu.dnetlib.dhp.sx.provision + +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation, Result} +import org.apache.spark.{SparkConf, sql} +import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.slf4j.{Logger, LoggerFactory} + +import scala.io.Source + +object SparkCreateActionset { + + def main(args: Array[String]): Unit = { + val log: Logger = LoggerFactory.getLogger(getClass) + val conf: SparkConf = new SparkConf() + val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/actionset/generate_actionset.json")).mkString) + parser.parseArgument(args) + + + val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .appName(getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + + + val sourcePath = parser.get("sourcePath") + log.info(s"sourcePath -> $sourcePath") + + val targetPath = parser.get("targetPath") + log.info(s"targetPath -> $targetPath") + + val workingDirFolder = parser.get("workingDirFolder") + log.info(s"workingDirFolder -> $workingDirFolder") + + implicit val oafEncoders:Encoder[Oaf] = Encoders.kryo[Oaf] + implicit val resultEncoders:Encoder[Result] = Encoders.kryo[Result] + implicit val relationEncoders:Encoder[Relation] = Encoders.kryo[Relation] + + import spark.implicits._ + + val relation = spark.read.load(s"$sourcePath/relation").as[Relation] + + relation.filter(r => (r.getDataInfo== null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge")) + .flatMap(r => List(r.getSource,r.getTarget)).distinct().write.save(s"$workingDirFolder/id_relation") + + + val idRelation = spark.read.load(s"$workingDirFolder/id_relation").as[String] + + log.info("extract source and target Identifier involved in relations") + + + log.info("save relation filtered") + + relation.filter(r => (r.getDataInfo== null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge")) + .write.mode(SaveMode.Overwrite).save(s"$workingDirFolder/actionSetOaf") + + log.info("saving publication") + + val publication:Dataset[(String, Result)] = spark.read.load(s"$sourcePath/publication").as[Result].map(p => (p.getId, p)) + + publication + .joinWith(idRelation, publication("_1").equalTo(idRelation("value"))) + .map(p => p._1._2) + .write.mode(SaveMode.Append).save(s"$workingDirFolder/actionSetOaf") + + log.info("saving dataset") + val dataset:Dataset[(String, Result)] = spark.read.load(s"$sourcePath/dataset").as[Result].map(p => (p.getId, p)) + dataset + .joinWith(idRelation, publication("_1").equalTo(idRelation("value"))) + .map(p => p._1._2) + .write.mode(SaveMode.Append).save(s"$workingDirFolder/actionSetOaf") + + log.info("saving software") + val software:Dataset[(String, Result)] = spark.read.load(s"$sourcePath/software").as[Result].map(p => (p.getId, p)) + software + .joinWith(idRelation, publication("_1").equalTo(idRelation("value"))) + .map(p => p._1._2) + .write.mode(SaveMode.Append).save(s"$workingDirFolder/actionSetOaf") + + log.info("saving Other Research product") + val orp:Dataset[(String, Result)] = spark.read.load(s"$sourcePath/otherresearchproduct").as[Result].map(p => (p.getId, p)) + orp + .joinWith(idRelation, publication("_1").equalTo(idRelation("value"))) + .map(p => p._1._2) + .write.mode(SaveMode.Append).save(s"$workingDirFolder/actionSetOaf") + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkSaveActionSet.scala b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkSaveActionSet.scala new file mode 100644 index 0000000000..d1d0b84242 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkSaveActionSet.scala @@ -0,0 +1,86 @@ +package eu.dnetlib.dhp.sx.provision + +import com.fasterxml.jackson.databind.ObjectMapper +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.schema.action.AtomicAction +import eu.dnetlib.dhp.schema.oaf.{Oaf, OtherResearchProduct, Publication, Relation, Software, Dataset => OafDataset} +import org.apache.hadoop.io.Text +import org.apache.hadoop.io.compress.GzipCodec +import org.apache.hadoop.mapred.SequenceFileOutputFormat +import org.apache.spark.SparkConf +import org.apache.spark.sql.{Encoder, Encoders, SparkSession} +import org.slf4j.{Logger, LoggerFactory} + +import scala.io.Source + +object SparkSaveActionSet { + + + def toActionSet(item: Oaf): (String, String) = { + val mapper = new ObjectMapper() + + item match { + case dataset: OafDataset => + val a: AtomicAction[OafDataset] = new AtomicAction[OafDataset] + a.setClazz(classOf[OafDataset]) + a.setPayload(dataset) + (dataset.getClass.getCanonicalName, mapper.writeValueAsString(a)) + case publication: Publication => + val a: AtomicAction[Publication] = new AtomicAction[Publication] + a.setClazz(classOf[Publication]) + a.setPayload(publication) + (publication.getClass.getCanonicalName, mapper.writeValueAsString(a)) + case software: Software => + val a: AtomicAction[Software] = new AtomicAction[Software] + a.setClazz(classOf[Software]) + a.setPayload(software) + (software.getClass.getCanonicalName, mapper.writeValueAsString(a)) + case orp: OtherResearchProduct => + val a: AtomicAction[OtherResearchProduct] = new AtomicAction[OtherResearchProduct] + a.setClazz(classOf[OtherResearchProduct]) + a.setPayload(orp) + (orp.getClass.getCanonicalName, mapper.writeValueAsString(a)) + + case relation: Relation => + val a: AtomicAction[Relation] = new AtomicAction[Relation] + a.setClazz(classOf[Relation]) + a.setPayload(relation) + (relation.getClass.getCanonicalName, mapper.writeValueAsString(a)) + case _ => + null + } + + } + + def main(args: Array[String]): Unit = { + val log: Logger = LoggerFactory.getLogger(getClass) + val conf: SparkConf = new SparkConf() + val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/actionset/save_actionset.json")).mkString) + parser.parseArgument(args) + + + val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .appName(getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + + + val sourcePath = parser.get("sourcePath") + log.info(s"sourcePath -> $sourcePath") + + val targetPath = parser.get("targetPath") + log.info(s"targetPath -> $targetPath") + + implicit val oafEncoders:Encoder[Oaf] = Encoders.kryo[Oaf] + implicit val tEncoder:Encoder[(String,String)] = Encoders.tuple(Encoders.STRING,Encoders.STRING) + + spark.read.load(sourcePath).as[Oaf] + .map(o =>toActionSet(o)) + .filter(o => o!= null) + .rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$targetPath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec]) + + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/generate_actionset.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/generate_actionset.json new file mode 100644 index 0000000000..0563808ea8 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/generate_actionset.json @@ -0,0 +1,6 @@ +[ + {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, + {"paramName":"s", "paramLongName":"sourcePath","paramDescription": "source path", "paramRequired": true}, + {"paramName":"w", "paramLongName":"workingDirFolder","paramDescription": "the working Dir Folder", "paramRequired": true}, + {"paramName":"t", "paramLongName":"targetPath","paramDescription": "the target path ", "paramRequired": true} +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/config-default.xml new file mode 100644 index 0000000000..dd3c32c620 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/config-default.xml @@ -0,0 +1,23 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + + oozie.launcher.mapreduce.user.classpath.first + true + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/workflow.xml new file mode 100644 index 0000000000..ef86a17728 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/workflow.xml @@ -0,0 +1,76 @@ + + + + sourcePath + the path of the consistent graph + + + workingDirFolder + the path of working dir ActionSet + + + outputPath + the path of Scholexplorer ActionSet + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn-cluster + cluster + Create Action Set + eu.dnetlib.dhp.sx.provision.SparkCreateActionset + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePath${sourcePath} + --targetPath${outputPath} + --workingDirFolder${workingDirFolder} + --masteryarn-cluster + + + + + + + + + yarn-cluster + cluster + Save Action Set + eu.dnetlib.dhp.sx.provision.SparkSaveActionSet + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePath${workingDirFolder}/actionSetOaf + --targetPath${outputPath} + --masteryarn-cluster + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/save_actionset.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/save_actionset.json new file mode 100644 index 0000000000..0264c825f0 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/save_actionset.json @@ -0,0 +1,5 @@ +[ + {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, + {"paramName":"s", "paramLongName":"sourcePath","paramDescription": "source path", "paramRequired": true}, + {"paramName":"t", "paramLongName":"targetPath","paramDescription": "the target path ", "paramRequired": true} +] \ No newline at end of file