196 lines
5.5 KiB
Scala
196 lines
5.5 KiB
Scala
package eu.dnetlib.dhp.oa.graph.hostedbymap
|
|
|
|
import eu.dnetlib.dhp.oa.graph.hostedbymap.model.EntityInfo
|
|
import org.apache.spark.sql.expressions.Aggregator
|
|
import org.apache.spark.sql.{Dataset, Encoder, Encoders, TypedColumn}
|
|
|
|
/** One hosted-by map entry, carried through the aggregations as the value of a
  * `(String, HostedByItemType)` pair.
  *
  * @param id           identifier of the entry; `Aggregators.hostedByAggregator` prefers the entry
  *                     whose id starts with `"10|"`
  * @param officialname official name attached to the entry
  * @param issn         presumably the print ISSN -- TODO confirm against the producer of these records
  * @param eissn        presumably the electronic ISSN -- TODO confirm
  * @param lissn        presumably the linking ISSN -- TODO confirm
  * @param openAccess   open-access flag; OR-ed across merged entries by `Aggregators.hostedByAggregator`
  */
case class HostedByItemType(
  id: String,
  officialname: String,
  issn: String,
  eissn: String,
  lissn: String,
  openAccess: Boolean
)
|
|
|
|
/** Flat hosted-by record made of five string fields.
  *
  * NOTE(review): this class is not referenced elsewhere in this file, so the field meanings
  * below are inferred from their names only -- confirm against the code that builds it.
  *
  * @param id           identifier of the record
  * @param officialname official name attached to the record
  * @param journal_id   presumably the journal identifier (e.g. an ISSN value) -- TODO confirm
  * @param provenance   presumably the source the record comes from -- TODO confirm
  * @param id_type      presumably the kind of identifier stored in `journal_id` -- TODO confirm
  */
case class HostedByInfo(
  id: String,
  officialname: String,
  journal_id: String,
  provenance: String,
  id_type: String
)
|
|
|
|
/** Helpers and Spark typed aggregators that collapse every group of records sharing a key
  * into a single record.
  */
object Aggregators {

  // Identifiers starting with this prefix win over any other identifier
  // (presumably the OpenAIRE datasource id prefix -- TODO confirm).
  // Declared as a constant value definition (final, no type annotation) so the compiler
  // inlines the literal at every use site, including inside the anonymous aggregator below.
  private final val PreferredIdPrefix = "10|"

  /** Picks between two identifiers, preferring the first when it carries the `"10|"` prefix.
    *
    * @param s1 first candidate; a null value is treated as not carrying the prefix
    * @param s2 fallback candidate, returned unchanged (it may be null)
    * @return `s1` when it starts with `"10|"`, otherwise `s2`
    */
  def getId(s1: String, s2: String): String =
    if (s1 != null && s1.startsWith(PreferredIdPrefix)) s1 else s2

  /** Picks the first value unless it is missing.
    *
    * @param s1 preferred value; null and the empty string both count as missing
    * @param s2 fallback value, returned unchanged (it may be null)
    * @return `s1` when it is non-null and non-empty, otherwise `s2`
    */
  def getValue(s1: String, s2: String): String =
    if (s1 == null || s1.isEmpty) s2 else s1

  /** Reduces the keyed hosted-by entries to one entry per key.
    *
    * Records are grouped on the first tuple element and folded with [[hostedByAggregator]].
    * The aggregated value is projected with `_._2` instead of a typed pattern: typed patterns
    * match only non-null values, so a null grouping key would otherwise end in a `MatchError`.
    *
    * @param df keyed hosted-by entries
    * @return one `(key, entry)` pair per distinct key, as produced by the aggregator
    */
  def explodeHostedByItemType(
    df: Dataset[(String, HostedByItemType)]
  ): Dataset[(String, HostedByItemType)] =
    df
      .groupByKey(_._1)(Encoders.STRING)
      .agg(Aggregators.hostedByAggregator)
      .map(_._2)(Encoders.tuple(Encoders.STRING, Encoders.product[HostedByItemType]))

