diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/scholix/SparkCreateActionset.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/scholix/SparkCreateActionset.scala index b78f411ee1..7a87861db7 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/scholix/SparkCreateActionset.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/scholix/SparkCreateActionset.scala @@ -60,14 +60,10 @@ object SparkCreateActionset { val entities: Dataset[(String, Result)] = spark.read.load(s"$sourcePath/entities/*").as[Result].map(p => (p.getId, p))(Encoders.tuple(Encoders.STRING, resultEncoders)) - - entities.filter(r => r.isInstanceOf[Result]).map(r => r.asInstanceOf[Result]) entities .joinWith(idRelation, entities("_1").equalTo(idRelation("value"))) .map(p => p._1._2) .write.mode(SaveMode.Append).save(s"$workingDirFolder/actionSetOaf") - - } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala index 97b3cdc99a..2fc9623a89 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetllib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala @@ -114,11 +114,7 @@ object SparkCreateBaselineDataFrame { val hdfsWritePath: Path = new Path(s"$baselinePath/${u._1}") val fsDataOutputStream: FSDataOutputStream = fs.create(hdfsWritePath, true) val i = downloadBaselinePart(u._2) - val buffer = Array.fill[Byte](1024)(0) - while (i.read(buffer) > 0) { - fsDataOutputStream.write(buffer) - } - i.close() + IOUtils.copy(i, fsDataOutputStream) println(s"Downloaded ${u._2} into $baselinePath/${u._1}") fsDataOutputStream.close() } @@ -182,7 +178,7 @@ object SparkCreateBaselineDataFrame { downloadBaseLineUpdate(s"$workingPath/baseline", hdfsServerUri) - val k: RDD[(String, String)] = sc.wholeTextFiles(s"$workingPath/baseline_ftp", 2000) + val k: RDD[(String, String)] = sc.wholeTextFiles(s"$workingPath/baseline", 2000) val ds: Dataset[PMArticle] = spark.createDataset(k.filter(i => i._1.endsWith(".gz")).flatMap(i => { val xml = new XMLEventReader(Source.fromBytes(i._2.getBytes())) new PMParser(xml) diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml index f5a98ba5ef..4ed6dd8bf2 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml @@ -8,6 +8,10 @@ isLookupUrl The IS lookUp service endopoint + + targetPath + The target path + @@ -22,7 +26,7 @@ cluster Convert Baseline to OAF Dataset eu.dnetllib.dhp.sx.bio.ebi.SparkCreateBaselineDataFrame - dhp-graph-mapper-${projectVersion}.jar + dhp-aggregation-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} @@ -34,7 +38,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --workingPath${baselineWorkingPath} - --targetPath${baselineWorkingPath}/transformed + --targetPath${targetPath} --masteryarn --isLookupUrl${isLookupUrl} --hdfsServerUri${nameNode}