forked from D-Net/dnet-hadoop
implemented new oozie job to extract entities in a separate dataset
This commit is contained in:
parent
3010a362bc
commit
c97c8f0c44
|
@ -41,28 +41,37 @@ object SparkSplitOafTODLIEntities {
|
|||
.getOrCreate()
|
||||
|
||||
|
||||
val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/OAFDataset").as[Oaf]
|
||||
|
||||
|
||||
val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf]
|
||||
|
||||
val ebi_dataset:Dataset[DLIDataset] = spark.read.load(s"$workingPath/ebi/baseline_dataset_ebi").as[DLIDataset]
|
||||
val ebi_publication:Dataset[DLIPublication] = spark.read.load(s"$workingPath/ebi/baseline_publication_ebi").as[DLIPublication]
|
||||
val ebi_relation:Dataset[DLIRelation] = spark.read.load(s"$workingPath/ebi/baseline_relation_ebi").as[DLIRelation]
|
||||
|
||||
|
||||
|
||||
OAFDataset
|
||||
.filter(s => s != null && s.isInstanceOf[DLIPublication])
|
||||
.map(s =>s.asInstanceOf[DLIPublication])
|
||||
.union(ebi_publication)
|
||||
.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder))
|
||||
.groupByKey(_._1)(Encoders.STRING)
|
||||
.agg(EBIAggregator.getDLIPublicationAggregator().toColumn)
|
||||
.map(p => p._2)
|
||||
.repartition(1000)
|
||||
.write.mode(SaveMode.Overwrite).save(s"$workingPath/publication")
|
||||
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/publication")
|
||||
|
||||
OAFDataset
|
||||
.filter(s => s != null && s.isInstanceOf[DLIDataset])
|
||||
.map(s =>s.asInstanceOf[DLIDataset])
|
||||
.union(ebi_dataset)
|
||||
.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datEncoder))
|
||||
.groupByKey(_._1)(Encoders.STRING)
|
||||
.agg(EBIAggregator.getDLIDatasetAggregator().toColumn)
|
||||
.map(p => p._2)
|
||||
.repartition(1000)
|
||||
.write.mode(SaveMode.Overwrite).save(s"$workingPath/dataset")
|
||||
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/dataset")
|
||||
|
||||
|
||||
OAFDataset
|
||||
|
@ -73,18 +82,19 @@ object SparkSplitOafTODLIEntities {
|
|||
.agg(EBIAggregator.getDLIUnknownAggregator().toColumn)
|
||||
.map(p => p._2)
|
||||
.repartition(1000)
|
||||
.write.mode(SaveMode.Overwrite).save(s"$workingPath/unknown")
|
||||
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/unknown")
|
||||
|
||||
|
||||
OAFDataset
|
||||
.filter(s => s != null && s.isInstanceOf[DLIRelation])
|
||||
.map(s =>s.asInstanceOf[DLIRelation])
|
||||
.union(ebi_relation)
|
||||
.map(d => (getKeyRelation(d), d))(Encoders.tuple(Encoders.STRING, relEncoder))
|
||||
.groupByKey(_._1)(Encoders.STRING)
|
||||
.agg(EBIAggregator.getDLIRelationAggregator().toColumn)
|
||||
.map(p => p._2)
|
||||
.repartition(1000)
|
||||
.write.mode(SaveMode.Overwrite).save(s"$workingPath/relation")
|
||||
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation")
|
||||
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue