Master branch updates from beta September 2023 #337
|
@ -60,7 +60,7 @@ object SparkGenerateDoiBoost {
|
|||
val openaireOrganizationPath = parser.get("openaireOrganizationPath")
|
||||
|
||||
val crossrefAggregator = new Aggregator[(String, Publication), Publication, Publication] with Serializable {
|
||||
override def zero: Publication = new Publication
|
||||
override def zero: Publication = null
|
||||
|
||||
override def reduce(b: Publication, a: (String, Publication)): Publication = {
|
||||
|
||||
|
@ -177,12 +177,43 @@ object SparkGenerateDoiBoost {
|
|||
.map(DoiBoostMappingUtil.fixPublication)
|
||||
.map(p => (p.getId, p))
|
||||
.groupByKey(_._1)
|
||||
.agg(crossrefAggregator.toColumn)
|
||||
.map(p => p._2)
|
||||
.reduceGroups((left, right) =>
|
||||
{
|
||||
//Check left is not null
|
||||
if (left != null && left._1 != null)
|
||||
{
|
||||
//If right is null then return left
|
||||
if (right == null || right._2 == null)
|
||||
left
|
||||
else {
|
||||
// Here Left and Right are not null
|
||||
// So we have to merge
|
||||
val b1 = left._2
|
||||
val b2 = right._2
|
||||
b1.mergeFrom(b2)
|
||||
b1.mergeOAFDataInfo(b2)
|
||||
val authors = AuthorMerger.mergeAuthor(b1.getAuthor, b2.getAuthor)
|
||||
b1.setAuthor(authors)
|
||||
if (b2.getId != null && b2.getId.nonEmpty)
|
||||
b1.setId(b2.getId)
|
||||
//Return publication Merged
|
||||
(b1.getId, b1)
|
||||
}
|
||||
}
|
||||
else {
|
||||
// Left is Null so we return right
|
||||
right
|
||||
}
|
||||
}
|
||||
|
||||
)
|
||||
.filter(s => s!= null && s._2!=null)
|
||||
.map(s => s._2)
|
||||
.write
|
||||
.mode(SaveMode.Overwrite)
|
||||
.save(s"$workingDirPath/doiBoostPublicationFiltered")
|
||||
|
||||
|
||||
val affiliationPath = parser.get("affiliationPath")
|
||||
val paperAffiliationPath = parser.get("paperAffiliationPath")
|
||||
|
||||
|
|
Loading…
Reference in New Issue