added IdentifierComparator

This commit is contained in:
Claudio Atzori 2022-11-09 14:20:59 +01:00
parent 0419953470
commit 5af5a8ae42
7 changed files with 134 additions and 58 deletions

View File

@ -2,8 +2,10 @@
package eu.dnetlib.dhp.oa.dedup; package eu.dnetlib.dhp.oa.dedup;
import java.util.Collection; import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.api.java.function.MapGroupsFunction;
@ -13,11 +15,14 @@ import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
import eu.dnetlib.dhp.oa.merge.AuthorMerger; import eu.dnetlib.dhp.oa.merge.AuthorMerger;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import scala.Tuple2; import scala.Tuple2;
public class DedupRecordFactory { public class DedupRecordFactory {
@ -82,11 +87,19 @@ public class DedupRecordFactory {
final Collection<String> dates = Lists.newArrayList(); final Collection<String> dates = Lists.newArrayList();
final List<List<Author>> authors = Lists.newArrayList(); final List<List<Author>> authors = Lists.newArrayList();
entities final Comparator<Identifier<T>> idComparator = new IdentifierComparator<T>().reversed();
.forEachRemaining(
t -> {
T duplicate = t._2();
final List<T> entityList = Lists
.newArrayList(entities)
.stream()
.map(t -> Identifier.newInstance(t._2()))
.sorted(idComparator)
.map(Identifier::getEntity)
.collect(Collectors.toList());
entityList
.forEach(
duplicate -> {
entity.mergeFrom(duplicate); entity.mergeFrom(duplicate);
if (ModelSupport.isSubClass(duplicate, Result.class)) { if (ModelSupport.isSubClass(duplicate, Result.class)) {
Result r1 = (Result) duplicate; Result r1 = (Result) duplicate;

View File

@ -18,6 +18,10 @@ public class IdGenerator implements Serializable {
if (pids == null || pids.isEmpty()) if (pids == null || pids.isEmpty())
return defaultID; return defaultID;
return generateId(pids);
}
private static <T extends OafEntity> String generateId(List<Identifier<T>> pids) {
Identifier<T> bp = pids Identifier<T> bp = pids
.stream() .stream()
.min(Identifier::compareTo) .min(Identifier::compareTo)

View File

@ -0,0 +1,81 @@
package eu.dnetlib.dhp.oa.dedup;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import eu.dnetlib.dhp.schema.oaf.utils.PidComparator;
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
public class IdentifierComparator<T extends OafEntity> implements Comparator<Identifier<T>> {
public static int compareIdentifiers(Identifier left, Identifier right) {
return new IdentifierComparator<>().compare(left, right);
}
@Override
public int compare(Identifier<T> left, Identifier<T> i) {
// priority in comparisons: 1) pidtype, 2) collectedfrom (depending on the entity type) , 3) date 4)
// alphabetical order of the originalID
Set<String> lKeys = Optional
.ofNullable(left.getCollectedFrom())
.map(c -> c.stream().map(KeyValue::getKey).collect(Collectors.toSet()))
.orElse(Sets.newHashSet());
final Optional<List<KeyValue>> cf = Optional.ofNullable(i.getCollectedFrom());
Set<String> rKeys = cf
.map(c -> c.stream().map(KeyValue::getKey).collect(Collectors.toSet()))
.orElse(Sets.newHashSet());
if (left.getPidType().compareTo(i.getPidType()) == 0) { // same type
if (left.getEntityType() == EntityType.publication) {
if (isFromDatasourceID(lKeys, ModelConstants.CROSSREF_ID)
&& !isFromDatasourceID(rKeys, ModelConstants.CROSSREF_ID))
return -1;
if (isFromDatasourceID(rKeys, ModelConstants.CROSSREF_ID)
&& !isFromDatasourceID(lKeys, ModelConstants.CROSSREF_ID))
return 1;
}
if (left.getEntityType() == EntityType.dataset) {
if (isFromDatasourceID(lKeys, ModelConstants.DATACITE_ID)
&& !isFromDatasourceID(rKeys, ModelConstants.DATACITE_ID))
return -1;
if (isFromDatasourceID(rKeys, ModelConstants.DATACITE_ID)
&& !isFromDatasourceID(lKeys, ModelConstants.DATACITE_ID))
return 1;
}
if (left.getDate().compareTo(i.getDate()) == 0) {// same date
// we need to take the alphabetically lower id
return left.getOriginalID().compareTo(i.getOriginalID());
} else
// we need to take the elder date
return left.getDate().compareTo(i.getDate());
} else {
return new PidComparator<>(left.getEntity()).compare(toSP(left.getPidType()), toSP(i.getPidType()));
}
}
public boolean isFromDatasourceID(Set<String> collectedFrom, String dsId) {
return collectedFrom.contains(dsId);
}
private StructuredProperty toSP(PidType pidType) {
return OafMapperUtils.structuredProperty("", pidType.toString(), pidType.toString(), "", "", new DataInfo());
}
}

View File

@ -11,6 +11,7 @@ import org.apache.commons.lang3.StringUtils;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import eu.dnetlib.dhp.oa.dedup.DatePicker; import eu.dnetlib.dhp.oa.dedup.DatePicker;
import eu.dnetlib.dhp.oa.dedup.IdentifierComparator;
import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
@ -83,60 +84,12 @@ public class Identifier<T extends OafEntity> implements Serializable, Comparable
return entity.getId(); return entity.getId();
} }
private PidType getPidType() { public PidType getPidType() {
return PidType.tryValueOf(StringUtils.substringBefore(StringUtils.substringAfter(entity.getId(), "|"), "_")); return PidType.tryValueOf(StringUtils.substringBefore(StringUtils.substringAfter(entity.getId(), "|"), "_"));
} }
@Override @Override
public int compareTo(Identifier<T> i) { public int compareTo(Identifier<T> i) {
// priority in comparisons: 1) pidtype, 2) collectedfrom (depending on the entity type) , 3) date 4) return IdentifierComparator.compareIdentifiers(this, i);
// alphabetical order of the originalID
Set<String> lKeys = Optional
.ofNullable(getCollectedFrom())
.map(c -> c.stream().map(KeyValue::getKey).collect(Collectors.toSet()))
.orElse(Sets.newHashSet());
final Optional<List<KeyValue>> cf = Optional.ofNullable(i.getCollectedFrom());
Set<String> rKeys = cf
.map(c -> c.stream().map(KeyValue::getKey).collect(Collectors.toSet()))
.orElse(Sets.newHashSet());
if (this.getPidType().compareTo(i.getPidType()) == 0) { // same type
if (getEntityType() == EntityType.publication) {
if (isFromDatasourceID(lKeys, ModelConstants.CROSSREF_ID)
&& !isFromDatasourceID(rKeys, ModelConstants.CROSSREF_ID))
return -1;
if (isFromDatasourceID(rKeys, ModelConstants.CROSSREF_ID)
&& !isFromDatasourceID(lKeys, ModelConstants.CROSSREF_ID))
return 1;
}
if (getEntityType() == EntityType.dataset) {
if (isFromDatasourceID(lKeys, ModelConstants.DATACITE_ID)
&& !isFromDatasourceID(rKeys, ModelConstants.DATACITE_ID))
return -1;
if (isFromDatasourceID(rKeys, ModelConstants.DATACITE_ID)
&& !isFromDatasourceID(lKeys, ModelConstants.DATACITE_ID))
return 1;
}
if (this.getDate().compareTo(i.getDate()) == 0) {// same date
// we need to take the alphabetically lower id
return this.getOriginalID().compareTo(i.getOriginalID());
} else
// we need to take the elder date
return this.getDate().compareTo(i.getDate());
} else {
return new PidComparator<>(getEntity()).compare(toSP(getPidType()), toSP(i.getPidType()));
}
}
private StructuredProperty toSP(PidType pidType) {
return OafMapperUtils.structuredProperty("", pidType.toString(), pidType.toString(), "", "", new DataInfo());
}
public boolean isFromDatasourceID(Set<String> collectedFrom, String dsId) {
return collectedFrom.contains(dsId);
} }
} }

View File

@ -143,7 +143,7 @@ public class SparkOpenorgsDedupTest implements Serializable {
.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization")) .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization"))
.count(); .count();
assertEquals(288, orgs_simrel); assertEquals(290, orgs_simrel);
} }
@Test @Test
@ -172,7 +172,7 @@ public class SparkOpenorgsDedupTest implements Serializable {
.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization")) .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization"))
.count(); .count();
assertEquals(324, orgs_simrel); assertEquals(326, orgs_simrel);
} }
@Test @Test