  /** Aggregator merging `(key, entry)` pairs that share a key.
    *
    * The surviving pair is the first operand when its entry id starts with `"10|"`, otherwise
    * the second operand; its `openAccess` flag is the OR of both operands' flags.
    */
  val hostedByAggregator: TypedColumn[(String, HostedByItemType), (String, HostedByItemType)] =
    new Aggregator[
      (String, HostedByItemType),
      (String, HostedByItemType),
      (String, HostedByItemType)
    ] {

      /** Neutral starting buffer: empty key, empty fields, not open access. */
      override def zero: (String, HostedByItemType) =
        ("", HostedByItemType("", "", "", "", "", openAccess = false))

      /** Folding one input into the buffer is the same operation as merging two buffers. */
      override def reduce(
        b: (String, HostedByItemType),
        a: (String, HostedByItemType)
      ): (String, HostedByItemType) = merge(b, a)

      /** Combines two pairs; a null operand yields the other operand unchanged. */
      override def merge(
        b1: (String, HostedByItemType),
        b2: (String, HostedByItemType)
      ): (String, HostedByItemType) =
        if (b1 == null) b2
        else if (b2 == null) b1
        else {
          // The first operand wins only when its id has the preferred prefix; in every
          // other case (including the zero buffer) the second operand is kept.
          val kept = if (b1._2.id.startsWith(PreferredIdPrefix)) b1 else b2
          (kept._1, kept._2.copy(openAccess = b1._2.openAccess || b2._2.openAccess))
        }

      override def finish(reduction: (String, HostedByItemType)): (String, HostedByItemType) =
        reduction

      override def bufferEncoder: Encoder[(String, HostedByItemType)] =
        Encoders.tuple(Encoders.STRING, Encoders.product[HostedByItemType])

      override def outputEncoder: Encoder[(String, HostedByItemType)] =
        Encoders.tuple(Encoders.STRING, Encoders.product[HostedByItemType])
    }.toColumn

  /** Aggregator keeping one [[EntityInfo]] per group and OR-ing the open-access flags.
    *
    * The first operand is kept when its `hostedById` is not the empty string, otherwise the
    * second operand is kept. The kept instance is mutated in place and returned.
    */
  def resultToSingleIdAggregator: TypedColumn[EntityInfo, EntityInfo] =
    new Aggregator[EntityInfo, EntityInfo, EntityInfo] {

      /** Fresh starting buffer built from three empty strings. */
      override def zero: EntityInfo = EntityInfo.newInstance("", "", "")

      /** Folding one input into the buffer is the same operation as merging two buffers. */
      override def reduce(b: EntityInfo, a: EntityInfo): EntityInfo = merge(b, a)

      /** Combines two records; a null operand yields the other operand unchanged. */
      override def merge(b1: EntityInfo, b2: EntityInfo): EntityInfo =
        if (b1 == null) b2
        else if (b2 == null) b1
        else {
          val kept = if (b1.getHostedById.equals("")) b2 else b1
          // Both flags are read before the setter runs, so the OR sees the original values.
          kept.setOpenAccess(b1.getOpenAccess || b2.getOpenAccess)
          kept
        }

      override def finish(reduction: EntityInfo): EntityInfo = reduction

      override def bufferEncoder: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])

      override def outputEncoder: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])
    }.toColumn

  /** Reduces the records to one [[EntityInfo]] per value of `getId`.
    *
    * Records are folded with [[resultToSingleIdAggregator]]. The aggregated value is projected
    * with `_._2` instead of a typed pattern, which would raise a `MatchError` on a null key.
    *
    * @param df records to collapse
    * @return one record per distinct `getId` value
    */
  def resultToSingleId(df: Dataset[EntityInfo]): Dataset[EntityInfo] =
    df
      .groupByKey(_.getId)(Encoders.STRING)
      .agg(Aggregators.resultToSingleIdAggregator)
      .map(_._2)(Encoders.bean(classOf[EntityInfo]))

  /** Aggregator keeping one [[EntityInfo]] per group without touching any of its fields.
    *
    * The first operand is kept when its `hostedById` is not the empty string, otherwise the
    * second operand is kept.
    */
  def datasourceToSingleIdAggregator: TypedColumn[EntityInfo, EntityInfo] =
    new Aggregator[EntityInfo, EntityInfo, EntityInfo] {

      /** Fresh starting buffer built from three empty strings. */
      override def zero: EntityInfo = EntityInfo.newInstance("", "", "")

      /** Folding one input into the buffer is the same operation as merging two buffers. */
      override def reduce(b: EntityInfo, a: EntityInfo): EntityInfo = merge(b, a)

      /** Combines two records; a null operand yields the other operand unchanged. */
      override def merge(b1: EntityInfo, b2: EntityInfo): EntityInfo =
        if (b1 == null) b2
        else if (b2 == null) b1
        else if (b1.getHostedById.equals("")) b2
        else b1

      override def finish(reduction: EntityInfo): EntityInfo = reduction

      override def bufferEncoder: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])

      override def outputEncoder: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])
    }.toColumn

  /** Reduces the records to one [[EntityInfo]] per value of `getHostedById`.
    *
    * Records are folded with [[datasourceToSingleIdAggregator]]. The aggregated value is
    * projected with `_._2` instead of a typed pattern, which would raise a `MatchError` on a
    * null key.
    *
    * @param df records to collapse
    * @return one record per distinct `getHostedById` value
    */
  def datasourceToSingleId(df: Dataset[EntityInfo]): Dataset[EntityInfo] =
    df
      .groupByKey(_.getHostedById)(Encoders.STRING)
      .agg(Aggregators.datasourceToSingleIdAggregator)
      .map(_._2)(Encoders.bean(classOf[EntityInfo]))
}
|