diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkEBILinksToOaf.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkEBILinksToOaf.scala index f14e5f264..1924d919e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkEBILinksToOaf.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkEBILinksToOaf.scala @@ -32,14 +32,9 @@ object SparkEBILinksToOaf { import spark.implicits._ implicit val PMEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf]) - val ebi_rdd:Dataset[EBILinkItem] = spark.createDataset(spark.sparkContext.textFile(sourcePath).map(s => BioDBToOAF.extractEBILinksFromDump(s))).as[EBILinkItem] - - ebi_rdd.write.mode(SaveMode.Overwrite).save(s"${sourcePath}_dataset") - val ebLinks:Dataset[EBILinkItem] = spark.read.load(s"${sourcePath}_dataset").as[EBILinkItem].filter(l => l.links!= null) ebLinks.flatMap(j =>BioDBToOAF.parse_ebi_links(j.links)) - .repartition(4000) .filter(p => BioDBToOAF.EBITargetLinksFilter(p)) .flatMap(p => BioDBToOAF.convertEBILinksToOaf(p)) .write.mode(SaveMode.Overwrite).save(targetPath) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/oozie_app/workflow.xml index 3f442c5c6..1fa099ebc 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/ebi/oozie_app/workflow.xml @@ -74,7 +74,7 @@ yarn-cluster cluster - Create Baselnie DataSet + Create Baseline DataSet eu.dnetlib.dhp.sx.ebi.SparkAddLinkUpdates dhp-graph-mapper-${projectVersion}.jar