Job to include the Hive graph in the current generation phase, after cleaning the context

Giambattista Bloisi 2024-11-27 18:15:23 +01:00
parent 1d80c1da57
commit 2b666c8aa6
4 changed files with 104 additions and 3 deletions


@@ -130,7 +130,7 @@ public class ResultTagger implements Serializable {
				// log.info("Remove constraints for " + communityId);
				if (conf.getRemoveConstraintsMap().keySet().contains(communityId) &&
					conf.getRemoveConstraintsMap().get(communityId).getCriteria() != null &&
					!conf.getRemoveConstraintsMap().get(communityId).getCriteria().isEmpty() &&
					conf
						.getRemoveConstraintsMap()
						.get(communityId)
@@ -228,7 +228,7 @@ public class ResultTagger implements Serializable {
			.forEach(communityId -> {
				if (!removeCommunities.contains(communityId) &&
					conf.getSelectionConstraintsMap().get(communityId).getCriteria() != null &&
					!conf.getSelectionConstraintsMap().get(communityId).getCriteria().isEmpty() &&
					conf
						.getSelectionConstraintsMap()
						.get(communityId)


@@ -90,7 +90,7 @@ public class SparkCountryPropagationJob {
		if (!preparedInfoRaw.isEmpty()) {
			res
-				.joinWith(preparedInfoRaw, res.col("id").equalTo(prepared.col("resultId")), "left_outer")
+				.joinWith(preparedInfoRaw, res.col("id").equalTo(preparedInfoRaw.col("resultId")), "left_outer")
				.map(getCountryMergeFn(), Encoders.bean(resultClazz))
				.write()
				.option("compression", "gzip")
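
The fix above repoints the join condition at the Dataset that is actually passed to joinWith: Spark resolves the Column references in the condition against the datasets being joined, so a condition built from a different variable (prepared) can fail analysis or join on the wrong input. Below is a minimal, self-contained sketch of the corrected pattern; the Res and PreparedInfo case classes and the local SparkSession are hypothetical stand-ins, not part of the commit.

// Minimal sketch (not from the commit): hypothetical beans stand in for the
// result and country prepared-info datasets used by SparkCountryPropagationJob.
import org.apache.spark.sql.{Dataset, SparkSession}

case class Res(id: String, title: String)
case class PreparedInfo(resultId: String, country: String)

object JoinWithSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("joinWith-sketch").getOrCreate()
    import spark.implicits._

    val res: Dataset[Res] = Seq(Res("r1", "A"), Res("r2", "B")).toDS()
    val preparedInfoRaw: Dataset[PreparedInfo] = Seq(PreparedInfo("r1", "NL")).toDS()

    // The condition references columns of the two datasets being joined:
    // res.col("id") and preparedInfoRaw.col("resultId"), as in the fixed line.
    val joined = res.joinWith(preparedInfoRaw, res.col("id").equalTo(preparedInfoRaw.col("resultId")), "left_outer")

    joined.show(false)
    spark.stop()
  }
}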


@@ -53,6 +53,11 @@
			<artifactId>dhp-aggregation</artifactId>
			<version>${project.version}</version>
		</dependency>
+		<dependency>
+			<groupId>eu.dnetlib.dhp</groupId>
+			<artifactId>dhp-enrichment</artifactId>
+			<version>${project.version}</version>
+		</dependency>
		<dependency>
			<groupId>eu.dnetlib.dhp</groupId>
			<artifactId>dhp-graph-mapper</artifactId>
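
The dhp-enrichment dependency added here is presumably what makes PropagationConstant and TaggingConstants, both imported by the new incremental job below, available on the classpath.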


@@ -0,0 +1,96 @@
package eu.dnetlib.dhp.incremental

import eu.dnetlib.dhp.PropagationConstant
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.bulktag.community.TaggingConstants
import eu.dnetlib.dhp.schema.common.ModelSupport
import eu.dnetlib.dhp.schema.oaf.{Oaf, OafEntity}
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory}

import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsScalaMapConverter, seqAsJavaListConverter}

object SparkAppendContextCleanedGraph {

  def main(args: Array[String]): Unit = {
    val log: Logger = LoggerFactory.getLogger(getClass)
    val conf: SparkConf = new SparkConf()

    val parser = new ArgumentApplicationParser(
      IOUtils.toString(
        getClass.getResourceAsStream(
          "/eu/dnetlib/dhp/oa/graph/incremental/resolution/resolve_relationsbyid_params.json"
        )
      )
    )
    parser.parseArgument(args)
    conf.set("hive.metastore.uris", parser.get("hiveMetastoreUris"))

    val graphBasePath = parser.get("graphBasePath")
    log.info(s"graphBasePath -> $graphBasePath")
    val relationPath = parser.get("relationPath")
    log.info(s"relationPath -> $relationPath")
    val targetPath = parser.get("targetGraph")
    log.info(s"targetGraph -> $targetPath")
    val hiveDbName = parser.get("hiveDbName")
    log.info(s"hiveDbName -> $hiveDbName")

    val spark: SparkSession =
      SparkSession
        .builder()
        .config(conf)
        .enableHiveSupport()
        .appName(getClass.getSimpleName)
        .getOrCreate()

    for ((entity, clazz) <- ModelSupport.oafTypes.asScala) {
      if (classOf[OafEntity].isAssignableFrom(clazz)) {
        val classEnc: Encoder[Oaf] = Encoders.bean(clazz).asInstanceOf[Encoder[Oaf]]

        spark
          .table(s"${hiveDbName}.${entity}")
          .as(classEnc)
          .map(e => {
            val oaf = e.asInstanceOf[OafEntity]
            if (oaf.getContext != null) {
              val newContext = oaf.getContext.asScala
                .map(c => {
                  if (c.getDataInfo != null) {
                    c.setDataInfo(
                      c.getDataInfo.asScala
                        .filter(
                          di =>
                            !di.getInferenceprovenance.equals(PropagationConstant.PROPAGATION_DATA_INFO_TYPE)
                              && !di.getInferenceprovenance.equals(TaggingConstants.BULKTAG_DATA_INFO_TYPE)
                        )
                        .toList
                        .asJava
                    )
                  }
                  c
                })
                .filter(!_.getDataInfo.isEmpty)
                .toList
                .asJava
              oaf.setContext(newContext)
            }
            e
          })(classEnc)
          .write
          .option("compression", "gzip")
          .mode(SaveMode.Append)
          .json(s"$targetPath/${entity}")
      } else {
        spark
          .table(s"${hiveDbName}.${entity}")
          .write
          .option("compression", "gzip")
          .mode(SaveMode.Append)
          .json(s"$targetPath/${entity}")
      }
    }
  }
}
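
For reference, the context-cleaning rule applied per entity can be exercised in isolation. The sketch below is not part of the commit: it assumes the usual no-arg constructors and bean setters of the dhp-schemas classes (Result, Context, DataInfo), and the "user:claim" provenance string is a made-up example value. It builds a Result whose context carries one propagation-derived dataInfo and one from another provenance, applies the same filter as the job, and keeps only the context entries that still have dataInfo afterwards.

// Standalone sketch, not part of the commit: exercises the same context-cleaning
// rule as SparkAppendContextCleanedGraph on a single entity.
import eu.dnetlib.dhp.PropagationConstant
import eu.dnetlib.dhp.bulktag.community.TaggingConstants
import eu.dnetlib.dhp.schema.oaf.{Context, DataInfo, Result}

import scala.collection.JavaConverters._

object ContextCleaningSketch {
  def main(args: Array[String]): Unit = {
    val inferred = new DataInfo
    inferred.setInferenceprovenance(PropagationConstant.PROPAGATION_DATA_INFO_TYPE)
    val claimed = new DataInfo
    claimed.setInferenceprovenance("user:claim") // assumed example value

    val ctx = new Context
    ctx.setDataInfo(List(inferred, claimed).asJava)

    val result = new Result
    result.setContext(List(ctx).asJava)

    // Same rule as the job: drop dataInfo produced by propagation or bulk tagging,
    // then drop context entries left with no dataInfo at all.
    val cleaned = result.getContext.asScala
      .map { c =>
        c.setDataInfo(
          c.getDataInfo.asScala
            .filter(di =>
              !di.getInferenceprovenance.equals(PropagationConstant.PROPAGATION_DATA_INFO_TYPE)
                && !di.getInferenceprovenance.equals(TaggingConstants.BULKTAG_DATA_INFO_TYPE))
            .toList
            .asJava)
        c
      }
      .filter(!_.getDataInfo.isEmpty)
      .toList
      .asJava
    result.setContext(cleaned)

    // The propagation-derived dataInfo is dropped; the claimed one survives.
    println(result.getContext.asScala.map(_.getDataInfo.size()).mkString(","))
  }
}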