Oozie workflow for cleancontext #216
@@ -59,52 +59,6 @@ object SparkGenerateDoiBoost {

     val workingDirPath = parser.get("workingPath")
     val openaireOrganizationPath = parser.get("openaireOrganizationPath")

-    val crossrefAggregator = new Aggregator[(String, Publication), Publication, Publication] with Serializable {
-
-      override def zero: Publication = new Publication
-
-      override def reduce(b: Publication, a: (String, Publication)): Publication = {
-        if (b == null) {
-          if (a != null && a._2 != null) {
-            a._2.setId(a._1)
-            return a._2
-          }
-        } else {
-          if (a != null && a._2 != null) {
-            b.mergeFrom(a._2)
-            b.setId(a._1)
-            val authors = AuthorMerger.mergeAuthor(b.getAuthor, a._2.getAuthor)
-            b.setAuthor(authors)
-            return b
-          }
-        }
-        new Publication
-      }
-
-      override def merge(b1: Publication, b2: Publication): Publication = {
-        if (b1 == null) {
-          if (b2 != null)
-            return b2
-        } else {
-          if (b2 != null) {
-            b1.mergeFrom(b2)
-            val authors = AuthorMerger.mergeAuthor(b1.getAuthor, b2.getAuthor)
-            b1.setAuthor(authors)
-            if (b2.getId != null && b2.getId.nonEmpty)
-              b1.setId(b2.getId)
-            return b1
-          }
-        }
-        new Publication
-      }
-
-      override def finish(reduction: Publication): Publication = reduction
-
-      override def bufferEncoder: Encoder[Publication] = Encoders.kryo[Publication]
-
-      override def outputEncoder: Encoder[Publication] = Encoders.kryo[Publication]
-    }

     implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
     implicit val mapEncoderOrg: Encoder[Organization] = Encoders.kryo[Organization]
     implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset]
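Note on the block removed above: `crossrefAggregator` was a typed `org.apache.spark.sql.expressions.Aggregator`, which must supply a `zero` buffer and tolerate partially filled buffers in `reduce` and `merge`; that contract is where the `null` checks and the `new Publication` fallbacks came from. A minimal sketch of the same pattern, assuming a hypothetical simplified `Pub` case class in place of the real OAF `Publication` model:

```scala
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

object AggregatorSketch {

  // Hypothetical stand-in for the OAF Publication model.
  case class Pub(id: String, authors: List[String])

  // Same shape as the removed crossrefAggregator: zero is an empty buffer,
  // reduce folds one (id, record) pair into the buffer, merge combines
  // partial buffers coming from different partitions.
  val pubAggregator = new Aggregator[(String, Pub), Pub, Pub] with Serializable {
    override def zero: Pub = Pub("", Nil)

    override def reduce(b: Pub, a: (String, Pub)): Pub =
      Pub(a._1, (b.authors ++ a._2.authors).distinct)

    override def merge(b1: Pub, b2: Pub): Pub = {
      val id = if (b2.id.nonEmpty) b2.id else b1.id
      Pub(id, (b1.authors ++ b2.authors).distinct)
    }

    override def finish(reduction: Pub): Pub = reduction

    override def bufferEncoder: Encoder[Pub] = Encoders.kryo[Pub]
    override def outputEncoder: Encoder[Pub] = Encoders.kryo[Pub]
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("aggregatorSketch").getOrCreate()
    import spark.implicits._

    val merged = Seq(
      ("doi:1", Pub("doi:1", List("a. rossi"))),
      ("doi:1", Pub("doi:1", List("b. bianchi")))
    ).toDS()
      .groupByKey(_._1)
      .agg(pubAggregator.toColumn) // how the removed aggregator was wired in
      .map(_._2)

    merged.collect().foreach(println)
    spark.stop()
  }
}
```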
@@ -175,8 +129,33 @@ object SparkGenerateDoiBoost {

       .map(DoiBoostMappingUtil.fixPublication)
       .map(p => (p.getId, p))
       .groupByKey(_._1)
-      .agg(crossrefAggregator.toColumn)
-      .map(p => p._2)
+      .reduceGroups((left, right) => {
+        // Check left is not null
+        if (left != null && left._1 != null) {
+          // If right is null then return left
+          if (right == null || right._2 == null)
+            left
+          else {
+            // Here left and right are not null, so we have to merge
+            val b1 = left._2
+            val b2 = right._2
+            b1.mergeFrom(b2)
+            b1.mergeOAFDataInfo(b2)
+            val authors = AuthorMerger.mergeAuthor(b1.getAuthor, b2.getAuthor)
+            b1.setAuthor(authors)
+            if (b2.getId != null && b2.getId.nonEmpty)
+              b1.setId(b2.getId)
+            // Return the merged publication
+            (b1.getId, b1)
+          }
+        } else {
+          // Left is null so we return right
+          right
+        }
+      })
+      .filter(s => s != null && s._2 != null)
+      .map(s => s._2._2)
       .write
       .mode(SaveMode.Overwrite)
       .save(s"$workingDirPath/doiBoostPublicationFiltered")
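The replacement relies on `Dataset.reduceGroups`, which pairwise-combines the members of each group and is never invoked on an empty group, so no `zero` buffer is needed; the `null` checks kept in the patch guard against null records inside the groups rather than an empty buffer. A self-contained sketch of this shape, again assuming the hypothetical `Pub` type:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

object ReduceGroupsSketch {

  // Hypothetical stand-in for the OAF Publication model.
  case class Pub(id: String, authors: List[String])

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("reduceGroupsSketch").getOrCreate()
    import spark.implicits._

    val pubs: Dataset[(String, Pub)] = Seq(
      ("doi:1", Pub("doi:1", List("a. rossi"))),
      ("doi:1", Pub("doi:1", List("b. bianchi"))),
      ("doi:2", Pub("doi:2", List("c. verdi")))
    ).toDS()

    // Same shape as the patched pipeline: key by id, pairwise-merge each
    // group, then unwrap the (key, (id, record)) tuples reduceGroups returns.
    val merged: Dataset[Pub] = pubs
      .groupByKey(_._1)
      .reduceGroups((left, right) =>
        // hypothetical merge rule: union the author lists, keep the id
        (left._1, Pub(left._1, (left._2.authors ++ right._2.authors).distinct))
      )
      .map(_._2._2)

    merged.collect().foreach(println) // doi:1 ends up with both authors merged
    spark.stop()
  }
}
```

Because `reduceGroups` prepends the grouping key, the result element type in the patched pipeline is `(String, (String, Publication))`, which is why it ends with `.map(s => s._2._2)` to strip both the key and the tuple wrapper before writing.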