diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala index 823187afe..92a870e37 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala @@ -64,26 +64,24 @@ abstract class AbstractRestClient extends Iterator[String]{ .setSocketTimeout(timeout * 1000).build() val client =HttpClientBuilder.create().setDefaultRequestConfig(config).build() var tries = 4 - try { - while (tries > 0) { + while (tries > 0) { println(s"requesting ${r.getURI}") - val response = client.execute(r) - println(s"get response with status${response.getStatusLine.getStatusCode}") - if (response.getStatusLine.getStatusCode > 400) { - tries -= 1 + try { + val response = client.execute(r) + println(s"get response with status${response.getStatusLine.getStatusCode}") + if (response.getStatusLine.getStatusCode > 400) { + tries -= 1 + } + else + return IOUtils.toString(response.getEntity.getContent) + } catch { + case e: Throwable => + println(s"Error on requesting ${r.getURI}") + e.printStackTrace() + tries-=1 } - else - return IOUtils.toString(response.getEntity.getContent) } "" - } catch { - case e: Throwable => - throw new RuntimeException("Error on executing request ", e) - } finally try client.close() - catch { - case e: IOException => - throw new RuntimeException("Unable to close client ", e) - } - } + } getBufferData() } \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/pom.xml b/dhp-workflows/dhp-graph-provision/pom.xml index c279436d7..e402d0600 100644 --- a/dhp-workflows/dhp-graph-provision/pom.xml +++ b/dhp-workflows/dhp-graph-provision/pom.xml @@ -9,6 +9,41 @@ dhp-graph-provision + + + + net.alchim31.maven + scala-maven-plugin + 4.0.1 + + + scala-compile-first + initialize + + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + -Xmax-classfile-name + 200 + + ${scala.version} + + + + + + diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkCreateActionset.scala b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkCreateActionset.scala index 6f0cdcf8a..faf386d25 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkCreateActionset.scala +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/sx/provision/SparkCreateActionset.scala @@ -43,7 +43,7 @@ object SparkCreateActionset { val relation = spark.read.load(s"$sourcePath/relation").as[Relation] relation.filter(r => (r.getDataInfo== null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge")) - .flatMap(r => List(r.getSource,r.getTarget)).distinct().write.save(s"$workingDirFolder/id_relation") + .flatMap(r => List(r.getSource,r.getTarget)).distinct().write.mode(SaveMode.Overwrite).save(s"$workingDirFolder/id_relation") val idRelation = spark.read.load(s"$workingDirFolder/id_relation").as[String] @@ -56,35 +56,18 @@ object SparkCreateActionset { relation.filter(r => (r.getDataInfo== null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge")) .write.mode(SaveMode.Overwrite).save(s"$workingDirFolder/actionSetOaf") - log.info("saving publication") + log.info("saving entities") - val publication:Dataset[(String, Result)] = spark.read.load(s"$sourcePath/publication").as[Result].map(p => (p.getId, p)) + val entities:Dataset[(String, Result)] = spark.read.load(s"$sourcePath/entities/*").as[Result].map(p => (p.getId, p))(Encoders.tuple(Encoders.STRING, resultEncoders)) - publication - .joinWith(idRelation, publication("_1").equalTo(idRelation("value"))) + + entities.filter(r => r.isInstanceOf[Result]).map(r => r.asInstanceOf[Result]) + entities + .joinWith(idRelation, entities("_1").equalTo(idRelation("value"))) .map(p => p._1._2) .write.mode(SaveMode.Append).save(s"$workingDirFolder/actionSetOaf") - log.info("saving dataset") - val dataset:Dataset[(String, Result)] = spark.read.load(s"$sourcePath/dataset").as[Result].map(p => (p.getId, p)) - dataset - .joinWith(idRelation, publication("_1").equalTo(idRelation("value"))) - .map(p => p._1._2) - .write.mode(SaveMode.Append).save(s"$workingDirFolder/actionSetOaf") - log.info("saving software") - val software:Dataset[(String, Result)] = spark.read.load(s"$sourcePath/software").as[Result].map(p => (p.getId, p)) - software - .joinWith(idRelation, publication("_1").equalTo(idRelation("value"))) - .map(p => p._1._2) - .write.mode(SaveMode.Append).save(s"$workingDirFolder/actionSetOaf") - - log.info("saving Other Research product") - val orp:Dataset[(String, Result)] = spark.read.load(s"$sourcePath/otherresearchproduct").as[Result].map(p => (p.getId, p)) - orp - .joinWith(idRelation, publication("_1").equalTo(idRelation("value"))) - .map(p => p._1._2) - .write.mode(SaveMode.Append).save(s"$workingDirFolder/actionSetOaf") } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/workflow.xml index ef86a1772..7c4b3dd26 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/sx/actionset/oozie_app/workflow.xml @@ -14,7 +14,7 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] @@ -26,7 +26,7 @@ cluster Create Action Set eu.dnetlib.dhp.sx.provision.SparkCreateActionset - dhp-aggregation-${projectVersion}.jar + dhp-graph-provision-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} @@ -42,7 +42,7 @@ --workingDirFolder${workingDirFolder} --masteryarn-cluster - + @@ -53,7 +53,7 @@ cluster Save Action Set eu.dnetlib.dhp.sx.provision.SparkSaveActionSet - dhp-aggregation-${projectVersion}.jar + dhp-graph-provision-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores}