package eu.dnetlib.dhp.sx.graph import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.schema.common.ModelConstants import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Relation, Result, Software, Dataset => OafDataset} import org.apache.commons.io.IOUtils import org.apache.commons.lang3.StringUtils import org.apache.spark.SparkConf import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} import org.slf4j.{Logger, LoggerFactory} import scala.reflect.ClassTag import scala.util.Try object SparkConvertRDDtoDataset { def main(args: Array[String]): Unit = { val log: Logger = LoggerFactory.getLogger(getClass) val conf: SparkConf = new SparkConf() val parser = new ArgumentApplicationParser( IOUtils.toString( getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json") ) ) parser.parseArgument(args) val spark: SparkSession = SparkSession .builder() .config(conf) .appName(getClass.getSimpleName) .master(parser.get("master")) .getOrCreate() val sourcePath = parser.get("sourcePath") log.info(s"sourcePath -> $sourcePath") val t = parser.get("targetPath") log.info(s"targetPath -> $t") val subRelTypeFilter = parser.get("filterRelation") log.info(s"filterRelation -> $subRelTypeFilter") val entityPath = s"$t/entities" val relPath = s"$t/relation" val mapper = new ObjectMapper() implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset]) implicit val publicationEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication]) implicit val relationEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation]) implicit val orpEncoder: Encoder[OtherResearchProduct] = Encoders.kryo(classOf[OtherResearchProduct]) implicit val softwareEncoder: Encoder[Software] = Encoders.kryo(classOf[Software]) log.info("Converting dataset") val rddDataset = spark.sparkContext .textFile(s"$sourcePath/dataset") .map(s => mapper.readValue(s, classOf[OafDataset])) .filter(r => r.getDataInfo != null && r.getDataInfo.getDeletedbyinference == false) spark .createDataset(rddDataset) .as[OafDataset] .write .mode(SaveMode.Overwrite) .save(s"$entityPath/dataset") log.info("Converting publication") val rddPublication = spark.sparkContext .textFile(s"$sourcePath/publication") .map(s => mapper.readValue(s, classOf[Publication])) .filter(r => r.getDataInfo != null && r.getDataInfo.getDeletedbyinference == false) spark .createDataset(rddPublication) .as[Publication] .write .mode(SaveMode.Overwrite) .save(s"$entityPath/publication") log.info("Converting software") val rddSoftware = spark.sparkContext .textFile(s"$sourcePath/software") .map(s => mapper.readValue(s, classOf[Software])) .filter(r => r.getDataInfo != null && r.getDataInfo.getDeletedbyinference == false) spark .createDataset(rddSoftware) .as[Software] .write .mode(SaveMode.Overwrite) .save(s"$entityPath/software") log.info("Converting otherresearchproduct") val rddOtherResearchProduct = spark.sparkContext .textFile(s"$sourcePath/otherresearchproduct") .map(s => mapper.readValue(s, classOf[OtherResearchProduct])) .filter(r => r.getDataInfo != null && r.getDataInfo.getDeletedbyinference == false) spark .createDataset(rddOtherResearchProduct) .as[OtherResearchProduct] .write .mode(SaveMode.Overwrite) .save(s"$entityPath/otherresearchproduct") log.info("Converting Relation") val relClassFilter = List( ModelConstants.MERGES, ModelConstants.IS_MERGED_IN, ModelConstants.HAS_AMONG_TOP_N_SIMILAR_DOCS, ModelConstants.IS_AMONG_TOP_N_SIMILAR_DOCS ) val rddRelation = spark.sparkContext .textFile(s"$sourcePath/relation") .map(s => mapper.readValue(s, classOf[Relation])) .filter(r => r.getDataInfo != null && !r.getDataInfo.getDeletedbyinference) .filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50")) .filter(r => filterRelations(r)) //filter OpenCitations relations // .filter(r => // r.getDataInfo.getProvenanceaction != null && // !"sysimport:crosswalk:opencitations".equals(r.getDataInfo.getProvenanceaction.getClassid) // ) spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath") } private def filterRelations(r: Relation): Boolean = { /** * * We filter relation generated by dedups * and all the relation that have one single collectedFrom OpenCitation */ val relClassFilter = List( ModelConstants.MERGES, ModelConstants.IS_MERGED_IN, ModelConstants.HAS_AMONG_TOP_N_SIMILAR_DOCS, ModelConstants.IS_AMONG_TOP_N_SIMILAR_DOCS ) if (relClassFilter.exists(k => k.equalsIgnoreCase(r.getRelClass))) false else { if (r.getCollectedfrom == null || r.getCollectedfrom.size() == 0) false else if (r.getCollectedfrom.size() > 1) true else if ( r.getCollectedfrom.size() == 1 && r.getCollectedfrom.get(0) != null && "OpenCitations".equalsIgnoreCase( r.getCollectedfrom.get(0).getValue ) ) false else true } } }