forked from antonis.lempesis/dnet-hadoop
avoid to save intermediate dataset before generation of Sequence file
This commit is contained in:
parent
e79445a8b4
commit
7834a35768
|
@ -41,30 +41,34 @@ object SparkGenerateDOIBoostActionSet {
|
||||||
val workingDirPath = parser.get("targetPath")
|
val workingDirPath = parser.get("targetPath")
|
||||||
val sequenceFilePath = parser.get("sFilePath")
|
val sequenceFilePath = parser.get("sFilePath")
|
||||||
|
|
||||||
spark.read.load(dbDatasetPath).as[OafDataset]
|
val asDataset = spark.read.load(dbDatasetPath).as[OafDataset]
|
||||||
.map(d =>DoiBoostMappingUtil.fixResult(d))
|
.map(d =>DoiBoostMappingUtil.fixResult(d))
|
||||||
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
|
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
|
||||||
.write.mode(SaveMode.Overwrite).save(s"$workingDirPath/actionSet")
|
// .write.mode(SaveMode.Overwrite).save(s"$workingDirPath/actionSet")
|
||||||
|
|
||||||
spark.read.load(dbPublicationPath).as[Publication]
|
val asPublication =spark.read.load(dbPublicationPath).as[Publication]
|
||||||
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
|
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
|
||||||
.write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
|
// .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
|
||||||
|
|
||||||
spark.read.load(dbOrganizationPath).as[Organization]
|
val asOrganization = spark.read.load(dbOrganizationPath).as[Organization]
|
||||||
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
|
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
|
||||||
.write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
|
// .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
|
||||||
|
|
||||||
|
|
||||||
spark.read.load(crossRefRelation).as[Relation]
|
val asCRelation = spark.read.load(crossRefRelation).as[Relation]
|
||||||
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
|
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
|
||||||
.write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
|
// .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
|
||||||
|
|
||||||
spark.read.load(dbaffiliationRelationPath).as[Relation]
|
val asRelAffiliation = spark.read.load(dbaffiliationRelationPath).as[Relation]
|
||||||
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
|
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
|
||||||
.write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
|
// .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
|
||||||
|
|
||||||
|
|
||||||
val d: Dataset[(String, String)] =spark.read.load(s"$workingDirPath/actionSet").as[(String,String)]
|
|
||||||
|
|
||||||
|
val d: Dataset[(String, String)] = asDataset.union(asPublication).union(asOrganization).union(asCRelation).union(asRelAffiliation)
|
||||||
|
|
||||||
|
// spark.read.load(s"$workingDirPath/actionSet").as[(String,String)]
|
||||||
|
|
||||||
d.rdd.repartition(6000).map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$sequenceFilePath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec])
|
d.rdd.repartition(6000).map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$sequenceFilePath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec])
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue