Fill mergedIds field and filter mergerels with dedup records actually created #500
|
@ -2,14 +2,13 @@
|
|||
package eu.dnetlib.dhp.oa.dedup;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.commons.beanutils.BeanUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.spark.api.java.function.FlatMapFunction;
|
||||
import org.apache.spark.api.java.function.FlatMapGroupsFunction;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.api.java.function.ReduceFunction;
|
||||
import org.apache.spark.sql.*;
|
||||
|
||||
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
|
||||
|
@ -107,6 +106,8 @@ public class DedupRecordFactory {
|
|||
|
||||
final HashSet<String> acceptanceDate = new HashSet<>();
|
||||
|
||||
boolean isVisible = false;
|
||||
|
||||
while (it.hasNext()) {
|
||||
Tuple3<String, String, OafEntity> t = it.next();
|
||||
OafEntity entity = t._3();
|
||||
|
@ -114,6 +115,7 @@ public class DedupRecordFactory {
|
|||
if (entity == null) {
|
||||
aliases.add(t._2());
|
||||
} else {
|
||||
isVisible = isVisible || !entity.getDataInfo().getInvisible();
|
||||
cliques.add(entity);
|
||||
|
||||
if (acceptanceDate.size() < MAX_ACCEPTANCE_DATE) {
|
||||
|
@ -129,13 +131,20 @@ public class DedupRecordFactory {
|
|||
|
||||
}
|
||||
|
||||
if (acceptanceDate.size() >= MAX_ACCEPTANCE_DATE || cliques.isEmpty()) {
|
||||
if (!isVisible || acceptanceDate.size() >= MAX_ACCEPTANCE_DATE || cliques.isEmpty()) {
|
||||
return Collections.emptyIterator();
|
||||
}
|
||||
|
||||
OafEntity mergedEntity = MergeUtils.mergeGroup(dedupId, cliques.iterator());
|
||||
// dedup records do not have date of transformation attribute
|
||||
mergedEntity.setDateoftransformation(null);
|
||||
mergedEntity
|
||||
.setMergedIds(
|
||||
Stream
|
||||
.concat(cliques.stream().map(OafEntity::getId), aliases.stream())
|
||||
.distinct()
|
||||
.sorted()
|
||||
.collect(Collectors.toList()));
|
||||
|
||||
return Stream
|
||||
.concat(
|
||||
|
|
|
@ -5,11 +5,11 @@ import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTION
|
|||
import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVENANCE_DEDUP;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.apache.spark.sql.*;
|
||||
import org.dom4j.DocumentException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -17,6 +17,7 @@ import org.xml.sax.SAXException;
|
|||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.common.EntityType;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||
|
@ -25,6 +26,7 @@ import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
|||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
import scala.collection.JavaConverters;
|
||||
|
||||
public class SparkCreateDedupRecord extends AbstractSparkAction {
|
||||
|
||||
|
@ -85,6 +87,36 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
|
|||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(outputPath);
|
||||
|
||||
log.info("Updating mergerels for: '{}'", subEntity);
|
||||
final Dataset<Row> dedupIds = spark
|
||||
.read()
|
||||
.schema("`id` STRING, `mergedIds` ARRAY<STRING>")
|
||||
.json(outputPath)
|
||||
.selectExpr("id as source", "explode(mergedIds) as target");
|
||||
spark
|
||||
.read()
|
||||
.load(mergeRelPath)
|
||||
.where("relClass == 'merges'")
|
||||
.join(dedupIds, JavaConverters.asScalaBuffer(Arrays.asList("source", "target")).toSeq(), "left_semi")
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.save(workingPath + "/mergerel_filtered");
|
||||
|
||||
final Dataset<Row> validRels = spark.read().load(workingPath + "/mergerel_filtered");
|
||||
|
||||
final Dataset<Row> filteredMergeRels = validRels
|
||||
.union(
|
||||
validRels
|
||||
.withColumnRenamed("source", "source_tmp")
|
||||
.withColumnRenamed("target", "target_tmp")
|
||||
.withColumn("relClass", functions.lit(ModelConstants.IS_MERGED_IN))
|
||||
.withColumnRenamed("target_tmp", "source")
|
||||
.withColumnRenamed("source_tmp", "target"));
|
||||
|
||||
saveParquet(filteredMergeRels, mergeRelPath, SaveMode.Overwrite);
|
||||
removeOutputDir(spark, workingPath + "/mergerel_filtered");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
2
pom.xml
2
pom.xml
|
@ -937,7 +937,7 @@
|
|||
<commons.logging.version>1.1.3</commons.logging.version>
|
||||
<commons-validator.version>1.7</commons-validator.version>
|
||||
<dateparser.version>1.0.7</dateparser.version>
|
||||
<dhp-schemas.version>[8.0.1]</dhp-schemas.version>
|
||||
<dhp-schemas.version>[9.0.0]</dhp-schemas.version>
|
||||
<dhp.cdh.version>cdh5.9.2</dhp.cdh.version>
|
||||
<dhp.commons.lang.version>3.5</dhp.commons.lang.version>
|
||||
<dhp.guava.version>11.0.2</dhp.guava.version>
|
||||
|
|
Loading…
Reference in New Issue