View File

@ -168,11 +168,11 @@ public class SparkStatsTest implements Serializable {
.textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_blockstats") .textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_blockstats")
.count(); .count();
assertEquals(477, orgs_blocks); assertEquals(480, orgs_blocks);
assertEquals(295, pubs_blocks); assertEquals(295, pubs_blocks);
assertEquals(122, sw_blocks); assertEquals(122, sw_blocks);
assertEquals(191, ds_blocks); assertEquals(191, ds_blocks);
assertEquals(171, orp_blocks); assertEquals(178, orp_blocks);
} }
@AfterAll @AfterAll

View File

@ -0,0 +1,25 @@
# Root logger option
log4j.rootLogger=DEBUG, stdout
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
# Change this to set Spark log level
log4j.logger.org.apache.spark=ERROR
log4j.rootCategory=WARN
# Silence akka remoting
log4j.logger.Remoting=WARN
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory=WARN
log4j.logger.org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter=WARN
log4j.logger.org.apache.parquet.hadoop.ParquetOutputFormat=WARN
log4j.logger.org.apache.parquet.hadoop.InternalParquetRecordWriter=WARN
log4j.logger.org.apache.hadoop.io.compress.CodecPool=WARN
log4j.logger.org.apache.parquet.hadoop.codec.CodecConfig=WARN