From 5af5a8ae42c5c3303bad4f0eeaccee518ef0480f Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Wed, 9 Nov 2022 14:20:59 +0100
Subject: [PATCH] added IdentifierComparator

---
 .../dhp/oa/dedup/DedupRecordFactory.java      | 21 ++++-
 .../eu/dnetlib/dhp/oa/dedup/IdGenerator.java  |  4 +
 .../dhp/oa/dedup/IdentifierComparator.java    | 81 +++++++++++++++++++
 .../dhp/oa/dedup/model/Identifier.java        | 53 +----------
 .../dhp/oa/dedup/SparkOpenorgsDedupTest.java  |  4 +-
 .../dnetlib/dhp/oa/dedup/SparkStatsTest.java  |  4 +-
 .../src/test/resources/log4j.properties       | 25 ++++++
 7 files changed, 134 insertions(+), 58 deletions(-)
 create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdentifierComparator.java
 create mode 100644 dhp-workflows/dhp-dedup-openaire/src/test/resources/log4j.properties

diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
index d345cf98f..c3b6751ba 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
@@ -2,8 +2,10 @@
 package eu.dnetlib.dhp.oa.dedup;
 
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.MapGroupsFunction;
@@ -13,11 +15,14 @@
 import org.apache.spark.sql.SparkSession;
 
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 
+import eu.dnetlib.dhp.oa.dedup.model.Identifier;
 import eu.dnetlib.dhp.oa.merge.AuthorMerger;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
 import scala.Tuple2;
 
 public class DedupRecordFactory {
@@ -82,11 +87,19 @@ public class DedupRecordFactory {
     final Collection<String> dates = Lists.newArrayList();
     final List<List<Author>> authors = Lists.newArrayList();
 
-    entities
-        .forEachRemaining(
-            t -> {
-                T duplicate = t._2();
+    final Comparator<Identifier<T>> idComparator = new IdentifierComparator<T>().reversed();
+    final List<T> entityList = Lists
+        .newArrayList(entities)
+        .stream()
+        .map(t -> Identifier.newInstance(t._2()))
+        .sorted(idComparator)
+        .map(Identifier::getEntity)
+        .collect(Collectors.toList());
+
+    entityList
+        .forEach(
+            duplicate -> {
                 entity.mergeFrom(duplicate);
                 if (ModelSupport.isSubClass(duplicate, Result.class)) {
                     Result r1 = (Result) duplicate;
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java
index 81cd30f88..7e0d66062 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java
@@ -18,6 +18,10 @@ public class IdGenerator implements Serializable {
     if (pids == null || pids.isEmpty())
         return defaultID;
 
+    return generateId(pids);
+}
+
+private static <T extends OafEntity> String generateId(List<Identifier<T>> pids) {
     Identifier<T> bp = pids
         .stream()
         .min(Identifier::compareTo)
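The DedupRecordFactory change above replaces the order-sensitive forEachRemaining over the duplicates iterator with an explicit sort: each duplicate is wrapped in an Identifier, the list is ordered by the reversed IdentifierComparator, and the entities are merged in that order, so the highest-priority duplicate is applied last and wins wherever mergeFrom overwrites a field. The trade-off is that the iterator is now materialized into a list, which assumes a dedup group fits in memory. A minimal, self-contained sketch of the sort-then-fold idea (the Ranked class and all names are illustrative stand-ins, not types from the codebase):

    import java.util.Comparator;
    import java.util.List;
    import java.util.stream.Collectors;

    public class SortThenMergeSketch {

        // Stand-in for Identifier<T>: a priority plus a payload.
        static class Ranked {
            final int priority; // lower value = higher priority, as in compareTo
            final String payload;

            Ranked(int priority, String payload) {
                this.priority = priority;
                this.payload = payload;
            }
        }

        public static void main(String[] args) {
            List<Ranked> duplicates = List.of(
                new Ranked(2, "record-from-aggregator"),
                new Ranked(1, "record-from-crossref"),
                new Ranked(3, "record-from-unknown-source"));

            // Reversed order puts the best duplicate LAST, so folding the list
            // into one merged record applies its values last.
            Comparator<Ranked> byPriority = Comparator.comparingInt(r -> r.priority);
            List<String> mergeOrder = duplicates
                .stream()
                .sorted(byPriority.reversed())
                .map(r -> r.payload)
                .collect(Collectors.toList());

            // Prints: [record-from-unknown-source, record-from-aggregator, record-from-crossref]
            System.out.println(mergeOrder);
        }
    }
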
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdentifierComparator.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdentifierComparator.java
new file mode 100644
index 000000000..ba4e31128
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdentifierComparator.java
@@ -0,0 +1,81 @@
+
+package eu.dnetlib.dhp.oa.dedup;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Sets;
+
+import eu.dnetlib.dhp.oa.dedup.model.Identifier;
+import eu.dnetlib.dhp.schema.common.EntityType;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
+import eu.dnetlib.dhp.schema.oaf.utils.PidComparator;
+import eu.dnetlib.dhp.schema.oaf.utils.PidType;
+
+public class IdentifierComparator<T extends OafEntity> implements Comparator<Identifier<T>> {
+
+    public static <T extends OafEntity> int compareIdentifiers(Identifier<T> left, Identifier<T> right) {
+        return new IdentifierComparator<>().compare(left, right);
+    }
+
+    @Override
+    public int compare(Identifier<T> left, Identifier<T> i) {
+        // priority in comparisons: 1) pidtype, 2) collectedfrom (depending on the entity type), 3) date,
+        // 4) alphabetical order of the originalID
+
+        Set<String> lKeys = Optional
+            .ofNullable(left.getCollectedFrom())
+            .map(c -> c.stream().map(KeyValue::getKey).collect(Collectors.toSet()))
+            .orElse(Sets.newHashSet());
+
+        final Optional<List<KeyValue>> cf = Optional.ofNullable(i.getCollectedFrom());
+        Set<String> rKeys = cf
+            .map(c -> c.stream().map(KeyValue::getKey).collect(Collectors.toSet()))
+            .orElse(Sets.newHashSet());
+
+        if (left.getPidType().compareTo(i.getPidType()) == 0) { // same type
+            if (left.getEntityType() == EntityType.publication) {
+                if (isFromDatasourceID(lKeys, ModelConstants.CROSSREF_ID)
+                    && !isFromDatasourceID(rKeys, ModelConstants.CROSSREF_ID))
+                    return -1;
+                if (isFromDatasourceID(rKeys, ModelConstants.CROSSREF_ID)
+                    && !isFromDatasourceID(lKeys, ModelConstants.CROSSREF_ID))
+                    return 1;
+            }
+            if (left.getEntityType() == EntityType.dataset) {
+                if (isFromDatasourceID(lKeys, ModelConstants.DATACITE_ID)
+                    && !isFromDatasourceID(rKeys, ModelConstants.DATACITE_ID))
+                    return -1;
+                if (isFromDatasourceID(rKeys, ModelConstants.DATACITE_ID)
+                    && !isFromDatasourceID(lKeys, ModelConstants.DATACITE_ID))
+                    return 1;
+            }
+
+            if (left.getDate().compareTo(i.getDate()) == 0) { // same date
+                // we need to take the alphabetically lower id
+                return left.getOriginalID().compareTo(i.getOriginalID());
+            } else
+                // we need to take the elder date
+                return left.getDate().compareTo(i.getDate());
+        } else {
+            return new PidComparator<>(left.getEntity()).compare(toSP(left.getPidType()), toSP(i.getPidType()));
+        }
+    }
+
+    public boolean isFromDatasourceID(Set<String> collectedFrom, String dsId) {
+        return collectedFrom.contains(dsId);
+    }
+
+    private StructuredProperty toSP(PidType pidType) {
+        return OafMapperUtils.structuredProperty("", pidType.toString(), pidType.toString(), "", "", new DataInfo());
+    }
+
+}
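IdentifierComparator above encodes a four-step tie-break: PID type (via PidComparator when the types differ), then collectedfrom provenance (Crossref preferred for publications, DataCite for datasets), then the elder date, then the alphabetically lower original identifier. The same cascade can be written with Comparator combinators; the sketch below is a simplified stand-in under that reading (the Id class and its fields are invented for illustration, not the project's types):

    import java.util.Comparator;
    import java.util.List;

    public class CascadeSketch {

        // Illustrative stand-in for Identifier<T>.
        static class Id {
            final int pidTypeRank;         // smaller = stronger pid type
            final boolean preferredSource; // e.g. collected from Crossref/DataCite
            final String date;             // ISO-8601 dates order correctly as strings
            final String originalId;

            Id(int pidTypeRank, boolean preferredSource, String date, String originalId) {
                this.pidTypeRank = pidTypeRank;
                this.preferredSource = preferredSource;
                this.date = date;
                this.originalId = originalId;
            }
        }

        // Priority cascade: pid type, provenance, elder date, alphabetical id.
        static final Comparator<Id> CASCADE = Comparator
            .comparingInt((Id i) -> i.pidTypeRank)
            .thenComparing(i -> !i.preferredSource) // false < true, so preferred sorts first
            .thenComparing(i -> i.date)
            .thenComparing(i -> i.originalId);

        public static void main(String[] args) {
            Id best = List
                .of(
                    new Id(0, false, "2020-01-01", "id-b"),
                    new Id(0, true, "2020-01-01", "id-c"))
                .stream()
                .min(CASCADE)
                .orElseThrow();
            System.out.println(best.originalId); // "id-c": same pid type, preferred source wins
        }
    }
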
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java
index a25a853ef..0cba4fc3b 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java
@@ -11,6 +11,7 @@ import org.apache.commons.lang3.StringUtils;
 import com.google.common.collect.Sets;
 
 import eu.dnetlib.dhp.oa.dedup.DatePicker;
+import eu.dnetlib.dhp.oa.dedup.IdentifierComparator;
 import eu.dnetlib.dhp.schema.common.EntityType;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
@@ -83,60 +84,12 @@ public class Identifier<T extends OafEntity> implements Serializable, Comparable<Identifier<T>> {
     return entity.getId();
 }
 
-private PidType getPidType() {
+public PidType getPidType() {
     return PidType.tryValueOf(StringUtils.substringBefore(StringUtils.substringAfter(entity.getId(), "|"), "_"));
 }
 
 @Override
 public int compareTo(Identifier<T> i) {
-    // priority in comparisons: 1) pidtype, 2) collectedfrom (depending on the entity type) , 3) date 4)
-    // alphabetical order of the originalID
-
-    Set<String> lKeys = Optional
-        .ofNullable(getCollectedFrom())
-        .map(c -> c.stream().map(KeyValue::getKey).collect(Collectors.toSet()))
-        .orElse(Sets.newHashSet());
-
-    final Optional<List<KeyValue>> cf = Optional.ofNullable(i.getCollectedFrom());
-    Set<String> rKeys = cf
-        .map(c -> c.stream().map(KeyValue::getKey).collect(Collectors.toSet()))
-        .orElse(Sets.newHashSet());
-
-    if (this.getPidType().compareTo(i.getPidType()) == 0) { // same type
-        if (getEntityType() == EntityType.publication) {
-            if (isFromDatasourceID(lKeys, ModelConstants.CROSSREF_ID)
-                && !isFromDatasourceID(rKeys, ModelConstants.CROSSREF_ID))
-                return -1;
-            if (isFromDatasourceID(rKeys, ModelConstants.CROSSREF_ID)
-                && !isFromDatasourceID(lKeys, ModelConstants.CROSSREF_ID))
-                return 1;
-        }
-        if (getEntityType() == EntityType.dataset) {
-            if (isFromDatasourceID(lKeys, ModelConstants.DATACITE_ID)
-                && !isFromDatasourceID(rKeys, ModelConstants.DATACITE_ID))
-                return -1;
-            if (isFromDatasourceID(rKeys, ModelConstants.DATACITE_ID)
-                && !isFromDatasourceID(lKeys, ModelConstants.DATACITE_ID))
-                return 1;
-        }
-
-        if (this.getDate().compareTo(i.getDate()) == 0) {// same date
-            // we need to take the alphabetically lower id
-            return this.getOriginalID().compareTo(i.getOriginalID());
-        } else
-            // we need to take the elder date
-            return this.getDate().compareTo(i.getDate());
-    } else {
-        return new PidComparator<>(getEntity()).compare(toSP(getPidType()), toSP(i.getPidType()));
-    }
-
-}
-
-private StructuredProperty toSP(PidType pidType) {
-    return OafMapperUtils.structuredProperty("", pidType.toString(), pidType.toString(), "", "", new DataInfo());
-}
-
-public boolean isFromDatasourceID(Set<String> collectedFrom, String dsId) {
-    return collectedFrom.contains(dsId);
+    return IdentifierComparator.compareIdentifiers(this, i);
 }
 
 }
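With the logic moved out, Identifier keeps its Comparable contract and delegates to the shared comparator, so existing call sites such as IdGenerator's pids.stream().min(Identifier::compareTo) pick the best pid exactly as before. The same pick-the-minimum pattern, sketched with an invented Pid class (not the project's type):

    import java.util.List;

    public class PickBestSketch {

        // Illustrative Comparable that, like Identifier, delegates its ordering.
        static class Pid implements Comparable<Pid> {
            final String value;
            final int rank; // smaller = better

            Pid(String value, int rank) {
                this.value = value;
                this.rank = rank;
            }

            @Override
            public int compareTo(Pid other) {
                // Delegation point, mirroring IdentifierComparator.compareIdentifiers(this, i)
                return Integer.compare(this.rank, other.rank);
            }
        }

        public static void main(String[] args) {
            List<Pid> pids = List.of(new Pid("doi:10.1234/x", 0), new Pid("handle:123/456", 1));
            // Same pattern as IdGenerator: the minimum element is the best pid.
            Pid best = pids.stream().min(Pid::compareTo).orElseThrow();
            System.out.println(best.value); // "doi:10.1234/x"
        }
    }
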
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsDedupTest.java
index 9312d83b1..88c28ab2f 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsDedupTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsDedupTest.java
@@ -143,7 +143,7 @@ public class SparkOpenorgsDedupTest implements Serializable {
     .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization"))
     .count();
 
-assertEquals(288, orgs_simrel);
+assertEquals(290, orgs_simrel);
 }
 
 @Test
@@ -172,7 +172,7 @@ public class SparkOpenorgsDedupTest implements Serializable {
     .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization"))
     .count();
 
-assertEquals(324, orgs_simrel);
+assertEquals(326, orgs_simrel);
 }
 
 @Test
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkStatsTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkStatsTest.java
index 1ba2c717c..b33b627e7 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkStatsTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkStatsTest.java
@@ -168,11 +168,11 @@ public class SparkStatsTest implements Serializable {
     .textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_blockstats")
     .count();
 
-assertEquals(477, orgs_blocks);
+assertEquals(480, orgs_blocks);
 assertEquals(295, pubs_blocks);
 assertEquals(122, sw_blocks);
 assertEquals(191, ds_blocks);
-assertEquals(171, orp_blocks);
+assertEquals(178, orp_blocks);
 }
 
 @AfterAll
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/log4j.properties b/dhp-workflows/dhp-dedup-openaire/src/test/resources/log4j.properties
new file mode 100644
index 000000000..ce37270c6
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/log4j.properties
@@ -0,0 +1,25 @@
+# Root logger option
+log4j.rootLogger=DEBUG, stdout
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
+
+# Change this to set Spark log level
+log4j.logger.org.apache.spark=ERROR
+log4j.rootCategory=WARN
+
+# Silence akka remoting
+log4j.logger.Remoting=WARN
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+
+log4j.logger.org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory=WARN
+log4j.logger.org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter=WARN
+log4j.logger.org.apache.parquet.hadoop.ParquetOutputFormat=WARN
+log4j.logger.org.apache.parquet.hadoop.InternalParquetRecordWriter=WARN
+log4j.logger.org.apache.hadoop.io.compress.CodecPool=WARN
+log4j.logger.org.apache.parquet.hadoop.codec.CodecConfig=WARN
\ No newline at end of file
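The log4j.properties added above is picked up automatically from the test classpath by log4j 1.x: the root logger is routed to stdout with a timestamped pattern, while Spark, Jetty, Parquet and the Hadoop output committers are capped at WARN or ERROR so test output stays readable. Note the file sets both the modern rootLogger key (DEBUG) and the legacy rootCategory key (WARN), so the effective root level depends on which key log4j applies last. A minimal sketch of how a test class would log under this configuration (the class and messages are illustrative):

    import org.apache.log4j.Logger;

    public class LoggingSmokeTest {

        private static final Logger log = Logger.getLogger(LoggingSmokeTest.class);

        public static void main(String[] args) {
            // With the ConversionPattern above this prints something like:
            // 2022-11-09 14:20:59 INFO  LoggingSmokeTest:10 - dedup test logging configured
            log.info("dedup test logging configured");
            log.debug("emitted only if the DEBUG root level wins over rootCategory");
        }
    }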