Fill the new mergedIds field when generating dedup records

Filter out dedup records composed of invisible records only
Filter out mergerels that have not been used when creating the dedup record (ungrouping of cliques)
This commit is contained in:
Giambattista Bloisi 2024-10-28 12:05:56 +01:00
parent 46dbb62598
commit 56224e034a
3 changed files with 47 additions and 6 deletions

View File

@ -2,14 +2,13 @@
package eu.dnetlib.dhp.oa.dedup;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.FlatMapGroupsFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.*;
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
@ -107,6 +106,8 @@ public class DedupRecordFactory {
final HashSet<String> acceptanceDate = new HashSet<>();
boolean isVisible = false;
while (it.hasNext()) {
Tuple3<String, String, OafEntity> t = it.next();
OafEntity entity = t._3();
@ -114,6 +115,7 @@ public class DedupRecordFactory {
if (entity == null) {
aliases.add(t._2());
} else {
isVisible = isVisible || !entity.getDataInfo().getInvisible();
cliques.add(entity);
if (acceptanceDate.size() < MAX_ACCEPTANCE_DATE) {
@ -129,13 +131,20 @@ public class DedupRecordFactory {
}
if (acceptanceDate.size() >= MAX_ACCEPTANCE_DATE || cliques.isEmpty()) {
if (!isVisible || acceptanceDate.size() >= MAX_ACCEPTANCE_DATE || cliques.isEmpty()) {
return Collections.emptyIterator();
}
OafEntity mergedEntity = MergeUtils.mergeGroup(dedupId, cliques.iterator());
// dedup records do not have date of transformation attribute
mergedEntity.setDateoftransformation(null);
mergedEntity
.setMergedIds(
Stream
.concat(cliques.stream().map(OafEntity::getId), aliases.stream())
.distinct()
.sorted()
.collect(Collectors.toList()));
return Stream
.concat(

View File

@ -5,11 +5,11 @@ import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTION
import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVENANCE_DEDUP;
import java.io.IOException;
import java.util.Arrays;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.*;
import org.dom4j.DocumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -17,6 +17,7 @@ import org.xml.sax.SAXException;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
@ -25,6 +26,7 @@ import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
import scala.collection.JavaConverters;
public class SparkCreateDedupRecord extends AbstractSparkAction {
@ -85,6 +87,36 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
log.info("Updating mergerels for: '{}'", subEntity);
final Dataset<Row> dedupIds = spark
.read()
.schema("`id` STRING, `mergedIds` ARRAY<STRING>")
.json(outputPath)
.selectExpr("id as source", "explode(mergedIds) as target");
spark
.read()
.load(mergeRelPath)
.where("relClass == 'merges'")
.join(dedupIds, JavaConverters.asScalaBuffer(Arrays.asList("source", "target")).toSeq(), "left_semi")
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.save(workingPath + "/mergerel_filtered");
final Dataset<Row> validRels = spark.read().load(workingPath + "/mergerel_filtered");
final Dataset<Row> filteredMergeRels = validRels
.union(
validRels
.withColumnRenamed("source", "source_tmp")
.withColumnRenamed("target", "target_tmp")
.withColumn("relClass", functions.lit(ModelConstants.IS_MERGED_IN))
.withColumnRenamed("target_tmp", "source")
.withColumnRenamed("source_tmp", "target"));
saveParquet(filteredMergeRels, mergeRelPath, SaveMode.Overwrite);
removeOutputDir(spark, workingPath + "/mergerel_filtered");
}
}

View File

@ -937,7 +937,7 @@
<commons.logging.version>1.1.3</commons.logging.version>
<commons-validator.version>1.7</commons-validator.version>
<dateparser.version>1.0.7</dateparser.version>
<dhp-schemas.version>[8.0.1]</dhp-schemas.version>
<dhp-schemas.version>[9.0.0]</dhp-schemas.version>
<dhp.cdh.version>cdh5.9.2</dhp.cdh.version>
<dhp.commons.lang.version>3.5</dhp.commons.lang.version>
<dhp.guava.version>11.0.2</dhp.guava.version>