From 9910ce06ae384c0e145db67665381e6ebe048f49 Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Wed, 28 Jun 2023 11:38:16 +0200
Subject: [PATCH] added to CreateSimRel the feature to write time log

---
 .../dhp/application/dedup/log/DedupLogModel.scala     |  9 ++++++++-
 .../dhp/application/dedup/log/DedupLogWriter.scala    |  8 +++-----
 .../eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java   | 10 ++++------
 .../dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml  |  3 ++-
 4 files changed, 17 insertions(+), 13 deletions(-)

diff --git a/dhp-common/src/main/scala/eu/dnetlib/dhp/application/dedup/log/DedupLogModel.scala b/dhp-common/src/main/scala/eu/dnetlib/dhp/application/dedup/log/DedupLogModel.scala
index c1473e7df1..d74ec3f696 100644
--- a/dhp-common/src/main/scala/eu/dnetlib/dhp/application/dedup/log/DedupLogModel.scala
+++ b/dhp-common/src/main/scala/eu/dnetlib/dhp/application/dedup/log/DedupLogModel.scala
@@ -1,3 +1,10 @@
 package eu.dnetlib.dhp.application.dedup.log
 
-case class DedupLogModel(tag:String, configuration:String, entity:String, startTS:Long, endTS:Long, totalMs:Long ) {}
+case class DedupLogModel(
+  tag: String,
+  configuration: String,
+  entity: String,
+  startTS: Long,
+  endTS: Long,
+  totalMs: Long
+) {}
diff --git a/dhp-common/src/main/scala/eu/dnetlib/dhp/application/dedup/log/DedupLogWriter.scala b/dhp-common/src/main/scala/eu/dnetlib/dhp/application/dedup/log/DedupLogWriter.scala
index 3060a13ae0..4409c01d92 100644
--- a/dhp-common/src/main/scala/eu/dnetlib/dhp/application/dedup/log/DedupLogWriter.scala
+++ b/dhp-common/src/main/scala/eu/dnetlib/dhp/application/dedup/log/DedupLogWriter.scala
@@ -2,15 +2,13 @@ package eu.dnetlib.dhp.application.dedup.log
 
 import org.apache.spark.sql.{SaveMode, SparkSession}
 
-class DedupLogWriter (path:String) {
+class DedupLogWriter(path: String) {
 
-
-  def appendLog(dedupLogModel: DedupLogModel, spark:SparkSession): Unit = {
+  def appendLog(dedupLogModel: DedupLogModel, spark: SparkSession): Unit = {
     import spark.implicits._
-    val df = spark.createDataset[DedupLogModel](data = List(dedupLogModel))
+    val df = spark.createDataset[DedupLogModel](data = List(dedupLogModel))
     df.write.mode(SaveMode.Append).save(path)
-
   }
 
 }
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
index c015078c57..bf8906bea4 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
@@ -4,8 +4,6 @@ package eu.dnetlib.dhp.oa.dedup;
 import java.io.IOException;
 import java.util.Optional;
 
-import eu.dnetlib.dhp.application.dedup.log.DedupLogModel;
-import eu.dnetlib.dhp.application.dedup.log.DedupLogWriter;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.SparkConf;
@@ -22,6 +20,8 @@ import org.slf4j.LoggerFactory;
 import org.xml.sax.SAXException;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.application.dedup.log.DedupLogModel;
+import eu.dnetlib.dhp.application.dedup.log.DedupLogWriter;
 import eu.dnetlib.dhp.oa.dedup.model.Block;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
@@ -74,11 +74,9 @@ public class SparkCreateSimRels extends AbstractSparkAction {
 		log.info("actionSetId: '{}'", actionSetId);
 		log.info("workingPath: '{}'", workingPath);
 
-
 		final String dfLogPath = parser.get("dataframeLog");
 		final String runTag = Optional.ofNullable(parser.get("runTAG")).orElse("UNKNOWN");
-
 		// for each dedup configuration
 		for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
@@ -93,7 +91,6 @@ public class SparkCreateSimRels extends AbstractSparkAction {
 
 			JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
-
 			JavaPairRDD mapDocuments = sc
 				.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
 				.repartition(numPartitions)
@@ -120,7 +117,8 @@ public class SparkCreateSimRels extends AbstractSparkAction {
 			saveParquet(simRels, outputPath, SaveMode.Overwrite);
 			final long end = System.currentTimeMillis();
 			if (StringUtils.isNotBlank(dfLogPath)) {
-				final DedupLogModel model = new DedupLogModel(runTag, dedupConf.toString(),entity, start, end, end-start);
+				final DedupLogModel model = new DedupLogModel(runTag, dedupConf.toString(), entity, start, end,
+					end - start);
 				new DedupLogWriter(dfLogPath).appendLog(model, spark);
 			}
 
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml
index ba2270c8a8..0a87826030 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml
@@ -134,7 +134,8 @@
             <arg>--workingPath</arg><arg>${workingPath}</arg>
             <arg>--numPartitions</arg><arg>15000</arg>
-            
+            
+            
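
Not part of the patch above: a minimal usage sketch of the two logging classes it introduces, showing how SparkCreateSimRels is expected to record one timing row per dedup configuration. The object name, the local SparkSession, the log path, the entity name, the tag, and the configuration string below are illustrative placeholders, not values taken from the commit.

// Hedged sketch, assuming dhp-common (with DedupLogModel/DedupLogWriter) is on the classpath.
import org.apache.spark.sql.SparkSession

import eu.dnetlib.dhp.application.dedup.log.{DedupLogModel, DedupLogWriter}

object DedupTimingSketch {
  def main(args: Array[String]): Unit = {
    // Local session only for illustration; the real job runs on the cluster via the oozie workflow.
    val spark = SparkSession.builder().appName("DedupTimingSketch").master("local[*]").getOrCreate()

    val start = System.currentTimeMillis()
    // ... compute the similarity relations for one entity type here ...
    val end = System.currentTimeMillis()

    // Mirrors the patched SparkCreateSimRels: one row per dedup configuration.
    val model = DedupLogModel(
      tag = "UNKNOWN",                              // value of --runTAG, defaults to UNKNOWN
      configuration = "dedup configuration (placeholder)", // dedupConf.toString() in the real job
      entity = "publication",                       // placeholder entity name
      startTS = start,
      endTS = end,
      totalMs = end - start
    )

    // Appends the row to the dataset at the log path (SaveMode.Append); the real job takes
    // this path from the --dataframeLog argument, the literal below is a placeholder.
    new DedupLogWriter("/tmp/dedup_time_log").appendLog(model, spark)

    spark.stop()
  }
}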