diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala
index 8df203283..823187afe 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala
@@ -1,9 +1,10 @@
package eu.dnetlib.dhp.actionmanager.datacite
import org.apache.commons.io.IOUtils
+import org.apache.http.client.config.RequestConfig
import org.apache.http.client.methods.{HttpGet, HttpPost, HttpRequestBase, HttpUriRequest}
import org.apache.http.entity.StringEntity
-import org.apache.http.impl.client.HttpClients
+import org.apache.http.impl.client.{HttpClientBuilder, HttpClients}
import java.io.IOException
@@ -56,11 +57,15 @@ abstract class AbstractRestClient extends Iterator[String]{
private def doHTTPRequest[A <: HttpUriRequest](r: A) :String ={
- val client = HttpClients.createDefault
+ val timeout = 60; // seconds
+ val config = RequestConfig.custom()
+ .setConnectTimeout(timeout * 1000)
+ .setConnectionRequestTimeout(timeout * 1000)
+ .setSocketTimeout(timeout * 1000).build()
+ val client = HttpClientBuilder.create().setDefaultRequestConfig(config).build()
var tries = 4
try {
while (tries > 0) {
-
println(s"requesting ${r.getURI}")
val response = client.execute(r)
println(s"get response with status${response.getStatusLine.getStatusCode}")
@@ -80,7 +85,5 @@ abstract class AbstractRestClient extends Iterator[String]{
throw new RuntimeException("Unable to close client ", e)
}
}
-
getBufferData()
-
}
\ No newline at end of file
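
The timeout change above replaces `HttpClients.createDefault` with a client built from an explicit `RequestConfig`, so a stalled DataCite call should fail after 60 seconds instead of hanging the harvest. A minimal sketch of that retry-with-timeout pattern, separate from the project code (the object name and `maxTries` are illustrative only):

```scala
import org.apache.commons.io.IOUtils
import org.apache.http.client.config.RequestConfig
import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.HttpClientBuilder

object TimeoutGetSketch {
  // Illustrative only: GET a URL with connect/request/socket timeouts and a bounded retry loop.
  def get(url: String, timeoutSeconds: Int = 60, maxTries: Int = 4): Option[String] = {
    val config = RequestConfig.custom()
      .setConnectTimeout(timeoutSeconds * 1000)           // TCP connect
      .setConnectionRequestTimeout(timeoutSeconds * 1000) // borrow from the connection pool
      .setSocketTimeout(timeoutSeconds * 1000)            // wait for response data
      .build()
    val client = HttpClientBuilder.create().setDefaultRequestConfig(config).build()
    try {
      var tries = maxTries
      while (tries > 0) {
        val response = client.execute(new HttpGet(url))
        if (response.getStatusLine.getStatusCode == 200)
          return Some(IOUtils.toString(response.getEntity.getContent))
        tries -= 1
      }
      None
    } finally client.close()
  }
}
```
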
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala
index 931ac06f6..2b73d2955 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala
@@ -140,7 +140,7 @@ object ImportDatacite {
private def writeSequenceFile(hdfsTargetPath: Path, timestamp: Long, conf: Configuration, bs:Int): Long = {
var from:Long = timestamp * 1000
- val delta:Long = 50000000L
+ val delta:Long = 100000000L
var client: DataciteAPIImporter = null
val now :Long =System.currentTimeMillis()
var i = 0
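
For context on the `delta` bump: assuming the value is applied to `from` (epoch milliseconds), the harvesting window per API pass grows from roughly 13.9 hours (50,000,000 ms) to roughly 27.8 hours (100,000,000 ms), about halving the number of windows needed to reach the current time. A hedged sketch of how such `[from, from + delta)` windows could walk up to `now` (the real paging lives in `writeSequenceFile`; names here are illustrative):

```scala
// Illustrative only: enumerate [from, from + delta) windows up to the current time.
def windows(timestampSeconds: Long, delta: Long = 100000000L): Seq[(Long, Long)] = {
  val now = System.currentTimeMillis()
  Iterator.iterate(timestampSeconds * 1000L)(_ + delta)
    .takeWhile(_ < now)
    .map(from => (from, math.min(from + delta, now)))
    .toSeq
}
```
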
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkCreateActionset.scala b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkCreateActionset.scala
new file mode 100644
index 000000000..6f0cdcf8a
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkCreateActionset.scala
@@ -0,0 +1,90 @@
+package eu.dnetlib.dhp.sx.provision
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation, Result}
+import org.apache.spark.{SparkConf, sql}
+import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.io.Source
+
+object SparkCreateActionset {
+
+ def main(args: Array[String]): Unit = {
+ val log: Logger = LoggerFactory.getLogger(getClass)
+ val conf: SparkConf = new SparkConf()
+ val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/actionset/generate_actionset.json")).mkString)
+ parser.parseArgument(args)
+
+
+ val spark: SparkSession =
+ SparkSession
+ .builder()
+ .config(conf)
+ .appName(getClass.getSimpleName)
+ .master(parser.get("master")).getOrCreate()
+
+
+ val sourcePath = parser.get("sourcePath")
+ log.info(s"sourcePath -> $sourcePath")
+
+ val targetPath = parser.get("targetPath")
+ log.info(s"targetPath -> $targetPath")
+
+ val workingDirFolder = parser.get("workingDirFolder")
+ log.info(s"workingDirFolder -> $workingDirFolder")
+
+ implicit val oafEncoders:Encoder[Oaf] = Encoders.kryo[Oaf]
+ implicit val resultEncoders:Encoder[Result] = Encoders.kryo[Result]
+ implicit val relationEncoders:Encoder[Relation] = Encoders.kryo[Relation]
+
+ import spark.implicits._
+
+ val relation = spark.read.load(s"$sourcePath/relation").as[Relation]
+
+ relation.filter(r => (r.getDataInfo == null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge"))
+ .flatMap(r => List(r.getSource, r.getTarget)).distinct().write.save(s"$workingDirFolder/id_relation")
+
+
+ val idRelation = spark.read.load(s"$workingDirFolder/id_relation").as[String]
+
+ log.info("extract source and target Identifier involved in relations")
+
+
+ log.info("save relation filtered")
+
+ relation.filter(r => (r.getDataInfo == null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge"))
+ .write.mode(SaveMode.Overwrite).save(s"$workingDirFolder/actionSetOaf")
+
+ log.info("saving publication")
+
+ val publication:Dataset[(String, Result)] = spark.read.load(s"$sourcePath/publication").as[Result].map(p => (p.getId, p))
+
+ publication
+ .joinWith(idRelation, publication("_1").equalTo(idRelation("value")))
+ .map(p => p._1._2)
+ .write.mode(SaveMode.Append).save(s"$workingDirFolder/actionSetOaf")
+
+ log.info("saving dataset")
+ val dataset:Dataset[(String, Result)] = spark.read.load(s"$sourcePath/dataset").as[Result].map(p => (p.getId, p))
+ dataset
+ .joinWith(idRelation, dataset("_1").equalTo(idRelation("value")))
+ .map(p => p._1._2)
+ .write.mode(SaveMode.Append).save(s"$workingDirFolder/actionSetOaf")
+
+ log.info("saving software")
+ val software:Dataset[(String, Result)] = spark.read.load(s"$sourcePath/software").as[Result].map(p => (p.getId, p))
+ software
+ .joinWith(idRelation, software("_1").equalTo(idRelation("value")))
+ .map(p => p._1._2)
+ .write.mode(SaveMode.Append).save(s"$workingDirFolder/actionSetOaf")
+
+ log.info("saving Other Research product")
+ val orp:Dataset[(String, Result)] = spark.read.load(s"$sourcePath/otherresearchproduct").as[Result].map(p => (p.getId, p))
+ orp
+ .joinWith(idRelation, orp("_1").equalTo(idRelation("value")))
+ .map(p => p._1._2)
+ .write.mode(SaveMode.Append).save(s"$workingDirFolder/actionSetOaf")
+ }
+
+}
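
The four join-and-append blocks in `SparkCreateActionset` differ only in the entity folder and the keyed dataset; the corrected join columns above are exactly the part the copy-paste had gotten wrong. One possible way to factor them, sketched with a hypothetical helper that is not part of the patch:

```scala
import eu.dnetlib.dhp.schema.oaf.Result
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}

// Hypothetical helper: key one entity dump by id, keep only the records whose id
// occurs in a relation, and append them to the shared actionSetOaf folder.
def appendEntitiesInRelations(spark: SparkSession, sourcePath: String, workingDirFolder: String,
                              entity: String, idRelation: Dataset[String])
                             (implicit resultEncoder: Encoder[Result]): Unit = {
  implicit val keyedEncoder: Encoder[(String, Result)] =
    Encoders.tuple(Encoders.STRING, resultEncoder)
  val keyed = spark.read.load(s"$sourcePath/$entity").as[Result].map(r => (r.getId, r))
  keyed
    .joinWith(idRelation, keyed("_1").equalTo(idRelation("value")))
    .map(_._1._2)
    .write.mode(SaveMode.Append).save(s"$workingDirFolder/actionSetOaf")
}

// Usage sketch:
// Seq("publication", "dataset", "software", "otherresearchproduct")
//   .foreach(e => appendEntitiesInRelations(spark, sourcePath, workingDirFolder, e, idRelation))
```
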
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkSaveActionSet.scala b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkSaveActionSet.scala
new file mode 100644
index 000000000..d1d0b8424
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkSaveActionSet.scala
@@ -0,0 +1,86 @@
+package eu.dnetlib.dhp.sx.provision
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.schema.action.AtomicAction
+import eu.dnetlib.dhp.schema.oaf.{Oaf, OtherResearchProduct, Publication, Relation, Software, Dataset => OafDataset}
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.hadoop.mapred.SequenceFileOutputFormat
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.io.Source
+
+object SparkSaveActionSet {
+
+
+ def toActionSet(item: Oaf): (String, String) = {
+ val mapper = new ObjectMapper()
+
+ item match {
+ case dataset: OafDataset =>
+ val a: AtomicAction[OafDataset] = new AtomicAction[OafDataset]
+ a.setClazz(classOf[OafDataset])
+ a.setPayload(dataset)
+ (dataset.getClass.getCanonicalName, mapper.writeValueAsString(a))
+ case publication: Publication =>
+ val a: AtomicAction[Publication] = new AtomicAction[Publication]
+ a.setClazz(classOf[Publication])
+ a.setPayload(publication)
+ (publication.getClass.getCanonicalName, mapper.writeValueAsString(a))
+ case software: Software =>
+ val a: AtomicAction[Software] = new AtomicAction[Software]
+ a.setClazz(classOf[Software])
+ a.setPayload(software)
+ (software.getClass.getCanonicalName, mapper.writeValueAsString(a))
+ case orp: OtherResearchProduct =>
+ val a: AtomicAction[OtherResearchProduct] = new AtomicAction[OtherResearchProduct]
+ a.setClazz(classOf[OtherResearchProduct])
+ a.setPayload(orp)
+ (orp.getClass.getCanonicalName, mapper.writeValueAsString(a))
+
+ case relation: Relation =>
+ val a: AtomicAction[Relation] = new AtomicAction[Relation]
+ a.setClazz(classOf[Relation])
+ a.setPayload(relation)
+ (relation.getClass.getCanonicalName, mapper.writeValueAsString(a))
+ case _ =>
+ null
+ }
+
+ }
+
+ def main(args: Array[String]): Unit = {
+ val log: Logger = LoggerFactory.getLogger(getClass)
+ val conf: SparkConf = new SparkConf()
+ val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/actionset/save_actionset.json")).mkString)
+ parser.parseArgument(args)
+
+
+ val spark: SparkSession =
+ SparkSession
+ .builder()
+ .config(conf)
+ .appName(getClass.getSimpleName)
+ .master(parser.get("master")).getOrCreate()
+
+
+ val sourcePath = parser.get("sourcePath")
+ log.info(s"sourcePath -> $sourcePath")
+
+ val targetPath = parser.get("targetPath")
+ log.info(s"targetPath -> $targetPath")
+
+ implicit val oafEncoders:Encoder[Oaf] = Encoders.kryo[Oaf]
+ implicit val tEncoder:Encoder[(String,String)] = Encoders.tuple(Encoders.STRING,Encoders.STRING)
+
+ spark.read.load(sourcePath).as[Oaf]
+ .map(o => toActionSet(o))
+ .filter(o => o != null)
+ .rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$targetPath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec])
+
+ }
+
+}
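
`SparkSaveActionSet` writes the action set as a gzip-compressed `SequenceFile` of `Text` pairs, with the payload class name as the key and the serialized `AtomicAction` JSON as the value. A hedged sketch (not part of the patch) for spot-checking the output by counting payloads per class:

```scala
import org.apache.hadoop.io.Text
import org.apache.spark.sql.SparkSession

object InspectActionSetSketch {
  def main(args: Array[String]): Unit = {
    // args(0): path of the sequence file produced by SparkSaveActionSet
    val spark = SparkSession.builder().master("local[*]").appName("InspectActionSetSketch").getOrCreate()
    val perClass = spark.sparkContext
      .sequenceFile(args(0), classOf[Text], classOf[Text])
      .map { case (clazz, _) => (clazz.toString, 1L) } // copy the key out of the reused Writable
      .reduceByKey(_ + _)
    perClass.collect().foreach { case (name, n) => println(s"$name -> $n") }
    spark.stop()
  }
}
```
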
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/generate_actionset.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/generate_actionset.json
new file mode 100644
index 000000000..0563808ea
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/generate_actionset.json
@@ -0,0 +1,6 @@
+[
+ {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
+ {"paramName":"s", "paramLongName":"sourcePath","paramDescription": "source path", "paramRequired": true},
+ {"paramName":"w", "paramLongName":"workingDirFolder","paramDescription": "the working Dir Folder", "paramRequired": true},
+ {"paramName":"t", "paramLongName":"targetPath","paramDescription": "the target path ", "paramRequired": true}
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/config-default.xml
new file mode 100644
index 000000000..dd3c32c62
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/config-default.xml
@@ -0,0 +1,23 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>yarnRM</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>hdfs://nameservice1</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+
+    <property>
+        <name>oozie.launcher.mapreduce.user.classpath.first</name>
+        <value>true</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/workflow.xml
new file mode 100644
index 000000000..ef86a1772
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/workflow.xml
@@ -0,0 +1,76 @@
+<workflow-app name="Create Scholix ActionSet" xmlns="uri:oozie:workflow:0.5">
+    <parameters>
+        <property>
+            <name>sourcePath</name>
+            <description>the path of the consistent graph</description>
+        </property>
+        <property>
+            <name>workingDirFolder</name>
+            <description>the path of working dir ActionSet</description>
+        </property>
+        <property>
+            <name>outputPath</name>
+            <description>the path of Scholexplorer ActionSet</description>
+        </property>
+    </parameters>
+
+    <start to="createActionSet"/>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <action name="createActionSet">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>Create Action Set</name>
+            <class>eu.dnetlib.dhp.sx.provision.SparkCreateActionset</class>
+            <jar>dhp-aggregation-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.sql.shuffle.partitions=3840
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+            <arg>--sourcePath</arg><arg>${sourcePath}</arg>
+            <arg>--targetPath</arg><arg>${outputPath}</arg>
+            <arg>--workingDirFolder</arg><arg>${workingDirFolder}</arg>
+            <arg>--master</arg><arg>yarn-cluster</arg>
+        </spark>
+        <ok to="saveActionSet"/>
+        <error to="Kill"/>
+    </action>
+
+
+    <action name="saveActionSet">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>Save Action Set</name>
+            <class>eu.dnetlib.dhp.sx.provision.SparkSaveActionSet</class>
+            <jar>dhp-aggregation-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.sql.shuffle.partitions=3840
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+            <arg>--sourcePath</arg><arg>${workingDirFolder}/actionSetOaf</arg>
+            <arg>--targetPath</arg><arg>${outputPath}</arg>
+            <arg>--master</arg><arg>yarn-cluster</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/save_actionset.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/save_actionset.json
new file mode 100644
index 000000000..0264c825f
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/save_actionset.json
@@ -0,0 +1,5 @@
+[
+ {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
+ {"paramName":"s", "paramLongName":"sourcePath","paramDescription": "source path", "paramRequired": true},
+ {"paramName":"t", "paramLongName":"targetPath","paramDescription": "the target path ", "paramRequired": true}
+]
\ No newline at end of file