53 lines
1.6 KiB
Scala
53 lines
1.6 KiB
Scala
package eu.dnetlib.dhp.collection
|
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper
|
|
import eu.dnetlib.dhp.schema.oaf.common.ModelSupport
|
|
import eu.dnetlib.dhp.schema.oaf.{Entity, Oaf, Relation}
|
|
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode}
|
|
|
|
object CollectionUtils {

  /** Expands a single OAF record into the records that have to be stored for it.
    *
    * Meant to be used as the function of a `flatMap` placed right after the
    * transformation phase, so that every relation is emitted in both directions.
    *
    * @param i input OAF record; `null` is tolerated and produces no output
    * @return
    *   - `Nil` if `i` is `null`
    *   - `List(i)` if `i` is an [[Entity]]
    *   - `List(relation, inverseRelation)` if `i` is a [[Relation]]
    * @throws ClassCastException if `i` is neither an Entity nor a Relation
    */
  def fixRelations(i: Oaf): List[Oaf] =
    i match {
      // A null record yields nothing instead of failing with a NullPointerException
      // while the inverse relation is being built.
      case null        => Nil
      case _: Entity   => List(i)
      case r: Relation => List(r, inverseOf(r))
      case other =>
        // Same failure type the unconditional cast to Relation used to raise,
        // with a message that names the offending class.
        throw new ClassCastException(
          s"${other.getClass.getName} is neither an Entity nor a Relation"
        )
    }

  /** Builds the relation pointing in the opposite direction of `r`.
    *
    * Source and target are swapped and the relation class is replaced by the
    * value returned by `getInverse`; relType, subRelType, provenance, properties,
    * validated and validationDate are copied from `r` as they are (the same
    * object references are reused, nothing is cloned).
    *
    * @param r the relation to invert
    * @return a new [[Relation]] instance; `r` itself is not modified
    */
  private def inverseOf(r: Relation): Relation = {
    val inverse = new Relation
    inverse.setSource(r.getTarget)
    inverse.setTarget(r.getSource)
    inverse.setRelType(r.getRelType)
    inverse.setSubRelType(r.getSubRelType)
    inverse.setRelClass(r.getRelClass.getInverse)
    inverse.setProvenance(r.getProvenance)
    inverse.setProperties(r.getProperties)
    inverse.setValidated(r.getValidated)
    inverse.setValidationDate(r.getValidationDate)
    inverse
  }

  /** Serializes a dataset of OAF records and writes it as gzip-compressed text,
    * one serialized record per line.
    *
    * Each record is first expanded through [[fixRelations]], so relations are
    * written together with their inverse. Any content already present at
    * `targetPath` is overwritten.
    *
    * @param dataset    the OAF records to store; null elements are discarded
    * @param targetPath the output path handed to the Spark text writer
    */
  def saveDataset(dataset: Dataset[Oaf], targetPath: String): Unit = {
    implicit val resultEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
    // The mapper is captured by the closure passed to `map` below.
    val mapper = new ObjectMapper

    dataset
      // Nulls must be removed BEFORE the expansion: filtering afterwards cannot
      // protect the call that dereferences the record.
      .filter(oaf => oaf != null)
      .flatMap(oaf => CollectionUtils.fixRelations(oaf))
      .map(oaf => mapper.writeValueAsString(oaf))(Encoders.STRING)
      .write
      .mode(SaveMode.Overwrite)
      .option("compression", "gzip")
      .text(targetPath)
  }

}
|