Fill mergedIds field and filter mergerels with dedup records actually created #500
|
@ -2,14 +2,13 @@
|
||||||
package eu.dnetlib.dhp.oa.dedup;
|
package eu.dnetlib.dhp.oa.dedup;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import org.apache.commons.beanutils.BeanUtils;
|
import org.apache.commons.beanutils.BeanUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.spark.api.java.function.FlatMapFunction;
|
|
||||||
import org.apache.spark.api.java.function.FlatMapGroupsFunction;
|
import org.apache.spark.api.java.function.FlatMapGroupsFunction;
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
import org.apache.spark.api.java.function.ReduceFunction;
|
|
||||||
import org.apache.spark.sql.*;
|
import org.apache.spark.sql.*;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
|
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
|
||||||
|
@ -107,6 +106,8 @@ public class DedupRecordFactory {
|
||||||
|
|
||||||
final HashSet<String> acceptanceDate = new HashSet<>();
|
final HashSet<String> acceptanceDate = new HashSet<>();
|
||||||
|
|
||||||
|
boolean isVisible = false;
|
||||||
|
|
||||||
while (it.hasNext()) {
|
while (it.hasNext()) {
|
||||||
Tuple3<String, String, OafEntity> t = it.next();
|
Tuple3<String, String, OafEntity> t = it.next();
|
||||||
OafEntity entity = t._3();
|
OafEntity entity = t._3();
|
||||||
|
@ -114,6 +115,7 @@ public class DedupRecordFactory {
|
||||||
if (entity == null) {
|
if (entity == null) {
|
||||||
aliases.add(t._2());
|
aliases.add(t._2());
|
||||||
} else {
|
} else {
|
||||||
|
isVisible = isVisible || !entity.getDataInfo().getInvisible();
|
||||||
cliques.add(entity);
|
cliques.add(entity);
|
||||||
|
|
||||||
if (acceptanceDate.size() < MAX_ACCEPTANCE_DATE) {
|
if (acceptanceDate.size() < MAX_ACCEPTANCE_DATE) {
|
||||||
|
@ -129,13 +131,20 @@ public class DedupRecordFactory {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (acceptanceDate.size() >= MAX_ACCEPTANCE_DATE || cliques.isEmpty()) {
|
if (!isVisible || acceptanceDate.size() >= MAX_ACCEPTANCE_DATE || cliques.isEmpty()) {
|
||||||
return Collections.emptyIterator();
|
return Collections.emptyIterator();
|
||||||
}
|
}
|
||||||
|
|
||||||
OafEntity mergedEntity = MergeUtils.mergeGroup(dedupId, cliques.iterator());
|
OafEntity mergedEntity = MergeUtils.mergeGroup(dedupId, cliques.iterator());
|
||||||
// dedup records do not have date of transformation attribute
|
// dedup records do not have date of transformation attribute
|
||||||
mergedEntity.setDateoftransformation(null);
|
mergedEntity.setDateoftransformation(null);
|
||||||
|
mergedEntity
|
||||||
|
.setMergedIds(
|
||||||
|
Stream
|
||||||
|
.concat(cliques.stream().map(OafEntity::getId), aliases.stream())
|
||||||
|
.distinct()
|
||||||
|
.sorted()
|
||||||
|
.collect(Collectors.toList()));
|
||||||
|
|
||||||
return Stream
|
return Stream
|
||||||
.concat(
|
.concat(
|
||||||
|
|
|
@ -5,11 +5,11 @@ import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTION
|
||||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVENANCE_DEDUP;
|
import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVENANCE_DEDUP;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.sql.SaveMode;
|
import org.apache.spark.sql.*;
|
||||||
import org.apache.spark.sql.SparkSession;
|
|
||||||
import org.dom4j.DocumentException;
|
import org.dom4j.DocumentException;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -17,6 +17,7 @@ import org.xml.sax.SAXException;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.schema.common.EntityType;
|
import eu.dnetlib.dhp.schema.common.EntityType;
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||||
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||||
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||||
|
@ -25,6 +26,7 @@ import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||||
import eu.dnetlib.pace.config.DedupConfig;
|
import eu.dnetlib.pace.config.DedupConfig;
|
||||||
|
import scala.collection.JavaConverters;
|
||||||
|
|
||||||
public class SparkCreateDedupRecord extends AbstractSparkAction {
|
public class SparkCreateDedupRecord extends AbstractSparkAction {
|
||||||
|
|
||||||
|
@ -85,6 +87,36 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.option("compression", "gzip")
|
.option("compression", "gzip")
|
||||||
.json(outputPath);
|
.json(outputPath);
|
||||||
|
|
||||||
|
log.info("Updating mergerels for: '{}'", subEntity);
|
||||||
|
final Dataset<Row> dedupIds = spark
|
||||||
|
.read()
|
||||||
|
.schema("`id` STRING, `mergedIds` ARRAY<STRING>")
|
||||||
|
.json(outputPath)
|
||||||
|
.selectExpr("id as source", "explode(mergedIds) as target");
|
||||||
|
spark
|
||||||
|
.read()
|
||||||
|
.load(mergeRelPath)
|
||||||
|
.where("relClass == 'merges'")
|
||||||
|
.join(dedupIds, JavaConverters.asScalaBuffer(Arrays.asList("source", "target")).toSeq(), "left_semi")
|
||||||
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.option("compression", "gzip")
|
||||||
|
.save(workingPath + "/mergerel_filtered");
|
||||||
|
|
||||||
|
final Dataset<Row> validRels = spark.read().load(workingPath + "/mergerel_filtered");
|
||||||
|
|
||||||
|
final Dataset<Row> filteredMergeRels = validRels
|
||||||
|
.union(
|
||||||
|
validRels
|
||||||
|
.withColumnRenamed("source", "source_tmp")
|
||||||
|
.withColumnRenamed("target", "target_tmp")
|
||||||
|
.withColumn("relClass", functions.lit(ModelConstants.IS_MERGED_IN))
|
||||||
|
.withColumnRenamed("target_tmp", "source")
|
||||||
|
.withColumnRenamed("source_tmp", "target"));
|
||||||
|
|
||||||
|
saveParquet(filteredMergeRels, mergeRelPath, SaveMode.Overwrite);
|
||||||
|
removeOutputDir(spark, workingPath + "/mergerel_filtered");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
2
pom.xml
2
pom.xml
|
@ -937,7 +937,7 @@
|
||||||
<commons.logging.version>1.1.3</commons.logging.version>
|
<commons.logging.version>1.1.3</commons.logging.version>
|
||||||
<commons-validator.version>1.7</commons-validator.version>
|
<commons-validator.version>1.7</commons-validator.version>
|
||||||
<dateparser.version>1.0.7</dateparser.version>
|
<dateparser.version>1.0.7</dateparser.version>
|
||||||
<dhp-schemas.version>[8.0.1]</dhp-schemas.version>
|
<dhp-schemas.version>[9.0.0]</dhp-schemas.version>
|
||||||
<dhp.cdh.version>cdh5.9.2</dhp.cdh.version>
|
<dhp.cdh.version>cdh5.9.2</dhp.cdh.version>
|
||||||
<dhp.commons.lang.version>3.5</dhp.commons.lang.version>
|
<dhp.commons.lang.version>3.5</dhp.commons.lang.version>
|
||||||
<dhp.guava.version>11.0.2</dhp.guava.version>
|
<dhp.guava.version>11.0.2</dhp.guava.version>
|
||||||
|
|
Loading…
Reference in New Issue