added a check to verify that the entity exists

Sandro La Bruzzo 2021-07-22 16:08:54 +02:00
parent 62ae36a3d2
commit 058b636d4d
1 changed file with 39 additions and 32 deletions
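
The change below guards each entity read with an HDFS existence check (HdfsSupport.exists) so the job skips entity types whose input path is missing instead of failing. As an illustration only, an existence check of this kind can be written with the standard Hadoop FileSystem API; the class and method shown here are a hypothetical sketch, not the project's actual HdfsSupport implementation:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class HdfsExistsSketch {

    // Returns true if the given path exists on the filesystem configured by conf.
    // Any IOException is rethrown unchecked so the caller fails fast.
    public static boolean exists(String path, Configuration conf) {
        try {
            FileSystem fs = FileSystem.get(conf);
            return fs.exists(new Path(path));
        } catch (IOException e) {
            throw new RuntimeException("Cannot check the existence of path: " + path, e);
        }
    }
}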


@@ -23,6 +23,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
 import eu.dnetlib.dhp.schema.common.EntityType;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.*;
@@ -77,48 +78,54 @@ public class SparkUpdateEntity extends AbstractSparkAction {
     (type, clazz) -> {
         final String outputPath = dedupGraphPath + "/" + type;
         removeOutputDir(spark, outputPath);
-        JavaRDD<String> sourceEntity = sc
-            .textFile(DedupUtility.createEntityPath(graphBasePath, type.toString()));
-        if (mergeRelExists(workingPath, type.toString())) {
-            final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, "*", type.toString());
-            final String dedupRecordPath = DedupUtility
-                .createDedupRecordPath(workingPath, "*", type.toString());
-            final Dataset<Relation> rel = spark.read().load(mergeRelPath).as(Encoders.bean(Relation.class));
-            final JavaPairRDD<String, String> mergedIds = rel
-                .where("relClass == 'merges'")
-                .where("source != target")
-                .select(rel.col("target"))
-                .distinct()
-                .toJavaRDD()
-                .mapToPair(
-                    (PairFunction<Row, String, String>) r -> new Tuple2<>(r.getString(0), "d"));
-            JavaPairRDD<String, String> entitiesWithId = sourceEntity
-                .mapToPair(
-                    (PairFunction<String, String, String>) s -> new Tuple2<>(
-                        MapDocumentUtil.getJPathString(IDJSONPATH, s), s));
-            if (type == EntityType.organization) // exclude root records from organizations
-                entitiesWithId = excludeRootOrgs(entitiesWithId, rel);
-            JavaRDD<String> map = entitiesWithId
-                .leftOuterJoin(mergedIds)
-                .map(k -> {
-                    if (k._2()._2().isPresent()) {
-                        return updateDeletedByInference(k._2()._1(), clazz);
-                    }
-                    return k._2()._1();
-                });
-            sourceEntity = map.union(sc.textFile(dedupRecordPath));
-        }
-        sourceEntity.saveAsTextFile(outputPath, GzipCodec.class);
+        final String ip = DedupUtility.createEntityPath(graphBasePath, type.toString());
+        if (HdfsSupport.exists(ip, sc.hadoopConfiguration())) {
+            JavaRDD<String> sourceEntity = sc
+                .textFile(DedupUtility.createEntityPath(graphBasePath, type.toString()));
+            if (mergeRelExists(workingPath, type.toString())) {
+                final String mergeRelPath = DedupUtility
+                    .createMergeRelPath(workingPath, "*", type.toString());
+                final String dedupRecordPath = DedupUtility
+                    .createDedupRecordPath(workingPath, "*", type.toString());
+                final Dataset<Relation> rel = spark
+                    .read()
+                    .load(mergeRelPath)
+                    .as(Encoders.bean(Relation.class));
+                final JavaPairRDD<String, String> mergedIds = rel
+                    .where("relClass == 'merges'")
+                    .where("source != target")
+                    .select(rel.col("target"))
+                    .distinct()
+                    .toJavaRDD()
+                    .mapToPair(
+                        (PairFunction<Row, String, String>) r -> new Tuple2<>(r.getString(0), "d"));
+                JavaPairRDD<String, String> entitiesWithId = sourceEntity
+                    .mapToPair(
+                        (PairFunction<String, String, String>) s -> new Tuple2<>(
+                            MapDocumentUtil.getJPathString(IDJSONPATH, s), s));
+                if (type == EntityType.organization) // exclude root records from organizations
+                    entitiesWithId = excludeRootOrgs(entitiesWithId, rel);
+                JavaRDD<String> map = entitiesWithId
+                    .leftOuterJoin(mergedIds)
+                    .map(k -> {
+                        if (k._2()._2().isPresent()) {
+                            return updateDeletedByInference(k._2()._1(), clazz);
+                        }
+                        return k._2()._1();
+                    });
+                sourceEntity = map.union(sc.textFile(dedupRecordPath));
+            }
+            sourceEntity.saveAsTextFile(outputPath, GzipCodec.class);
+        }
     });
 }
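
Records whose identifier matches a "merges" target are rewritten through updateDeletedByInference, whose body is not part of this hunk. A minimal sketch of what such a rewrite typically looks like, assuming Jackson for JSON (de)serialization and the dhp OAF schema classes; the helper name is hypothetical and the real method may differ in details:

import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.schema.oaf.OafEntity;

public final class DeletedByInferenceSketch {

    // Sketch only: deserializes an OAF entity record, flags it as deleted by
    // inference in its DataInfo, and re-serializes it to JSON.
    public static <T extends OafEntity> String markDeletedByInference(String json, Class<T> clazz) {
        try {
            ObjectMapper mapper = new ObjectMapper();
            T entity = mapper.readValue(json, clazz);
            if (entity.getDataInfo() != null) {
                entity.getDataInfo().setDeletedbyinference(true);
            }
            return mapper.writeValueAsString(entity);
        } catch (IOException e) {
            throw new RuntimeException("Unable to convert json", e);
        }
    }
}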