diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java
index 64cbd70ba..0d6c81627 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java
@@ -130,7 +130,7 @@ public class ResultTagger implements Serializable {
// log.info("Remove constraints for " + communityId);
if (conf.getRemoveConstraintsMap().keySet().contains(communityId) &&
conf.getRemoveConstraintsMap().get(communityId).getCriteria() != null &&
- !conf.getRemoveConstraintsMap().get(communityId).getCriteria().isEmpty() &&
+ !conf.getRemoveConstraintsMap().get(communityId).getCriteria().isEmpty() &&
conf
.getRemoveConstraintsMap()
.get(communityId)
@@ -228,7 +228,7 @@ public class ResultTagger implements Serializable {
.forEach(communityId -> {
if (!removeCommunities.contains(communityId) &&
conf.getSelectionConstraintsMap().get(communityId).getCriteria() != null &&
- !conf.getSelectionConstraintsMap().get(communityId).getCriteria().isEmpty() &&
+ !conf.getSelectionConstraintsMap().get(communityId).getCriteria().isEmpty() &&
conf
.getSelectionConstraintsMap()
.get(communityId)
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java
index 936bdba1d..8cdfcee5a 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java
@@ -90,7 +90,7 @@ public class SparkCountryPropagationJob {
if (!preparedInfoRaw.isEmpty()) {
res
- .joinWith(preparedInfoRaw, res.col("id").equalTo(prepared.col("resultId")), "left_outer")
+ .joinWith(preparedInfoRaw, res.col("id").equalTo(preparedInfoRaw.col("resultId")), "left_outer")
.map(getCountryMergeFn(), Encoders.bean(resultClazz))
.write()
.option("compression", "gzip")
diff --git a/dhp-workflows/dhp-incremental-graph/pom.xml b/dhp-workflows/dhp-incremental-graph/pom.xml
index 3979da15b..ba294072c 100644
--- a/dhp-workflows/dhp-incremental-graph/pom.xml
+++ b/dhp-workflows/dhp-incremental-graph/pom.xml
@@ -53,6 +53,11 @@
			<artifactId>dhp-aggregation</artifactId>
			<version>${project.version}</version>
		</dependency>
+		<dependency>
+			<groupId>eu.dnetlib.dhp</groupId>
+			<artifactId>dhp-enrichment</artifactId>
+			<version>${project.version}</version>
+		</dependency>
		<dependency>
			<groupId>eu.dnetlib.dhp</groupId>
			<artifactId>dhp-graph-mapper</artifactId>
diff --git a/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/SparkAppendContextCleanedGraph.scala b/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/SparkAppendContextCleanedGraph.scala
new file mode 100644
index 000000000..63c17a74a
--- /dev/null
+++ b/dhp-workflows/dhp-incremental-graph/src/main/java/eu/dnetlib/dhp/incremental/SparkAppendContextCleanedGraph.scala
@@ -0,0 +1,104 @@
+package eu.dnetlib.dhp.incremental
+
+import eu.dnetlib.dhp.PropagationConstant
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.bulktag.community.TaggingConstants
+import eu.dnetlib.dhp.schema.common.ModelSupport
+import eu.dnetlib.dhp.schema.oaf.{Oaf, OafEntity}
+import org.apache.commons.io.IOUtils
+import org.apache.spark.SparkConf
+import org.apache.spark.sql._
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsScalaMapConverter, seqAsJavaListConverter}
+
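+/** Appends the graph stored in the given Hive database to the target graph path,
+  * first removing context entries added by enrichment (propagation and bulk tagging).
+  */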
+object SparkAppendContextCleanedGraph {
+
+ def main(args: Array[String]): Unit = {
+ val log: Logger = LoggerFactory.getLogger(getClass)
+ val conf: SparkConf = new SparkConf()
+
+ val parser = new ArgumentApplicationParser(
+ IOUtils.toString(
+ getClass.getResourceAsStream(
+ "/eu/dnetlib/dhp/oa/graph/incremental/resolution/resolve_relationsbyid_params.json"
+ )
+ )
+ )
+ parser.parseArgument(args)
+ conf.set("hive.metastore.uris", parser.get("hiveMetastoreUris"))
+
+ val graphBasePath = parser.get("graphBasePath")
+ log.info(s"graphBasePath -> $graphBasePath")
+ val relationPath = parser.get("relationPath")
+ log.info(s"relationPath -> $relationPath")
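+    // note: graphBasePath and relationPath are parsed from the shared parameter file but are not used by this job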
+ val targetPath = parser.get("targetGraph")
+ log.info(s"targetGraph -> $targetPath")
+
+ val hiveDbName = parser.get("hiveDbName")
+ log.info(s"hiveDbName -> $hiveDbName")
+
+ val spark: SparkSession =
+ SparkSession
+ .builder()
+ .config(conf)
+ .enableHiveSupport()
+ .appName(getClass.getSimpleName)
+ .getOrCreate()
+
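+    // iterate over every OAF type in the model: entity tables get their context cleaned, the others are copied as-is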
+ for ((entity, clazz) <- ModelSupport.oafTypes.asScala) {
+ if (classOf[OafEntity].isAssignableFrom(clazz)) {
+ val classEnc: Encoder[Oaf] = Encoders.bean(clazz).asInstanceOf[Encoder[Oaf]]
+
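+        // read the entity table from Hive and drop context provenance entries added by enrichment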
+ spark
+ .table(s"${hiveDbName}.${entity}")
+ .as(classEnc)
+ .map(e => {
+ val oaf = e.asInstanceOf[OafEntity]
+ if (oaf.getContext != null) {
+ val newContext = oaf.getContext.asScala
+ .map(c => {
+ if (c.getDataInfo != null) {
+ c.setDataInfo(
+ c.getDataInfo.asScala
+                .filter(
+                  di =>
+                    // reversed equals is null-safe when inferenceprovenance is unset
+                    !PropagationConstant.PROPAGATION_DATA_INFO_TYPE.equals(di.getInferenceprovenance)
+                      && !TaggingConstants.BULKTAG_DATA_INFO_TYPE.equals(di.getInferenceprovenance)
+                )
+ .toList
+ .asJava
+ )
+ }
+ c
+ })
+              .filter(c => c.getDataInfo != null && !c.getDataInfo.isEmpty) // drop contexts left with no provenance
+ .toList
+ .asJava
+ oaf.setContext(newContext)
+ }
+ e
+ })(classEnc)
+ .write
+ .option("compression", "gzip")
+ .mode(SaveMode.Append)
+ .json(s"$targetPath/${entity}")
+ } else {
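+        // non-entity types (e.g. relations) carry no context and are appended unchanged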
+ spark
+ .table(s"${hiveDbName}.${entity}")
+ .write
+ .option("compression", "gzip")
+ .mode(SaveMode.Append)
+ .json(s"$targetPath/${entity}")
+ }
+ }
+ }
+}