This commit is contained in:
Claudio Atzori 2020-07-30 16:25:06 +02:00
commit 4bbfcf1ac6
1 changed files with 15 additions and 5 deletions

View File

@ -41,28 +41,37 @@ object SparkSplitOafTODLIEntities {
.getOrCreate() .getOrCreate()
val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/OAFDataset").as[Oaf]
val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf]
val ebi_dataset:Dataset[DLIDataset] = spark.read.load(s"$workingPath/ebi/baseline_dataset_ebi").as[DLIDataset]
val ebi_publication:Dataset[DLIPublication] = spark.read.load(s"$workingPath/ebi/baseline_publication_ebi").as[DLIPublication]
val ebi_relation:Dataset[DLIRelation] = spark.read.load(s"$workingPath/ebi/baseline_relation_ebi").as[DLIRelation]
OAFDataset OAFDataset
.filter(s => s != null && s.isInstanceOf[DLIPublication]) .filter(s => s != null && s.isInstanceOf[DLIPublication])
.map(s =>s.asInstanceOf[DLIPublication]) .map(s =>s.asInstanceOf[DLIPublication])
.union(ebi_publication)
.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder)) .map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder))
.groupByKey(_._1)(Encoders.STRING) .groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getDLIPublicationAggregator().toColumn) .agg(EBIAggregator.getDLIPublicationAggregator().toColumn)
.map(p => p._2) .map(p => p._2)
.repartition(1000) .repartition(1000)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/publication") .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/publication")
OAFDataset OAFDataset
.filter(s => s != null && s.isInstanceOf[DLIDataset]) .filter(s => s != null && s.isInstanceOf[DLIDataset])
.map(s =>s.asInstanceOf[DLIDataset]) .map(s =>s.asInstanceOf[DLIDataset])
.union(ebi_dataset)
.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datEncoder)) .map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datEncoder))
.groupByKey(_._1)(Encoders.STRING) .groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getDLIDatasetAggregator().toColumn) .agg(EBIAggregator.getDLIDatasetAggregator().toColumn)
.map(p => p._2) .map(p => p._2)
.repartition(1000) .repartition(1000)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/dataset") .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/dataset")
OAFDataset OAFDataset
@ -73,18 +82,19 @@ object SparkSplitOafTODLIEntities {
.agg(EBIAggregator.getDLIUnknownAggregator().toColumn) .agg(EBIAggregator.getDLIUnknownAggregator().toColumn)
.map(p => p._2) .map(p => p._2)
.repartition(1000) .repartition(1000)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/unknown") .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/unknown")
OAFDataset OAFDataset
.filter(s => s != null && s.isInstanceOf[DLIRelation]) .filter(s => s != null && s.isInstanceOf[DLIRelation])
.map(s =>s.asInstanceOf[DLIRelation]) .map(s =>s.asInstanceOf[DLIRelation])
.union(ebi_relation)
.map(d => (getKeyRelation(d), d))(Encoders.tuple(Encoders.STRING, relEncoder)) .map(d => (getKeyRelation(d), d))(Encoders.tuple(Encoders.STRING, relEncoder))
.groupByKey(_._1)(Encoders.STRING) .groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getDLIRelationAggregator().toColumn) .agg(EBIAggregator.getDLIRelationAggregator().toColumn)
.map(p => p._2) .map(p => p._2)
.repartition(1000) .repartition(1000)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/relation") .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation")