Dataset-based implementation for the SparkCreateDedupRecord phase; fixed the datasource entity dump supplementing the dedup unit tests

This commit is contained in:
Claudio Atzori 2020-04-21 12:06:08 +02:00
parent 5c9ef08a8e
commit 91e72a6944
4 changed files with 112 additions and 120 deletions

View File

@@ -11,8 +11,6 @@ import eu.dnetlib.pace.config.DedupConfig;
import java.io.IOException; import java.io.IOException;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException; import org.dom4j.DocumentException;
@@ -72,12 +70,9 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
Class<OafEntity> clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity)); Class<OafEntity> clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
DedupRecordFactory.createDedupRecord(spark, mergeRelPath, entityPath, clazz) DedupRecordFactory.createDedupRecord(spark, mergeRelPath, entityPath, clazz)
.map(
(MapFunction<OafEntity, String>)
value -> OBJECT_MAPPER.writeValueAsString(value),
Encoders.STRING())
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath); .json(outputPath);
} }
} }

View File

@@ -33,7 +33,7 @@ public class SparkUpdateEntity extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkUpdateEntity.class); private static final Logger log = LoggerFactory.getLogger(SparkUpdateEntity.class);
final String IDJSONPATH = "$.id"; private static final String IDJSONPATH = "$.id";
public SparkUpdateEntity(ArgumentApplicationParser parser, SparkSession spark) { public SparkUpdateEntity(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark); super(parser, spark);
@@ -65,27 +65,25 @@ public class SparkUpdateEntity extends AbstractSparkAction {
log.info("workingPath: '{}'", workingPath); log.info("workingPath: '{}'", workingPath);
log.info("dedupGraphPath: '{}'", dedupGraphPath); log.info("dedupGraphPath: '{}'", dedupGraphPath);
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
// for each entity // for each entity
ModelSupport.entityTypes.forEach( ModelSupport.entityTypes.forEach(
(entity, clazz) -> { (type, clazz) -> {
final String outputPath = dedupGraphPath + "/" + entity; final String outputPath = dedupGraphPath + "/" + type;
removeOutputDir(spark, outputPath); removeOutputDir(spark, outputPath);
JavaRDD<String> sourceEntity = JavaRDD<String> sourceEntity =
sc.textFile( sc.textFile(
DedupUtility.createEntityPath( DedupUtility.createEntityPath(graphBasePath, type.toString()));
graphBasePath, entity.toString()));
if (mergeRelExists(workingPath, entity.toString())) { if (mergeRelExists(workingPath, type.toString())) {
final String mergeRelPath = final String mergeRelPath =
DedupUtility.createMergeRelPath( DedupUtility.createMergeRelPath(workingPath, "*", type.toString());
workingPath, "*", entity.toString());
final String dedupRecordPath = final String dedupRecordPath =
DedupUtility.createDedupRecordPath( DedupUtility.createDedupRecordPath(
workingPath, "*", entity.toString()); workingPath, "*", type.toString());
final Dataset<Relation> rel = final Dataset<Relation> rel =
spark.read().load(mergeRelPath).as(Encoders.bean(Relation.class)); spark.read().load(mergeRelPath).as(Encoders.bean(Relation.class));
@@ -107,7 +105,6 @@ public class SparkUpdateEntity extends AbstractSparkAction {
MapDocumentUtil.getJPathString( MapDocumentUtil.getJPathString(
IDJSONPATH, s), IDJSONPATH, s),
s)); s));
JavaRDD<String> map = JavaRDD<String> map =
entitiesWithId entitiesWithId
.leftOuterJoin(mergedIds) .leftOuterJoin(mergedIds)

View File

@@ -72,7 +72,7 @@ public class SparkDedupTest implements Serializable {
spark = spark =
SparkSession.builder() SparkSession.builder()
.appName(SparkCreateSimRels.class.getSimpleName()) .appName(SparkDedupTest.class.getSimpleName())
.master("local[*]") .master("local[*]")
.config(new SparkConf()) .config(new SparkConf())
.getOrCreate(); .getOrCreate();
@@ -300,8 +300,8 @@ public class SparkDedupTest implements Serializable {
long deletedSw = long deletedSw =
jsc.textFile(testDedupGraphBasePath + "/software") jsc.textFile(testDedupGraphBasePath + "/software")
.filter(this::isDeletedByInference) .filter(this::isDeletedByInference)
.count(); .count();
assertEquals(mergedOrgs, deletedOrgs); assertEquals(mergedOrgs, deletedOrgs);
assertEquals(mergedPubs, deletedPubs); assertEquals(mergedPubs, deletedPubs);