diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala index b93dd8c3c..b36c6abef 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala @@ -41,28 +41,37 @@ object SparkSplitOafTODLIEntities { .getOrCreate() - val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/OAFDataset").as[Oaf] + + + val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf] + + val ebi_dataset:Dataset[DLIDataset] = spark.read.load(s"$workingPath/ebi/baseline_dataset_ebi").as[DLIDataset] + val ebi_publication:Dataset[DLIPublication] = spark.read.load(s"$workingPath/ebi/baseline_publication_ebi").as[DLIPublication] + val ebi_relation:Dataset[DLIRelation] = spark.read.load(s"$workingPath/ebi/baseline_relation_ebi").as[DLIRelation] + OAFDataset .filter(s => s != null && s.isInstanceOf[DLIPublication]) .map(s =>s.asInstanceOf[DLIPublication]) + .union(ebi_publication) .map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder)) .groupByKey(_._1)(Encoders.STRING) .agg(EBIAggregator.getDLIPublicationAggregator().toColumn) .map(p => p._2) .repartition(1000) - .write.mode(SaveMode.Overwrite).save(s"$workingPath/publication") + .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/publication") OAFDataset .filter(s => s != null && s.isInstanceOf[DLIDataset]) .map(s =>s.asInstanceOf[DLIDataset]) + .union(ebi_dataset) .map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datEncoder)) .groupByKey(_._1)(Encoders.STRING) .agg(EBIAggregator.getDLIDatasetAggregator().toColumn) .map(p => p._2) .repartition(1000) - .write.mode(SaveMode.Overwrite).save(s"$workingPath/dataset") + .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/dataset") OAFDataset @@ -73,18 +82,19 @@ object SparkSplitOafTODLIEntities { .agg(EBIAggregator.getDLIUnknownAggregator().toColumn) .map(p => p._2) .repartition(1000) - .write.mode(SaveMode.Overwrite).save(s"$workingPath/unknown") + .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/unknown") OAFDataset .filter(s => s != null && s.isInstanceOf[DLIRelation]) .map(s =>s.asInstanceOf[DLIRelation]) + .union(ebi_relation) .map(d => (getKeyRelation(d), d))(Encoders.tuple(Encoders.STRING, relEncoder)) .groupByKey(_._1)(Encoders.STRING) .agg(EBIAggregator.getDLIRelationAggregator().toColumn) .map(p => p._2) .repartition(1000) - .write.mode(SaveMode.Overwrite).save(s"$workingPath/relation") + .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation")