From 2b666c8aa657f6ab4815d5af83d849df606ecd60 Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Wed, 27 Nov 2024 18:15:23 +0100 Subject: [PATCH] Job to include hive graph in the current generation phase, after cleaning context --- .../dhp/bulktag/community/ResultTagger.java | 4 +- .../SparkCountryPropagationJob.java | 2 +- dhp-workflows/dhp-incremental-graph/pom.xml | 5 + .../SparkAppendContextCleanedGraph.scala | 96 +++++++++++++++++++ 4 files changed, 104 insertions(+), 3 deletions(-) create mode 100644 dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/SparkAppendContextCleanedGraph.scala diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java index 64cbd70ba..0d6c81627 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java @@ -130,7 +130,7 @@ public class ResultTagger implements Serializable { // log.info("Remove constraints for " + communityId); if (conf.getRemoveConstraintsMap().keySet().contains(communityId) && conf.getRemoveConstraintsMap().get(communityId).getCriteria() != null && - !conf.getRemoveConstraintsMap().get(communityId).getCriteria().isEmpty() && + !conf.getRemoveConstraintsMap().get(communityId).getCriteria().isEmpty() && conf .getRemoveConstraintsMap() .get(communityId) @@ -228,7 +228,7 @@ public class ResultTagger implements Serializable { .forEach(communityId -> { if (!removeCommunities.contains(communityId) && conf.getSelectionConstraintsMap().get(communityId).getCriteria() != null && - !conf.getSelectionConstraintsMap().get(communityId).getCriteria().isEmpty() && + !conf.getSelectionConstraintsMap().get(communityId).getCriteria().isEmpty() && conf .getSelectionConstraintsMap() .get(communityId) diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java index 936bdba1d..8cdfcee5a 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java @@ -90,7 +90,7 @@ public class SparkCountryPropagationJob { if (!preparedInfoRaw.isEmpty()) { res - .joinWith(preparedInfoRaw, res.col("id").equalTo(prepared.col("resultId")), "left_outer") + .joinWith(preparedInfoRaw, res.col("id").equalTo(preparedInfoRaw.col("resultId")), "left_outer") .map(getCountryMergeFn(), Encoders.bean(resultClazz)) .write() .option("compression", "gzip") diff --git a/dhp-workflows/dhp-incremental-graph/pom.xml b/dhp-workflows/dhp-incremental-graph/pom.xml index 3979da15b..ba294072c 100644 --- a/dhp-workflows/dhp-incremental-graph/pom.xml +++ b/dhp-workflows/dhp-incremental-graph/pom.xml @@ -53,6 +53,11 @@ dhp-aggregation ${project.version} + + eu.dnetlib.dhp + dhp-enrichment + ${project.version} + eu.dnetlib.dhp dhp-graph-mapper diff --git a/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/SparkAppendContextCleanedGraph.scala b/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/SparkAppendContextCleanedGraph.scala new file mode 100644 index 000000000..63c17a74a --- /dev/null +++ b/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/SparkAppendContextCleanedGraph.scala @@ -0,0 +1,96 @@ +package eu.dnetlib.dhp.incremental + +import eu.dnetlib.dhp.PropagationConstant +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.bulktag.community.TaggingConstants +import eu.dnetlib.dhp.schema.common.ModelSupport +import eu.dnetlib.dhp.schema.oaf.{Oaf, OafEntity} +import org.apache.commons.io.IOUtils +import org.apache.spark.SparkConf +import org.apache.spark.sql._ +import org.slf4j.{Logger, LoggerFactory} + +import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsScalaMapConverter, seqAsJavaListConverter} + +object SparkAppendContextCleanedGraph { + + def main(args: Array[String]): Unit = { + val log: Logger = LoggerFactory.getLogger(getClass) + val conf: SparkConf = new SparkConf() + + val parser = new ArgumentApplicationParser( + IOUtils.toString( + getClass.getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/incremental/resolution/resolve_relationsbyid_params.json" + ) + ) + ) + parser.parseArgument(args) + conf.set("hive.metastore.uris", parser.get("hiveMetastoreUris")) + + val graphBasePath = parser.get("graphBasePath") + log.info(s"graphBasePath -> $graphBasePath") + val relationPath = parser.get("relationPath") + log.info(s"relationPath -> $relationPath") + val targetPath = parser.get("targetGraph") + log.info(s"targetGraph -> $targetPath") + + val hiveDbName = parser.get("hiveDbName") + log.info(s"hiveDbName -> $hiveDbName") + + val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .enableHiveSupport() + .appName(getClass.getSimpleName) + .getOrCreate() + + for ((entity, clazz) <- ModelSupport.oafTypes.asScala) { + if (classOf[OafEntity].isAssignableFrom(clazz)) { + val classEnc: Encoder[Oaf] = Encoders.bean(clazz).asInstanceOf[Encoder[Oaf]] + + spark + .table(s"${hiveDbName}.${entity}") + .as(classEnc) + .map(e => { + val oaf = e.asInstanceOf[OafEntity] + if (oaf.getContext != null) { + val newContext = oaf.getContext.asScala + .map(c => { + if (c.getDataInfo != null) { + c.setDataInfo( + c.getDataInfo.asScala + .filter( + di => + !di.getInferenceprovenance.equals(PropagationConstant.PROPAGATION_DATA_INFO_TYPE) + && !di.getInferenceprovenance.equals(TaggingConstants.BULKTAG_DATA_INFO_TYPE) + ) + .toList + .asJava + ) + } + c + }) + .filter(!_.getDataInfo.isEmpty) + .toList + .asJava + oaf.setContext(newContext) + } + e + })(classEnc) + .write + .option("compression", "gzip") + .mode(SaveMode.Append) + .json(s"$targetPath/${entity}") + } else { + spark + .table(s"${hiveDbName}.${entity}") + .write + .option("compression", "gzip") + .mode(SaveMode.Append) + .json(s"$targetPath/${entity}") + } + } + } +}