diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
index d345cf98f1..82bf87ccaf 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
@@ -1,10 +1,12 @@
 
 package eu.dnetlib.dhp.oa.dedup;
 
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
+import java.lang.reflect.InvocationTargetException;
+import java.util.*;
+import java.util.stream.Collectors;
 
+import org.apache.commons.beanutils.BeanUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.apache.spark.sql.Dataset;
@@ -15,6 +17,7 @@
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 
+import eu.dnetlib.dhp.oa.dedup.model.Identifier;
 import eu.dnetlib.dhp.oa.merge.AuthorMerger;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.*;
@@ -74,33 +77,42 @@ public class DedupRecordFactory {
 
 	public static <T extends OafEntity> T entityMerger(
 		String id, Iterator<Tuple2<String, T>> entities, long ts, DataInfo dataInfo, Class<T> clazz)
-		throws IllegalAccessException, InstantiationException {
+		throws IllegalAccessException, InstantiationException, InvocationTargetException {
 
-		T entity = clazz.newInstance();
-		entity.setDataInfo(dataInfo);
+		final Comparator<Identifier<T>> idComparator = new IdentifierComparator<>();
+
+		final LinkedList<T> entityList = Lists
+			.newArrayList(entities)
+			.stream()
+			.map(t -> Identifier.newInstance(t._2()))
+			.sorted(idComparator)
+			.map(Identifier::getEntity)
+			.collect(Collectors.toCollection(LinkedList::new));
+
+		final T entity = clazz.newInstance();
+		final T first = entityList.removeFirst();
+
+		BeanUtils.copyProperties(entity, first);
 
 		final Collection<String> dates = Lists.newArrayList();
 		final List<List<Author>> authors = Lists.newArrayList();
 
-		entities
-			.forEachRemaining(
-				t -> {
-					T duplicate = t._2();
-
+		entityList
+			.forEach(
+				duplicate -> {
 					entity.mergeFrom(duplicate);
 					if (ModelSupport.isSubClass(duplicate, Result.class)) {
 						Result r1 = (Result) duplicate;
 						if (r1.getAuthor() != null && !r1.getAuthor().isEmpty())
 							authors.add(r1.getAuthor());
-						if (r1.getDateofacceptance() != null)
+						if (r1.getDateofacceptance() != null && StringUtils.isNotBlank(r1.getDateofacceptance().getValue()))
 							dates.add(r1.getDateofacceptance().getValue());
 					}
-
 				});
 
 		// set authors and date
 		if (ModelSupport.isSubClass(entity, Result.class)) {
-			((Result) entity).setDateofacceptance(DatePicker.pick(dates));
+			// ((Result) entity).setDateofacceptance(DatePicker.pick(dates));
 			((Result) entity).setAuthor(AuthorMerger.merge(authors));
 		}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java
index 81cd30f88f..7e0d660622 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java
@@ -18,6 +18,10 @@ public class IdGenerator implements Serializable {
 		if (pids == null || pids.isEmpty())
 			return defaultID;
 
+		return generateId(pids);
+	}
+
+	private static <T extends OafEntity> String generateId(List<Identifier<T>> pids) {
 		Identifier<T> bp = pids
 			.stream()
 			.min(Identifier::compareTo)
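Editorial aside on the DedupRecordFactory change above: the new entityMerger no longer builds the root record from an empty instance; it orders the duplicates with IdentifierComparator, seeds the root with a BeanUtils property copy of the best-ranked duplicate, then folds the remaining ones in. A minimal, self-contained sketch of that pattern follows; MergeSketch, Record and merge are hypothetical stand-ins for illustration, not part of the patch:

```java
import java.lang.reflect.InvocationTargetException;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.beanutils.BeanUtils;

public class MergeSketch {

	// Hypothetical, simplified stand-in for an OAF entity.
	public static class Record {
		private String id;
		private String title;

		// getters/setters are required by BeanUtils.copyProperties
		public String getId() { return id; }
		public void setId(String id) { this.id = id; }
		public String getTitle() { return title; }
		public void setTitle(String title) { this.title = title; }
	}

	public static Record merge(List<Record> duplicates, Comparator<Record> bestFirst)
		throws IllegalAccessException, InvocationTargetException {

		// 1) order the duplicates so the most authoritative record comes first
		LinkedList<Record> sorted = duplicates
			.stream()
			.sorted(bestFirst)
			.collect(Collectors.toCollection(LinkedList::new));

		// 2) seed the root record with a full property copy of the best duplicate
		Record root = new Record();
		BeanUtils.copyProperties(root, sorted.removeFirst());

		// 3) fold the remaining duplicates in, filling only the fields still missing
		for (Record dup : sorted) {
			if (root.getTitle() == null)
				root.setTitle(dup.getTitle());
		}
		return root;
	}
}
```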
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdentifierComparator.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdentifierComparator.java
new file mode 100644
index 0000000000..ba4e311289
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdentifierComparator.java
@@ -0,0 +1,81 @@
+
+package eu.dnetlib.dhp.oa.dedup;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Sets;
+
+import eu.dnetlib.dhp.oa.dedup.model.Identifier;
+import eu.dnetlib.dhp.schema.common.EntityType;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
+import eu.dnetlib.dhp.schema.oaf.utils.PidComparator;
+import eu.dnetlib.dhp.schema.oaf.utils.PidType;
+
+public class IdentifierComparator<T extends OafEntity> implements Comparator<Identifier<T>> {
+
+	public static <T extends OafEntity> int compareIdentifiers(Identifier<T> left, Identifier<T> right) {
+		return new IdentifierComparator<>().compare(left, right);
+	}
+
+	@Override
+	public int compare(Identifier<T> left, Identifier<T> i) {
+		// priority in comparisons: 1) pidtype, 2) collectedfrom (depending on the entity type), 3) date, 4)
+		// alphabetical order of the originalID
+
+		Set<String> lKeys = Optional
+			.ofNullable(left.getCollectedFrom())
+			.map(c -> c.stream().map(KeyValue::getKey).collect(Collectors.toSet()))
+			.orElse(Sets.newHashSet());
+
+		final Optional<List<KeyValue>> cf = Optional.ofNullable(i.getCollectedFrom());
+		Set<String> rKeys = cf
+			.map(c -> c.stream().map(KeyValue::getKey).collect(Collectors.toSet()))
+			.orElse(Sets.newHashSet());
+
+		if (left.getPidType().compareTo(i.getPidType()) == 0) { // same type
+			if (left.getEntityType() == EntityType.publication) {
+				if (isFromDatasourceID(lKeys, ModelConstants.CROSSREF_ID)
+					&& !isFromDatasourceID(rKeys, ModelConstants.CROSSREF_ID))
+					return -1;
+				if (isFromDatasourceID(rKeys, ModelConstants.CROSSREF_ID)
+					&& !isFromDatasourceID(lKeys, ModelConstants.CROSSREF_ID))
+					return 1;
+			}
+			if (left.getEntityType() == EntityType.dataset) {
+				if (isFromDatasourceID(lKeys, ModelConstants.DATACITE_ID)
+					&& !isFromDatasourceID(rKeys, ModelConstants.DATACITE_ID))
+					return -1;
+				if (isFromDatasourceID(rKeys, ModelConstants.DATACITE_ID)
+					&& !isFromDatasourceID(lKeys, ModelConstants.DATACITE_ID))
+					return 1;
+			}
+
+			if (left.getDate().compareTo(i.getDate()) == 0) {// same date
+				// we need to take the alphabetically lower id
+				return left.getOriginalID().compareTo(i.getOriginalID());
+			} else
+				// we need to take the elder date
+				return left.getDate().compareTo(i.getDate());
+		} else {
+			return new PidComparator<>(left.getEntity()).compare(toSP(left.getPidType()), toSP(i.getPidType()));
+		}
+	}
+
+	public boolean isFromDatasourceID(Set<String> collectedFrom, String dsId) {
+		return collectedFrom.contains(dsId);
+	}
+
+	private StructuredProperty toSP(PidType pidType) {
+		return OafMapperUtils.structuredProperty("", pidType.toString(), pidType.toString(), "", "", new DataInfo());
+	}
+
+}
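The comparator above encodes a fixed tie-break chain: pid type first, then provenance (Crossref for publications, DataCite for datasets), then the older date, then the alphabetically lower original id. The same kind of ordering can be sketched with plain java.util.Comparator combinators; DupCandidate and its fields are hypothetical simplifications, not types from the patch:

```java
import java.time.LocalDate;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class TieBreakSketch {

	// Hypothetical, simplified duplicate candidate.
	static class DupCandidate {
		int pidTypeRank;      // lower = more trusted pid type (e.g. doi before handle)
		boolean fromCrossref; // stands in for the collectedfrom check
		LocalDate date;
		String originalId;

		DupCandidate(int rank, boolean cr, LocalDate d, String id) {
			this.pidTypeRank = rank; this.fromCrossref = cr; this.date = d; this.originalId = id;
		}
	}

	// Same priority chain as IdentifierComparator: pid type, provenance, date, original id.
	static final Comparator<DupCandidate> BEST_FIRST = Comparator
		.comparingInt((DupCandidate d) -> d.pidTypeRank)
		.thenComparing(d -> !d.fromCrossref) // false < true, so Crossref records sort first
		.thenComparing(d -> d.date)
		.thenComparing(d -> d.originalId);

	public static void main(String[] args) {
		List<DupCandidate> dups = Arrays.asList(
			new DupCandidate(0, false, LocalDate.of(2022, 1, 1), "b"),
			new DupCandidate(0, true, LocalDate.of(2022, 1, 1), "a"));
		// the Crossref record wins the tie on pid type and date
		System.out.println(dups.stream().min(BEST_FIRST).get().originalId); // prints "a"
	}
}
```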
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java
index a25a853ef6..0cba4fc3ba 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java
@@ -11,6 +11,7 @@ import org.apache.commons.lang3.StringUtils;
 import com.google.common.collect.Sets;
 
 import eu.dnetlib.dhp.oa.dedup.DatePicker;
+import eu.dnetlib.dhp.oa.dedup.IdentifierComparator;
 import eu.dnetlib.dhp.schema.common.EntityType;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
@@ -83,60 +84,12 @@ public class Identifier<T extends OafEntity> implements Serializable, Comparable<Identifier<T>>
 		return entity.getId();
 	}
 
-	private PidType getPidType() {
+	public PidType getPidType() {
 		return PidType.tryValueOf(StringUtils.substringBefore(StringUtils.substringAfter(entity.getId(), "|"), "_"));
 	}
 
 	@Override
 	public int compareTo(Identifier<T> i) {
-		// priority in comparisons: 1) pidtype, 2) collectedfrom (depending on the entity type) , 3) date 4)
-		// alphabetical order of the originalID
-
-		Set<String> lKeys = Optional
-			.ofNullable(getCollectedFrom())
-			.map(c -> c.stream().map(KeyValue::getKey).collect(Collectors.toSet()))
-			.orElse(Sets.newHashSet());
-
-		final Optional<List<KeyValue>> cf = Optional.ofNullable(i.getCollectedFrom());
-		Set<String> rKeys = cf
-			.map(c -> c.stream().map(KeyValue::getKey).collect(Collectors.toSet()))
-			.orElse(Sets.newHashSet());
-
-		if (this.getPidType().compareTo(i.getPidType()) == 0) { // same type
-			if (getEntityType() == EntityType.publication) {
-				if (isFromDatasourceID(lKeys, ModelConstants.CROSSREF_ID)
-					&& !isFromDatasourceID(rKeys, ModelConstants.CROSSREF_ID))
-					return -1;
-				if (isFromDatasourceID(rKeys, ModelConstants.CROSSREF_ID)
-					&& !isFromDatasourceID(lKeys, ModelConstants.CROSSREF_ID))
-					return 1;
-			}
-			if (getEntityType() == EntityType.dataset) {
-				if (isFromDatasourceID(lKeys, ModelConstants.DATACITE_ID)
-					&& !isFromDatasourceID(rKeys, ModelConstants.DATACITE_ID))
-					return -1;
-				if (isFromDatasourceID(rKeys, ModelConstants.DATACITE_ID)
-					&& !isFromDatasourceID(lKeys, ModelConstants.DATACITE_ID))
-					return 1;
-			}
-
-			if (this.getDate().compareTo(i.getDate()) == 0) {// same date
-				// we need to take the alphabetically lower id
-				return this.getOriginalID().compareTo(i.getOriginalID());
-			} else
-				// we need to take the elder date
-				return this.getDate().compareTo(i.getDate());
-		} else {
-			return new PidComparator<>(getEntity()).compare(toSP(getPidType()), toSP(i.getPidType()));
-		}
-
-	}
-
-	private StructuredProperty toSP(PidType pidType) {
-		return OafMapperUtils.structuredProperty("", pidType.toString(), pidType.toString(), "", "", new DataInfo());
-	}
-
-	public boolean isFromDatasourceID(Set<String> collectedFrom, String dsId) {
-		return collectedFrom.contains(dsId);
+		return IdentifierComparator.compareIdentifiers(this, i);
 	}
 
 }
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java
index aa3722ce51..c9cfb8cb28 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java
@@ -7,6 +7,7 @@
 import java.io.BufferedReader;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
 import java.nio.file.Paths;
 import java.util.*;
 import java.util.stream.Collectors;
@@ -54,7 +55,7 @@ class EntityMergerTest implements Serializable {
 	}
 
 	@Test
-	void softwareMergerTest() throws InstantiationException, IllegalAccessException {
+	void softwareMergerTest() throws InstantiationException, IllegalAccessException, InvocationTargetException {
 
 		List<Tuple2<String, Software>> softwares = readSample(
 			testEntityBasePath + "/software_merge.json", Software.class);
@@ -69,7 +70,7 @@ class EntityMergerTest implements Serializable {
 	}
 
 	@Test
-	void publicationMergerTest() throws InstantiationException, IllegalAccessException {
+	void publicationMergerTest() throws InstantiationException, IllegalAccessException, InvocationTargetException {
 
 		Publication pub_merged = DedupRecordFactory
 			.entityMerger(dedupId, publications.iterator(), 0, dataInfo, Publication.class);
@@ -134,7 +135,7 @@ class EntityMergerTest implements Serializable {
 	}
 
 	@Test
-	void publicationMergerTest2() throws InstantiationException, IllegalAccessException {
+	void publicationMergerTest2() throws InstantiationException, IllegalAccessException, InvocationTargetException {
 
 		Publication pub_merged = DedupRecordFactory
 			.entityMerger(dedupId, publications2.iterator(), 0, dataInfo, Publication.class);
@@ -146,7 +147,7 @@ class EntityMergerTest implements Serializable {
 	}
 
 	@Test
-	void publicationMergerTest3() throws InstantiationException, IllegalAccessException {
+	void publicationMergerTest3() throws InstantiationException, IllegalAccessException, InvocationTargetException {
 
 		Publication pub_merged = DedupRecordFactory
 			.entityMerger(dedupId, publications3.iterator(), 0, dataInfo, Publication.class);
@@ -156,7 +157,8 @@ class EntityMergerTest implements Serializable {
 	}
 
 	@Test
-	void publicationMergerTest4() throws InstantiationException, IllegalStateException, IllegalAccessException {
+	void publicationMergerTest4()
+		throws InstantiationException, IllegalStateException, IllegalAccessException, InvocationTargetException {
 
 		Publication pub_merged = DedupRecordFactory
 			.entityMerger(dedupId, publications4.iterator(), 0, dataInfo, Publication.class);
@@ -166,7 +168,8 @@ class EntityMergerTest implements Serializable {
 	}
 
 	@Test
-	void publicationMergerTest5() throws InstantiationException, IllegalStateException, IllegalAccessException {
+	void publicationMergerTest5()
+		throws InstantiationException, IllegalStateException, IllegalAccessException, InvocationTargetException {
 
 		System.out
 			.println(
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
index 9c9ec43d52..3de14f577c 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
@@ -4,8 +4,7 @@ package eu.dnetlib.dhp.oa.dedup;
 import static java.nio.file.Files.createTempDirectory;
 
 import static org.apache.spark.sql.functions.count;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.*;
 import static org.mockito.Mockito.lenient;
 
 import java.io.File;
@@ -14,7 +13,11 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.net.URISyntaxException;
 import java.nio.file.Paths;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
@@ -35,10 +38,13 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 
+import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Sets;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.*;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import eu.dnetlib.pace.util.MapDocumentUtil;
@@ -105,57 +111,27 @@ public class SparkDedupTest implements Serializable {
 
 		lenient()
 			.when(isLookUpService.getResourceProfileByQuery(Mockito.contains(testActionSetId)))
-			.thenReturn(
-				IOUtils
-					.toString(
-						SparkDedupTest.class
-							.getResourceAsStream(
-								"/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml")));
+			.thenReturn(classPathResourceAsString("/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml"));
 
 		lenient()
 			.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("organization")))
-			.thenReturn(
-				IOUtils
-					.toString(
-						SparkDedupTest.class
-							.getResourceAsStream(
-								"/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
+			.thenReturn(classPathResourceAsString("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json"));
 
 		lenient()
 			.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("publication")))
-			.thenReturn(
-				IOUtils
-					.toString(
-						SparkDedupTest.class
-							.getResourceAsStream(
-								"/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json")));
+			.thenReturn(classPathResourceAsString("/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json"));
 
 		lenient()
 			.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("software")))
-			.thenReturn(
-				IOUtils
-					.toString(
-						SparkDedupTest.class
-							.getResourceAsStream(
-								"/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json")));
+			.thenReturn(classPathResourceAsString("/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json"));
 
 		lenient()
 			.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("dataset")))
-			.thenReturn(
-				IOUtils
-					.toString(
-						SparkDedupTest.class
-							.getResourceAsStream(
-								"/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json")));
+			.thenReturn(classPathResourceAsString("/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json"));
 
 		lenient()
 			.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("otherresearchproduct")))
-			.thenReturn(
-				IOUtils
-					.toString(
-						SparkDedupTest.class
-							.getResourceAsStream(
-								"/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json")));
+			.thenReturn(classPathResourceAsString("/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json"));
 	}
 
 	@Test
@@ -163,11 +139,7 @@ public class SparkDedupTest implements Serializable {
 	void createSimRelsTest() throws Exception {
 
 		ArgumentApplicationParser parser = new ArgumentApplicationParser(
-			IOUtils
-				.toString(
-					SparkCreateSimRels.class
-						.getResourceAsStream(
-							"/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json")));
+			classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json"));
 
 		parser
 			.parseArgument(
@@ -207,7 +179,7 @@ public class SparkDedupTest implements Serializable {
 			.count();
 
 		assertEquals(3076, orgs_simrel);
-		assertEquals(7040, pubs_simrel);
+		assertEquals(7046, pubs_simrel);
 		assertEquals(336, sw_simrel);
 		assertEquals(442, ds_simrel);
 		assertEquals(6784, orp_simrel);
@@ -223,11 +195,7 @@ public class SparkDedupTest implements Serializable {
 	void whitelistSimRelsTest() throws Exception {
 
 		ArgumentApplicationParser parser = new ArgumentApplicationParser(
-			IOUtils
-				.toString(
-					SparkWhitelistSimRels.class
-						.getResourceAsStream(
-							"/eu/dnetlib/dhp/oa/dedup/whitelistSimRels_parameters.json")));
+			classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/whitelistSimRels_parameters.json"));
 
 		parser
 			.parseArgument(
@@ -264,7 +232,7 @@ public class SparkDedupTest implements Serializable {
 
 		// entities simrels supposed to be equal to the number of previous step (no rels in whitelist)
 		assertEquals(3076, orgs_simrel);
-		assertEquals(7040, pubs_simrel);
+		assertEquals(7046, pubs_simrel);
 		assertEquals(442, ds_simrel);
 		assertEquals(6784, orp_simrel);
 		// System.out.println("orgs_simrel = " + orgs_simrel);
@@ -306,11 +274,7 @@ public class SparkDedupTest implements Serializable {
 	void cutMergeRelsTest() throws Exception {
 
 		ArgumentApplicationParser parser = new ArgumentApplicationParser(
-			IOUtils
-				.toString(
-					SparkCreateMergeRels.class
-						.getResourceAsStream(
-							"/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
+			classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"));
 
 		parser
 			.parseArgument(
@@ -402,11 +366,7 @@ public class SparkDedupTest implements Serializable {
 	void createMergeRelsTest() throws Exception {
 
 		ArgumentApplicationParser parser = new ArgumentApplicationParser(
-			IOUtils
-				.toString(
-					SparkCreateMergeRels.class
-						.getResourceAsStream(
-							"/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
+			classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"));
 
 		parser
 			.parseArgument(
@@ -427,10 +387,10 @@ public class SparkDedupTest implements Serializable {
 			.read()
 			.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
 			.count();
-		long pubs_mergerel = spark
+		final Dataset<Relation> pubs = spark
 			.read()
 			.load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel")
-			.count();
+			.as(Encoders.bean(Relation.class));
 		long sw_mergerel = spark
 			.read()
 			.load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")
@@ -445,8 +405,35 @@ public class SparkDedupTest implements Serializable {
 			.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel")
 			.count();
 
+		final List<Relation> merges = pubs
+			.filter("source == '50|doi_dedup___::d5021b53204e4fdeab6ff5d5bc468032'")
+			.collectAsList();
+		assertEquals(3, merges.size());
+		Set<String> dups = Sets
+			.newHashSet(
+				"50|doi_________::3b1d0d8e8f930826665df9d6b82fbb73",
+				"50|doi_________::d5021b53204e4fdeab6ff5d5bc468032",
+				"50|arXiv_______::c93aeb433eb90ed7a86e29be00791b7c");
+		merges.forEach(r -> {
+			assertEquals(ModelConstants.RESULT_RESULT, r.getRelType());
+			assertEquals(ModelConstants.DEDUP, r.getSubRelType());
+			assertEquals(ModelConstants.MERGES, r.getRelClass());
+			assertTrue(dups.contains(r.getTarget()));
+		});
+
+		final List<Relation> mergedIn = pubs
+			.filter("target == '50|doi_dedup___::d5021b53204e4fdeab6ff5d5bc468032'")
+			.collectAsList();
+		assertEquals(3, mergedIn.size());
+		mergedIn.forEach(r -> {
+			assertEquals(ModelConstants.RESULT_RESULT, r.getRelType());
+			assertEquals(ModelConstants.DEDUP, r.getSubRelType());
+			assertEquals(ModelConstants.IS_MERGED_IN, r.getRelClass());
+			assertTrue(dups.contains(r.getSource()));
+		});
+
 		assertEquals(1268, orgs_mergerel);
-		assertEquals(1444, pubs_mergerel);
+		assertEquals(1450, pubs.count());
 		assertEquals(286, sw_mergerel);
 		assertEquals(472, ds_mergerel);
 		assertEquals(738, orp_mergerel);
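The assertions above depend on the merge relations being materialized in both directions: a `merges` relation from the dedup root to each duplicate, plus the inverse `isMergedIn`. A minimal sketch of producing such a pair, assuming a simplified Rel type rather than the real eu.dnetlib.dhp.schema.oaf.Relation:

```java
import java.util.Arrays;
import java.util.List;

public class MergeRelSketch {

	// Hypothetical, simplified stand-in for eu.dnetlib.dhp.schema.oaf.Relation.
	static class Rel {
		final String source, target, relClass;

		Rel(String source, String target, String relClass) {
			this.source = source; this.target = target; this.relClass = relClass;
		}
	}

	// For every root/duplicate pair, emit the relation and its inverse.
	static List<Rel> mergePair(String rootId, String dupId) {
		return Arrays.asList(
			new Rel(rootId, dupId, "merges"),
			new Rel(dupId, rootId, "isMergedIn"));
	}

	public static void main(String[] args) {
		mergePair("50|doi_dedup___::root", "50|doi_________::dup")
			.forEach(r -> System.out.println(r.source + " -[" + r.relClass + "]-> " + r.target));
	}
}
```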
@@ -463,11 +450,7 @@ public class SparkDedupTest implements Serializable {
 	void createDedupRecordTest() throws Exception {
 
 		ArgumentApplicationParser parser = new ArgumentApplicationParser(
-			IOUtils
-				.toString(
-					SparkCreateDedupRecord.class
-						.getResourceAsStream(
-							"/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json")));
+			classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json"));
 		parser
 			.parseArgument(
 				new String[] {
@@ -483,12 +466,18 @@ public class SparkDedupTest implements Serializable {
 
 		new SparkCreateDedupRecord(parser, spark).run(isLookUpService);
 
+		final ObjectMapper mapper = new ObjectMapper()
+			.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+		final Dataset<Publication> pubs = spark
+			.read()
+			.textFile(testOutputBasePath + "/" + testActionSetId + "/publication_deduprecord")
+			.map(
+				(MapFunction<String, Publication>) value -> mapper.readValue(value, Publication.class),
+				Encoders.bean(Publication.class));
 		long orgs_deduprecord = jsc
 			.textFile(testOutputBasePath + "/" + testActionSetId + "/organization_deduprecord")
 			.count();
-		long pubs_deduprecord = jsc
-			.textFile(testOutputBasePath + "/" + testActionSetId + "/publication_deduprecord")
-			.count();
 		long sw_deduprecord = jsc
 			.textFile(testOutputBasePath + "/" + testActionSetId + "/software_deduprecord")
 			.count();
@@ -499,11 +488,13 @@ public class SparkDedupTest implements Serializable {
 			.count();
 
 		assertEquals(86, orgs_deduprecord);
-		assertEquals(67, pubs_deduprecord);
+		assertEquals(68, pubs.count());
 		assertEquals(49, sw_deduprecord);
 		assertEquals(97, ds_deduprecord);
 		assertEquals(92, orp_deduprecord);
 
+		verifyRoot_1(mapper, pubs);
+
 		// System.out.println("orgs_deduprecord = " + orgs_deduprecord);
 		// System.out.println("pubs_deduprecord = " + pubs_deduprecord);
 		// System.out.println("sw_deduprecord = " + sw_deduprecord);
 		// System.out.println("ds_deduprecord = " + ds_deduprecord);
 		// System.out.println("orp_deduprecord = " + orp_deduprecord);
 	}
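The test now parses the dedup records with FAIL_ON_UNKNOWN_PROPERTIES disabled, so JSON fields absent from the target bean no longer abort deserialization. A standalone illustration of that Jackson setting; Doc is a hypothetical minimal bean:

```java
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

public class LenientMapperSketch {

	// Hypothetical minimal bean: only the field we care about.
	public static class Doc {
		private String id;

		public String getId() { return id; }
		public void setId(String id) { this.id = id; }
	}

	public static void main(String[] args) throws Exception {
		ObjectMapper mapper = new ObjectMapper()
			.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

		// "extra" is unknown to Doc; with the feature disabled it is silently ignored
		Doc d = mapper.readValue("{\"id\":\"50|x\",\"extra\":42}", Doc.class);
		System.out.println(d.getId()); // prints 50|x
	}
}
```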
@@ -511,16 +502,63 @@ public class SparkDedupTest implements Serializable {
+	private static void verifyRoot_1(ObjectMapper mapper, Dataset<Publication> pubs) {
+		Publication root = pubs
+			.filter("id = '50|doi_dedup___::d5021b53204e4fdeab6ff5d5bc468032'")
+			.first();
+		assertNotNull(root);
+
+		final Dataset<String> publication = spark
+			.read()
+			.textFile(DedupUtility.createEntityPath(testGraphBasePath, "publication"));
+
+		Publication crossref_duplicate = publication
+			.map(
+				(MapFunction<String, Publication>) value -> mapper.readValue(value, Publication.class),
+				Encoders.bean(Publication.class))
+			.filter("id = '50|doi_________::d5021b53204e4fdeab6ff5d5bc468032'")
+			.collectAsList()
+			.get(0);
+
+		assertEquals(crossref_duplicate.getJournal().getName(), root.getJournal().getName());
+		assertEquals(crossref_duplicate.getJournal().getIssnPrinted(), root.getJournal().getIssnPrinted());
+		assertEquals(crossref_duplicate.getPublisher().getValue(), root.getPublisher().getValue());
+
+		Set<String> rootPids = root
+			.getPid()
+			.stream()
+			.map(StructuredProperty::getValue)
+			.collect(Collectors.toCollection(HashSet::new));
+		Set<String> dupPids = crossref_duplicate
+			.getPid()
+			.stream()
+			.map(StructuredProperty::getValue)
+			.collect(Collectors.toCollection(HashSet::new));
+
+		assertFalse(Sets.intersection(rootPids, dupPids).isEmpty());
+		assertTrue(rootPids.contains("10.1109/jstqe.2022.3205716"));
+
+		Optional<Instance> instance_cr = root
+			.getInstance()
+			.stream()
+			.filter(i -> i.getCollectedfrom().getValue().equals("Crossref"))
+			.findFirst();
+		assertTrue(instance_cr.isPresent());
+		assertEquals("OPEN", instance_cr.get().getAccessright().getClassid());
+		assertEquals("Open Access", instance_cr.get().getAccessright().getClassname());
+		assertEquals(OpenAccessRoute.hybrid, instance_cr.get().getAccessright().getOpenAccessRoute());
+		assertEquals(
+			"IEEE Journal of Selected Topics in Quantum Electronics", instance_cr.get().getHostedby().getValue());
+		assertEquals("0001", instance_cr.get().getInstancetype().getClassid());
+		assertEquals("Article", instance_cr.get().getInstancetype().getClassname());
+	}
+
 	@Test
 	@Order(6)
 	void updateEntityTest() throws Exception {
 
 		ArgumentApplicationParser parser = new ArgumentApplicationParser(
-			IOUtils
-				.toString(
-					SparkUpdateEntity.class
-						.getResourceAsStream(
-							"/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json")));
+			classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json"));
 		parser
 			.parseArgument(
 				new String[] {
@@ -587,7 +625,7 @@ public class SparkDedupTest implements Serializable {
 			.distinct()
 			.count();
 
-		assertEquals(898, publications);
+		assertEquals(902, publications);
 		assertEquals(839, organizations);
 		assertEquals(100, projects);
 		assertEquals(100, datasource);
@@ -640,11 +678,7 @@ public class SparkDedupTest implements Serializable {
 	void propagateRelationTest() throws Exception {
 
 		ArgumentApplicationParser parser = new ArgumentApplicationParser(
-			IOUtils
-				.toString(
-					SparkPropagateRelation.class
-						.getResourceAsStream(
-							"/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json")));
+			classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json"));
 		parser
 			.parseArgument(
 				new String[] {
@@ -714,4 +748,12 @@ public class SparkDedupTest implements Serializable {
 	public boolean isDeletedByInference(String s) {
 		return s.contains("\"deletedbyinference\":true");
 	}
+
+	private static String classPathResourceAsString(String path) throws IOException {
+		return IOUtils
+			.toString(
+				SparkDedupTest.class
+					.getResourceAsStream(path));
+	}
+
 }
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsDedupTest.java
index 9312d83b10..88c28ab2fa 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsDedupTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsDedupTest.java
@@ -143,7 +143,7 @@ public class SparkOpenorgsDedupTest implements Serializable {
 			.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization"))
 			.count();
 
-		assertEquals(288, orgs_simrel);
+		assertEquals(290, orgs_simrel);
 	}
 
 	@Test
@@ -172,7 +172,7 @@ public class SparkOpenorgsDedupTest implements Serializable {
 			.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization"))
 			.count();
 
-		assertEquals(324, orgs_simrel);
+		assertEquals(326, orgs_simrel);
 	}
 
 	@Test
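A side note on the extracted classPathResourceAsString helpers: IOUtils.toString(InputStream) without an explicit charset is deprecated in recent commons-io releases. A charset-explicit variant, offered here only as an editorial suggestion (ResourceSketch is not part of the patch):

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import org.apache.commons.io.IOUtils;

public class ResourceSketch {

	// Charset-explicit variant of the helper; behaviour is otherwise identical.
	public static String classPathResourceAsString(Class<?> anchor, String path) throws IOException {
		try (InputStream is = anchor.getResourceAsStream(path)) {
			return IOUtils.toString(is, StandardCharsets.UTF_8);
		}
	}
}
```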
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkPublicationRootsTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkPublicationRootsTest.java
new file mode 100644
index 0000000000..3cff836eb4
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkPublicationRootsTest.java
@@ -0,0 +1,403 @@
+
+package eu.dnetlib.dhp.oa.dedup;
+
+import static java.nio.file.Files.createTempDirectory;
+
+import static org.apache.spark.sql.functions.count;
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.lenient;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Sets;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+
+@ExtendWith(MockitoExtension.class)
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class SparkPublicationRootsTest implements Serializable {
+
+	@Mock(serializable = true)
+	ISLookUpService isLookUpService;
+
+	private static SparkSession spark;
+	private static String workingPath;
+
+	private static String graphInputPath;
+	private static String graphOutputPath;
+	private static final String testActionSetId = "test-orchestrator";
+
+	private static Path testBaseTmpPath;
+
+	private static final ObjectMapper MAPPER = new ObjectMapper()
+		.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+	@BeforeAll
+	public static void init() throws IOException, URISyntaxException {
+
+		testBaseTmpPath = createTempDirectory(SparkPublicationRootsTest.class.getSimpleName() + "-");
+
+		final File entitiesSources = Paths
+			.get(SparkPublicationRootsTest.class.getResource("/eu/dnetlib/dhp/dedup/root").toURI())
+			.toFile();
+
+		FileUtils
+			.copyDirectory(
+				entitiesSources,
+				testBaseTmpPath.resolve("input").toFile());
+
+		workingPath = testBaseTmpPath.resolve("workingPath").toString();
+		graphInputPath = testBaseTmpPath.resolve("input").resolve("entities").toString();
+		graphOutputPath = testBaseTmpPath.resolve("output").toString();
+
+		FileUtils.deleteDirectory(new File(workingPath));
+		FileUtils.deleteDirectory(new File(graphOutputPath));
+
+		final SparkConf conf = new SparkConf();
+		conf.set("spark.sql.shuffle.partitions", "10");
+		spark = SparkSession
+			.builder()
+			.appName(SparkPublicationRootsTest.class.getSimpleName())
+			.master("local[*]")
+			.config(conf)
+			.getOrCreate();
+	}
+
+	@BeforeEach
+	public void setUp() throws IOException, ISLookUpException {
+
+		lenient()
+			.when(isLookUpService.getResourceProfileByQuery(Mockito.contains(testActionSetId)))
+			.thenReturn(classPathResourceAsString("/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator_publication.xml"));
+
+		lenient()
+			.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("publication")))
+			.thenReturn(classPathResourceAsString("/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json"));
+	}
+
+	@AfterAll
+	public static void tearDown() throws IOException {
+		FileUtils.deleteDirectory(testBaseTmpPath.toFile());
+		spark.close();
+	}
+
+	@Test
+	@Order(1)
+	void createSimRelsTest() throws Exception {
+		new SparkCreateSimRels(args(
+			"/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json",
+			new String[] {
+				"--graphBasePath", graphInputPath,
+				"--actionSetId", testActionSetId,
+				"--isLookUpUrl", "lookupurl",
+				"--workingPath", workingPath,
+				"--numPartitions", "5"
+			}), spark)
+				.run(isLookUpService);
+
+		long pubs_simrel = spark
+			.read()
+			.load(DedupUtility.createSimRelPath(workingPath, testActionSetId, "publication"))
+			.count();
+
+		assertEquals(74, pubs_simrel);
+	}
+
+	@Test
+	@Order(2)
+	void cutMergeRelsTest() throws Exception {
+		new SparkCreateMergeRels(args(
+			"/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json",
+			new String[] {
+				"--graphBasePath", graphInputPath,
+				"--actionSetId", testActionSetId,
+				"--isLookUpUrl", "lookupurl",
+				"--workingPath", workingPath,
+				"--cutConnectedComponent", "3"
+			}), spark)
+				.run(isLookUpService);
+
+		long pubs_mergerel = spark
+			.read()
+			.load(workingPath + "/" + testActionSetId + "/publication_mergerel")
+			.as(Encoders.bean(Relation.class))
+			.filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase("merges"))
+			.groupBy("source")
+			.agg(count("target").alias("cnt"))
+			.select("source", "cnt")
+			.where("cnt > 3")
+			.count();
+
+		assertEquals(0, pubs_mergerel);
+
+		FileUtils.deleteDirectory(new File(workingPath + "/" + testActionSetId + "/publication_mergerel"));
+	}
+
+	@Test
+	@Order(3)
+	void createMergeRelsTest() throws Exception {
+		new SparkCreateMergeRels(args(
+			"/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json",
+			new String[] {
+				"--graphBasePath", graphInputPath,
+				"--actionSetId", testActionSetId,
+				"--isLookUpUrl", "lookupurl",
+				"--workingPath", workingPath
+			}), spark)
+				.run(isLookUpService);
+
+		final Dataset<Relation> merges = spark
+			.read()
+			.load(workingPath + "/" + testActionSetId + "/publication_mergerel")
+			.as(Encoders.bean(Relation.class));
+
+		final List<Relation> mergeList = merges
+			.filter("source == '50|doi_dedup___::d5021b53204e4fdeab6ff5d5bc468032'")
+			.collectAsList();
+		assertEquals(3, mergeList.size());
+		Set<String> dups = Sets
+			.newHashSet(
+				"50|doi_________::3b1d0d8e8f930826665df9d6b82fbb73",
+				"50|doi_________::d5021b53204e4fdeab6ff5d5bc468032",
+				"50|arXiv_______::c93aeb433eb90ed7a86e29be00791b7c");
+		mergeList.forEach(r -> {
+			assertEquals(ModelConstants.RESULT_RESULT, r.getRelType());
+			assertEquals(ModelConstants.DEDUP, r.getSubRelType());
+			assertEquals(ModelConstants.MERGES, r.getRelClass());
+			assertTrue(dups.contains(r.getTarget()));
+		});
+
+		final List<Relation> mergedIn = merges
+			.filter("target == '50|doi_dedup___::d5021b53204e4fdeab6ff5d5bc468032'")
+			.collectAsList();
+		assertEquals(3, mergedIn.size());
+		mergedIn.forEach(r -> {
+			assertEquals(ModelConstants.RESULT_RESULT, r.getRelType());
+			assertEquals(ModelConstants.DEDUP, r.getSubRelType());
+			assertEquals(ModelConstants.IS_MERGED_IN, r.getRelClass());
+			assertTrue(dups.contains(r.getSource()));
+		});
+
+		assertEquals(32, merges.count());
+	}
+
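The cut test above verifies, via the groupBy/agg/where chain, that after cutting connected components at 3 no root keeps more than three `merges` targets. The same fan-out check in plain Java streams; the (source, target) pairs are hypothetical:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CutCheckSketch {

	public static void main(String[] args) {
		// Hypothetical (source, target) "merges" pairs.
		List<String[]> merges = Arrays.asList(
			new String[] { "root1", "dupA" },
			new String[] { "root1", "dupB" },
			new String[] { "root2", "dupC" });

		int cut = 3;

		// Same logic as the Spark groupBy("source").agg(count).where("cnt > 3") chain.
		Map<String, Long> fanOut = merges
			.stream()
			.collect(Collectors.groupingBy(m -> m[0], Collectors.counting()));

		long overCut = fanOut.values().stream().filter(c -> c > cut).count();
		System.out.println(overCut); // expected 0: no root merges more than `cut` duplicates
	}
}
```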
+	@Test
+	@Order(4)
+	void createDedupRecordTest() throws Exception {
+		new SparkCreateDedupRecord(args(
+			"/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json",
+			new String[] {
+				"--graphBasePath", graphInputPath,
+				"--actionSetId", testActionSetId,
+				"--isLookUpUrl", "lookupurl",
+				"--workingPath", workingPath
+			}), spark)
+				.run(isLookUpService);
+
+		final Dataset<Publication> roots = spark
+			.read()
+			.textFile(workingPath + "/" + testActionSetId + "/publication_deduprecord")
+			.map(asEntity(Publication.class), Encoders.bean(Publication.class));
+
+		assertEquals(3, roots.count());
+
+		final Dataset<Publication> pubs = spark
+			.read()
+			.textFile(DedupUtility.createEntityPath(graphInputPath, "publication"))
+			.map(asEntity(Publication.class), Encoders.bean(Publication.class));
+
+		verifyRoot_case_1(roots, pubs);
+		verifyRoot_case_2(roots, pubs);
+		verifyRoot_case_3(roots, pubs);
+	}
+
+	private static void verifyRoot_case_1(Dataset<Publication> roots, Dataset<Publication> pubs) {
+		Publication root = roots
+			.filter("id = '50|doi_dedup___::d5021b53204e4fdeab6ff5d5bc468032'")
+			.first();
+		assertNotNull(root);
+
+		Publication crossref_duplicate = pubs
+			.filter("id = '50|doi_________::d5021b53204e4fdeab6ff5d5bc468032'")
+			.collectAsList()
+			.get(0);
+
+		assertEquals(crossref_duplicate.getJournal().getName(), root.getJournal().getName());
+		assertEquals(crossref_duplicate.getJournal().getIssnPrinted(), root.getJournal().getIssnPrinted());
+		assertEquals(crossref_duplicate.getPublisher().getValue(), root.getPublisher().getValue());
+
+		Set<String> rootPids = root
+			.getPid()
+			.stream()
+			.map(StructuredProperty::getValue)
+			.collect(Collectors.toCollection(HashSet::new));
+		Set<String> dupPids = crossref_duplicate
+			.getPid()
+			.stream()
+			.map(StructuredProperty::getValue)
+			.collect(Collectors.toCollection(HashSet::new));
+
+		assertFalse(Sets.intersection(rootPids, dupPids).isEmpty());
+		assertTrue(rootPids.contains("10.1109/jstqe.2022.3205716"));
+
+		Optional<Instance> instance_cr = root
+			.getInstance()
+			.stream()
+			.filter(i -> i.getCollectedfrom().getValue().equals("Crossref"))
+			.findFirst();
+		assertTrue(instance_cr.isPresent());
+		assertEquals("OPEN", instance_cr.get().getAccessright().getClassid());
+		assertEquals("Open Access", instance_cr.get().getAccessright().getClassname());
+		assertEquals(OpenAccessRoute.hybrid, instance_cr.get().getAccessright().getOpenAccessRoute());
+		assertEquals(
+			"IEEE Journal of Selected Topics in Quantum Electronics", instance_cr.get().getHostedby().getValue());
+		assertEquals("0001", instance_cr.get().getInstancetype().getClassid());
+		assertEquals("Article", instance_cr.get().getInstancetype().getClassname());
+	}
+
+	private void verifyRoot_case_2(Dataset<Publication> roots, Dataset<Publication> pubs) {
+		Publication root = roots
+			.filter("id = '50|doi_dedup___::18aff3b55fb6876466a5d4bd82434885'")
+			.first();
+		assertNotNull(root);
+
+		Publication crossref_duplicate = pubs
+			.filter("id = '50|doi_________::18aff3b55fb6876466a5d4bd82434885'")
+			.first();
+
+		// System.err.println(new ObjectMapper().writeValueAsString(root));
+
+		assertEquals(crossref_duplicate.getJournal().getName(), root.getJournal().getName());
+		assertEquals(crossref_duplicate.getJournal().getIssnOnline(), root.getJournal().getIssnOnline());
+		assertEquals(crossref_duplicate.getJournal().getVol(), root.getJournal().getVol());
+
+		assertEquals(crossref_duplicate.getPublisher().getValue(), root.getPublisher().getValue());
+
+		Set<String> dups_cf = pubs
+			.collectAsList()
+			.stream()
+			.flatMap(p -> p.getCollectedfrom().stream())
+			.map(KeyValue::getValue)
+			.collect(Collectors.toCollection(HashSet::new));
+
+		Set<String> root_cf = root
+			.getCollectedfrom()
+			.stream()
+			.map(KeyValue::getValue)
+			.collect(Collectors.toCollection(HashSet::new));
+
+		assertTrue(Sets.difference(root_cf, dups_cf).isEmpty());
+	}
+
+	private void verifyRoot_case_3(Dataset<Publication> roots, Dataset<Publication> pubs) {
+		Publication root = roots
+			.filter("id = '50|dedup_wf_001::31ca734cc22181b704c4aa8fd050062a'")
+			.first();
+		assertNotNull(root);
+
+		Publication pivot_duplicate = pubs
+			.filter("id = '50|od_______166::31ca734cc22181b704c4aa8fd050062a'")
+			.first();
+
+		assertEquals(pivot_duplicate.getPublisher().getValue(), root.getPublisher().getValue());
+
+		Set<String> dups_cf = pubs
+			.collectAsList()
+			.stream()
+			.flatMap(p -> p.getCollectedfrom().stream())
+			.map(KeyValue::getValue)
+			.collect(Collectors.toCollection(HashSet::new));
+
+		Set<String> root_cf = root
+			.getCollectedfrom()
+			.stream()
+			.map(KeyValue::getValue)
+			.collect(Collectors.toCollection(HashSet::new));
+
+		assertTrue(Sets.difference(root_cf, dups_cf).isEmpty());
+	}
+
+	@Test
+	@Order(6)
+	void updateEntityTest() throws Exception {
+		new SparkUpdateEntity(args(
+			"/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json",
+			new String[] {
+				"--graphBasePath", graphInputPath,
+				"--workingPath", workingPath,
+				"--dedupGraphPath", graphOutputPath
+			}), spark)
+				.run(isLookUpService);
+
+		long publications = spark.read().textFile(graphOutputPath + "/publication").count();
+
+		long mergedPubs = spark
+			.read()
+			.load(workingPath + "/" + testActionSetId + "/publication_mergerel")
+			.as(Encoders.bean(Relation.class))
+			.where("relClass=='merges'")
+			.map((MapFunction<Relation, String>) Relation::getTarget, Encoders.STRING())
+			.distinct()
+			.count();
+
+		assertEquals(19, publications); // 16 originals + 3 roots
+
+		long deletedPubs = spark
+			.read()
+			.textFile(graphOutputPath + "/publication")
+			.map(asEntity(Publication.class), Encoders.bean(Publication.class))
+			.filter("datainfo.deletedbyinference == true")
+			.map((MapFunction<Publication, String>) OafEntity::getId, Encoders.STRING())
+			.distinct()
+			.count();
+
+		assertEquals(mergedPubs, deletedPubs);
+	}
+
+	private static String classPathResourceAsString(String path) throws IOException {
+		return IOUtils
+			.toString(
+				SparkPublicationRootsTest.class
+					.getResourceAsStream(path));
+	}
+
+	private static <T> MapFunction<String, T> asEntity(Class<T> clazz) {
+		return value -> MAPPER.readValue(value, clazz);
+	}
+
+	private ArgumentApplicationParser args(String paramSpecs, String[] args) throws IOException, ParseException {
+		ArgumentApplicationParser parser = new ArgumentApplicationParser(classPathResourceAsString(paramSpecs));
+		parser.parseArgument(args);
+		return parser;
+	}
+
+}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkPublicationRootsTest2.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkPublicationRootsTest2.java
new file mode 100644
index 0000000000..9afe1e34b8
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkPublicationRootsTest2.java
@@ -0,0 +1,251 @@
+
+package eu.dnetlib.dhp.oa.dedup;
+
+import static java.nio.file.Files.createTempDirectory;
+
+import static org.apache.spark.sql.functions.count;
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.lenient;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Sets;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+
+@ExtendWith(MockitoExtension.class)
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class SparkPublicationRootsTest2 implements Serializable {
+
+	@Mock(serializable = true)
+	ISLookUpService isLookUpService;
+
+	private static SparkSession spark;
+
+	private static String workingPath;
+
+	private static String graphInputPath;
+
+	private static String graphOutputPath;
+
+	private static final String testActionSetId = "test-orchestrator";
+
+	private static Path testBaseTmpPath;
+
+	private static final ObjectMapper MAPPER = new ObjectMapper()
+		.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+	@BeforeAll
+	public static void init() throws IOException, URISyntaxException {
+
+		testBaseTmpPath = createTempDirectory(SparkPublicationRootsTest2.class.getSimpleName() + "-");
+
+		final File entitiesSources = Paths
+			.get(SparkPublicationRootsTest2.class.getResource("/eu/dnetlib/dhp/dedup/root").toURI())
+			.toFile();
+
+		FileUtils
+			.copyDirectory(
+				entitiesSources,
+				testBaseTmpPath.resolve("input").toFile());
+
+		FileUtils
+			.copyFileToDirectory(
+				Paths
+					.get(
+						SparkPublicationRootsTest2.class
+							.getResource(
+								"/eu/dnetlib/dhp/dedup/root/alterations/publication/publication_1.gz")
+							.toURI())
+					.toFile(),
+				testBaseTmpPath.resolve("input").resolve("entities").resolve("publication").toFile());
+
+		workingPath = testBaseTmpPath.resolve("workingPath").toString();
+		graphInputPath = testBaseTmpPath.resolve("input").resolve("entities").toString();
+		graphOutputPath = testBaseTmpPath.resolve("output").toString();
+
+		final SparkConf conf = new SparkConf();
+		conf.set("spark.sql.shuffle.partitions", "10");
+		spark = SparkSession
+			.builder()
+			.appName(SparkPublicationRootsTest2.class.getSimpleName())
+			.master("local[*]")
+			.config(conf)
+			.getOrCreate();
+	}
+
+	@BeforeEach
+	public void setUp() throws IOException, ISLookUpException {
+
+		lenient()
+			.when(isLookUpService.getResourceProfileByQuery(Mockito.contains(testActionSetId)))
+			.thenReturn(classPathResourceAsString("/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator_publication.xml"));
+
+		lenient()
+			.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("publication")))
+			.thenReturn(classPathResourceAsString("/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json"));
+	}
+
+	@AfterAll
+	public static void tearDown() throws IOException {
+		FileUtils.deleteDirectory(testBaseTmpPath.toFile());
+	}
+
+	@Test
+	@Order(7)
+	void dedupAlteredDatasetTest() throws Exception {
+
+		new SparkCreateSimRels(args(
+			"/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json",
+			new String[] {
+				"--graphBasePath", graphInputPath,
+				"--actionSetId", testActionSetId,
+				"--isLookUpUrl", "lookupurl",
+				"--workingPath", workingPath,
+				"--numPartitions", "5"
+			}), spark)
+				.run(isLookUpService);
+
"--graphBasePath", graphInputPath, + "--actionSetId", testActionSetId, + "--isLookUpUrl", "lookupurl", + "--workingPath", workingPath + }), spark) + .run(isLookUpService); + + final Dataset merges = spark + .read() + .load(workingPath + "/" + testActionSetId + "/publication_mergerel") + .as(Encoders.bean(Relation.class)); + + assertEquals( + 3, merges + .filter("relclass == 'isMergedIn'") + .map((MapFunction) Relation::getTarget, Encoders.STRING()) + .distinct() + .count()); + assertEquals( + 4, merges + .filter("source == '50|doi_dedup___::b3aec7985136e36827176aaa1dd5082d'") + .count()); + + new SparkCreateDedupRecord(args( + "/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json", + new String[] { + "--graphBasePath", graphInputPath, + "--actionSetId", testActionSetId, + "--isLookUpUrl", "lookupurl", + "--workingPath", workingPath + }), spark) + .run(isLookUpService); + + final Dataset roots = spark + .read() + .textFile(workingPath + "/" + testActionSetId + "/publication_deduprecord") + .map(asEntity(Publication.class), Encoders.bean(Publication.class)); + + assertEquals(3, roots.count()); + + final Dataset pubs = spark + .read() + .textFile(DedupUtility.createEntityPath(graphInputPath, "publication")) + .map(asEntity(Publication.class), Encoders.bean(Publication.class)); + + Publication root = roots + .filter("id = '50|doi_dedup___::b3aec7985136e36827176aaa1dd5082d'") + .first(); + assertNotNull(root); + + Publication crossref_duplicate = pubs + .filter("id = '50|doi_________::b3aec7985136e36827176aaa1dd5082d'") + .collectAsList() + .get(0); + + assertEquals(crossref_duplicate.getDateofacceptance().getValue(), root.getDateofacceptance().getValue()); + assertEquals(crossref_duplicate.getJournal().getName(), root.getJournal().getName()); + assertEquals(crossref_duplicate.getJournal().getIssnPrinted(), root.getJournal().getIssnPrinted()); + assertEquals(crossref_duplicate.getPublisher().getValue(), root.getPublisher().getValue()); + + Set rootPids = root + .getPid() + .stream() + .map(StructuredProperty::getValue) + .collect(Collectors.toCollection(HashSet::new)); + Set dupPids = crossref_duplicate + .getPid() + .stream() + .map(StructuredProperty::getValue) + .collect(Collectors.toCollection(HashSet::new)); + + assertFalse(Sets.intersection(rootPids, dupPids).isEmpty()); + assertTrue(rootPids.contains("10.1109/jstqe.2022.3205716")); + assertTrue(rootPids.contains("10.1109/jstqe.2023.9999999")); + + Optional instance_cr = root + .getInstance() + .stream() + .filter(i -> i.getCollectedfrom().getValue().equals("Crossref")) + .findFirst(); + assertTrue(instance_cr.isPresent()); + assertEquals("OPEN", instance_cr.get().getAccessright().getClassid()); + assertEquals("Open Access", instance_cr.get().getAccessright().getClassname()); + assertEquals(OpenAccessRoute.hybrid, instance_cr.get().getAccessright().getOpenAccessRoute()); + assertEquals( + "IEEE Journal of Selected Topics in Quantum Electronics", instance_cr.get().getHostedby().getValue()); + assertEquals("0001", instance_cr.get().getInstancetype().getClassid()); + assertEquals("Article", instance_cr.get().getInstancetype().getClassname()); + + } + + private static String classPathResourceAsString(String path) throws IOException { + return IOUtils + .toString( + SparkPublicationRootsTest2.class + .getResourceAsStream(path)); + } + + private static MapFunction asEntity(Class clazz) { + return value -> MAPPER.readValue(value, clazz); + } + + private ArgumentApplicationParser args(String paramSpecs, String[] args) throws IOException, 
+	private static String classPathResourceAsString(String path) throws IOException {
+		return IOUtils
+			.toString(
+				SparkPublicationRootsTest2.class
+					.getResourceAsStream(path));
+	}
+
+	private static <T> MapFunction<String, T> asEntity(Class<T> clazz) {
+		return value -> MAPPER.readValue(value, clazz);
+	}
+
+	private ArgumentApplicationParser args(String paramSpecs, String[] args) throws IOException, ParseException {
+		ArgumentApplicationParser parser = new ArgumentApplicationParser(classPathResourceAsString(paramSpecs));
+		parser.parseArgument(args);
+		return parser;
+	}
+
+}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkStatsTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkStatsTest.java
index 1ba2c717ce..b33b627e75 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkStatsTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkStatsTest.java
@@ -168,11 +168,11 @@ public class SparkStatsTest implements Serializable {
 			.textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_blockstats")
 			.count();
 
-		assertEquals(477, orgs_blocks);
+		assertEquals(480, orgs_blocks);
 		assertEquals(295, pubs_blocks);
 		assertEquals(122, sw_blocks);
 		assertEquals(191, ds_blocks);
-		assertEquals(171, orp_blocks);
+		assertEquals(178, orp_blocks);
 	}
 
 	@AfterAll
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/entities/publication/publication.gz b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/entities/publication/publication.gz
index cf513d7302..a7457063bc 100644
Binary files a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/entities/publication/publication.gz and b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/entities/publication/publication.gz differ
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator_publication.xml b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator_publication.xml
new file mode 100644
index 0000000000..b47d99b929
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator_publication.xml
@@ -0,0 +1,24 @@
+<!-- The 24 lines of this dedup orchestrator profile were lost in extraction (the XML markup
+     was stripped); only a SECURITY_PARAMETERS element is still recognizable. For the profile
+     layout, see the existing profiles/mock_orchestrator.xml referenced by SparkDedupTest. -->
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/root/alterations/publication/publication_1.gz b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/root/alterations/publication/publication_1.gz
new file mode 100644
index 0000000000..89ed9671e1
Binary files /dev/null and b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/root/alterations/publication/publication_1.gz differ
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/root/entities/publication/publication_0.gz b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/root/entities/publication/publication_0.gz
new file mode 100644
index 0000000000..85706bc31f
Binary files /dev/null and b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/root/entities/publication/publication_0.gz differ
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/log4j.properties b/dhp-workflows/dhp-dedup-openaire/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..d3e717dfa2
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/log4j.properties
@@ -0,0 +1,47 @@
+# Root logger option
+log4j.rootLogger=DEBUG, stdout
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
+
+# Change this to set Spark log level
+log4j.logger.org.apache.spark=ERROR
+log4j.rootCategory=WARN
+
+# Silence akka remoting
+log4j.logger.Remoting=WARN
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+
+log4j.logger.org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory=WARN
+log4j.logger.org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter=WARN
+#log4j.logger.org.apache.parquet.hadoop.ParquetOutputFormat=WARN
+#log4j.logger.org.apache.parquet.hadoop.InternalParquetRecordWriter=WARN
+log4j.logger.org.apache.hadoop.io.compress.CodecPool=WARN
+#log4j.logger.org.apache.hadoop.io.compress=WARN
+#log4j.logger.org.apache.parquet.hadoop.codec.CodecConfig=WARN
+log4j.logger.parquet.hadoop.ColumnChunkPageWriteStore=ERROR
+log4j.logger.com.jayway.jsonpath.internal.path.CompiledPath=WARN
+log4j.logger.org.apache.parquet.hadoop.ParquetRecordReader=ERROR
+log4j.logger.parquet.hadoop=WARN
+log4j.logger.org.eclipse.jetty.server.handler.ContextHandlerCollection=WARN
+log4j.logger.org.spark_project.jetty.util.component.ContainerLifeCycle=WARN
+log4j.logger.org.apache.hadoop.mapred.FileInputFormat=WARN
+log4j.logger.org.spark_project.jetty.servlet.ServletHandler=WARN
+log4j.logger.org.apache.commons.beanutils.converters.BooleanConverter=WARN
+log4j.logger.org.apache.commons.beanutils.converters.StringConverter=WARN
+log4j.logger.org.apache.commons.beanutils.converters.LongConverter=WARN
+log4j.logger.org.apache.commons.beanutils.converters.ArrayConverter=WARN
+log4j.logger.org.apache.commons.beanutils.converters.FloatConverter=WARN
+log4j.logger.org.apache.commons.beanutils.converters.IntegerConverter=WARN
+log4j.logger.org.apache.commons.beanutils.converters.DoubleConverter=WARN
+log4j.logger.org.apache.commons.beanutils.converters.CharacterConverter=WARN
+log4j.logger.org.apache.commons.beanutils.converters.ByteConverter=WARN
+log4j.logger.org.apache.commons.beanutils.converters.BigIntegerConverter=WARN
+log4j.logger.org.apache.commons.beanutils.converters.BigDecimalConverter=WARN
+log4j.logger.org.apache.commons.beanutils.converters.ShortConverter=WARN
+log4j.logger.org.apache.commons.beanutils.BeanUtils=WARN