Oozie workflow for cleancontext #216

Merged
claudio.atzori merged 7 commits from cleancontext into beta 2022-04-22 15:46:40 +02:00
1 changed files with 27 additions and 48 deletions
Showing only changes of commit 54162f5c4f - Show all commits

View File

@ -59,52 +59,6 @@ object SparkGenerateDoiBoost {
val workingDirPath = parser.get("workingPath") val workingDirPath = parser.get("workingPath")
val openaireOrganizationPath = parser.get("openaireOrganizationPath") val openaireOrganizationPath = parser.get("openaireOrganizationPath")
val crossrefAggregator = new Aggregator[(String, Publication), Publication, Publication] with Serializable {
override def zero: Publication = new Publication
override def reduce(b: Publication, a: (String, Publication)): Publication = {
if (b == null) {
if (a != null && a._2 != null) {
a._2.setId(a._1)
return a._2
}
} else {
if (a != null && a._2 != null) {
b.mergeFrom(a._2)
b.setId(a._1)
val authors = AuthorMerger.mergeAuthor(b.getAuthor, a._2.getAuthor)
b.setAuthor(authors)
return b
}
}
new Publication
}
override def merge(b1: Publication, b2: Publication): Publication = {
if (b1 == null) {
if (b2 != null)
return b2
} else {
if (b2 != null) {
b1.mergeFrom(b2)
val authors = AuthorMerger.mergeAuthor(b1.getAuthor, b2.getAuthor)
b1.setAuthor(authors)
if (b2.getId != null && b2.getId.nonEmpty)
b1.setId(b2.getId)
return b1
}
}
new Publication
}
override def finish(reduction: Publication): Publication = reduction
override def bufferEncoder: Encoder[Publication] = Encoders.kryo[Publication]
override def outputEncoder: Encoder[Publication] = Encoders.kryo[Publication]
}
implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication] implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
implicit val mapEncoderOrg: Encoder[Organization] = Encoders.kryo[Organization] implicit val mapEncoderOrg: Encoder[Organization] = Encoders.kryo[Organization]
implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset] implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset]
@ -175,8 +129,33 @@ object SparkGenerateDoiBoost {
.map(DoiBoostMappingUtil.fixPublication) .map(DoiBoostMappingUtil.fixPublication)
.map(p => (p.getId, p)) .map(p => (p.getId, p))
.groupByKey(_._1) .groupByKey(_._1)
.agg(crossrefAggregator.toColumn) .reduceGroups((left, right) => {
.map(p => p._2) //Check left is not null
if (left != null && left._1 != null) {
//If right is null then return left
if (right == null || right._2 == null)
left
else {
// Here Left and Right are not null
// So we have to merge
val b1 = left._2
val b2 = right._2
b1.mergeFrom(b2)
b1.mergeOAFDataInfo(b2)
val authors = AuthorMerger.mergeAuthor(b1.getAuthor, b2.getAuthor)
b1.setAuthor(authors)
if (b2.getId != null && b2.getId.nonEmpty)
b1.setId(b2.getId)
//Return publication Merged
(b1.getId, b1)
}
} else {
// Left is Null so we return right
right
}
})
.filter(s => s != null && s._2 != null)
.map(s => s._2._2)
.write .write
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.save(s"$workingDirPath/doiBoostPublicationFiltered") .save(s"$workingDirPath/doiBoostPublicationFiltered")