diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java index 2acec6546e..1fe83cec29 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java @@ -73,7 +73,7 @@ public class SparkPropagateRelation extends AbstractSparkAction { .load(DedupUtility.createMergeRelPath(workingPath, "*", "*")) .as(Encoders.bean(Relation.class)); - // + // Dataset> mergedIds = mergeRels .where(col("relClass").equalTo(ModelConstants.MERGES)) .select(col("source"), col("target")) @@ -116,31 +116,32 @@ public class SparkPropagateRelation extends AbstractSparkAction { .map((MapFunction, Relation>) t -> t._2(), Encoders.bean(Relation.class)); } - //redirect the relations to the dedupID + // redirect the relations to the dedupID private static Dataset createNewRels( - Dataset rels, //all the relations to be redirected - Dataset> mergedIds, //merge rels: + Dataset rels, // all the relations to be redirected + Dataset> mergedIds, // merge rels: MapFunction, Tuple2>, Tuple2>, Relation> mapRel) { - // + // Dataset> mapped = rels .map( (MapFunction>) r -> new Tuple3<>(getId(r, FieldType.SOURCE), r, getId(r, FieldType.TARGET)), Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class), Encoders.STRING())); - //< , > + // < , > Dataset, Tuple2>> relSource = mapped .joinWith(mergedIds, mapped.col("_1").equalTo(mergedIds.col("_1")), "left_outer"); - //< <, >, > + // < <, >, > Dataset, Tuple2>, Tuple2>> relSourceTarget = relSource - .joinWith(mergedIds, relSource.col("_1._3").equalTo(mergedIds.col("_1")), "left_outer"); + .joinWith(mergedIds, relSource.col("_1._3").equalTo(mergedIds.col("_1")), "left_outer"); return relSourceTarget .filter( - (FilterFunction, Tuple2>, Tuple2>>) - r -> r._1()._1() != null || r._2() != null) + (FilterFunction, Tuple2>, Tuple2>>) r -> r + ._1() + ._1() != null || r._2() != null) .map(mapRel, Encoders.bean(Relation.class)) .distinct(); } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/sx/pangaea/PangaeaUtils.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/sx/pangaea/PangaeaUtils.scala new file mode 100644 index 0000000000..a8a737c23a --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/sx/pangaea/PangaeaUtils.scala @@ -0,0 +1,85 @@ +package eu.dnetlib.sx.pangaea + + +import org.apache.spark.sql.expressions.Aggregator +import org.apache.spark.sql.{Encoder, Encoders} +import org.json4s +import org.json4s.DefaultFormats +import org.json4s.jackson.JsonMethods.parse + +import java.text.SimpleDateFormat +import java.util.Date + + +case class PangaeaDataModel(datestamp:String, identifier:String, xml:String) {} + + + +object PangaeaUtils { + + + def toDataset(input:String):PangaeaDataModel = { + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: json4s.JValue = parse(input) + + val d = new Date() + val s:String = s"${new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")format(d)}Z" + + val ds = (json \ "internal-datestamp").extractOrElse[String](s) + val identifier= (json \ "metadatalink").extractOrElse[String]() + val xml= (json \ "xml").extract[String] + PangaeaDataModel(ds, identifier,xml) + } + + + def getDatasetAggregator(): Aggregator[(String, PangaeaDataModel), PangaeaDataModel, PangaeaDataModel] = new Aggregator[(String, PangaeaDataModel), PangaeaDataModel, PangaeaDataModel]{ + + + override def zero: PangaeaDataModel = null + + override def reduce(b: PangaeaDataModel, a: (String, PangaeaDataModel)): PangaeaDataModel = { + if (b == null) + a._2 + else { + if (a == null) + b + else { + val ts1 = b.datestamp + val ts2 = a._2.datestamp + if (ts1 > ts2) + b + else + a._2 + + } + } + } + + override def merge(b1: PangaeaDataModel, b2: PangaeaDataModel): PangaeaDataModel = { + if (b1 == null) + b2 + else { + if (b2 == null) + b1 + else { + val ts1 = b1.datestamp + val ts2 = b2.datestamp + if (ts1 > ts2) + b1 + else + b2 + + } + } + } + override def finish(reduction: PangaeaDataModel): PangaeaDataModel = reduction + + override def bufferEncoder: Encoder[PangaeaDataModel] = Encoders.kryo[PangaeaDataModel] + + override def outputEncoder: Encoder[PangaeaDataModel] = Encoders.kryo[PangaeaDataModel] + } + + + + +} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/sx/pangaea/SparkGeneratePanagaeaDataset.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/sx/pangaea/SparkGeneratePanagaeaDataset.scala new file mode 100644 index 0000000000..17b286a7e4 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/sx/pangaea/SparkGeneratePanagaeaDataset.scala @@ -0,0 +1,53 @@ +package eu.dnetlib.sx.pangaea + +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.sx.ebi.SparkCreateEBIDataFrame +import org.apache.spark.rdd.RDD +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} +import org.slf4j.{Logger, LoggerFactory} + +import scala.collection.JavaConverters._ +import scala.io.Source + +object SparkGeneratePanagaeaDataset { + + + def main(args: Array[String]): Unit = { + val logger: Logger = LoggerFactory.getLogger(getClass) + val conf: SparkConf = new SparkConf() + val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/pangaea/pangaea_to_dataset.json")).mkString) + parser.parseArgument(args) + + + val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .appName(SparkCreateEBIDataFrame.getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + + parser.getObjectMap.asScala.foreach(s => logger.info(s"${s._1} -> ${s._2}")) + logger.info("Converting sequential file into Dataset") + val sc:SparkContext = spark.sparkContext + + val workingPath:String = parser.get("workingPath") + + implicit val pangaeaEncoders: Encoder[PangaeaDataModel] = Encoders.kryo[PangaeaDataModel] + + val inputRDD:RDD[PangaeaDataModel] = sc.textFile(s"$workingPath/update").map(s => PangaeaUtils.toDataset(s)) + + spark.createDataset(inputRDD).as[PangaeaDataModel] + .map(s => (s.identifier,s))(Encoders.tuple(Encoders.STRING, pangaeaEncoders)) + .groupByKey(_._1)(Encoders.STRING) + .agg(PangaeaUtils.getDatasetAggregator().toColumn) + .map(s => s._2) + .write.mode(SaveMode.Overwrite).save(s"$workingPath/dataset_updated") + + } + + + + + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/pangaea/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/pangaea/oozie_app/config-default.xml new file mode 100644 index 0000000000..bdd48b0ab2 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/pangaea/oozie_app/config-default.xml @@ -0,0 +1,19 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/pangaea/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/pangaea/oozie_app/workflow.xml new file mode 100644 index 0000000000..60acee2119 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/pangaea/oozie_app/workflow.xml @@ -0,0 +1,40 @@ + + + + pangaeaWorkingPath + the Pangaea Working Path + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + Convert Pangaea to Dataset + eu.dnetlib.sx.pangaea.SparkGeneratePanagaeaDataset + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --workingPath${pangaeaWorkingPath} + --masteryarn + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/pangaea/pangaea_to_dataset.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/pangaea/pangaea_to_dataset.json new file mode 100644 index 0000000000..366f1426e4 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/pangaea/pangaea_to_dataset.json @@ -0,0 +1,4 @@ +[ + {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, + {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true} +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/pangaea/PangaeaTransformTest.scala b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/pangaea/PangaeaTransformTest.scala new file mode 100644 index 0000000000..55eb4ee989 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/pangaea/PangaeaTransformTest.scala @@ -0,0 +1,25 @@ +package eu.dnetlib.dhp.sx.pangaea + +import org.junit.jupiter.api.Test +import java.util.TimeZone +import java.text.SimpleDateFormat +import java.util.Date +class PangaeaTransformTest { + + + + @Test + def test_dateStamp() :Unit ={ + + + + val d = new Date() + + val s:String = s"${new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")format(d)}Z" + + + println(s) + + } + +}