diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
index c3b6751ba..f9fc8a21a 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
@@ -1,12 +1,12 @@
 
 package eu.dnetlib.dhp.oa.dedup;
 
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
+import java.lang.reflect.InvocationTargetException;
+import java.util.*;
 import java.util.stream.Collectors;
 
+import org.apache.commons.beanutils.BeanUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.apache.spark.sql.Dataset;
@@ -15,14 +15,12 @@ import org.apache.spark.sql.SparkSession;
 
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 
 import eu.dnetlib.dhp.oa.dedup.model.Identifier;
 import eu.dnetlib.dhp.oa.merge.AuthorMerger;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.*;
-import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
 import scala.Tuple2;
 
 public class DedupRecordFactory {
@@ -79,23 +77,25 @@ public class DedupRecordFactory {
 	public static <T extends OafEntity> T entityMerger(
 		String id, Iterator<Tuple2<String, T>> entities, long ts, DataInfo dataInfo, Class<T> clazz)
-		throws IllegalAccessException, InstantiationException {
+		throws IllegalAccessException, InstantiationException, InvocationTargetException {
 
-		T entity = clazz.newInstance();
-		entity.setDataInfo(dataInfo);
+		final Comparator<Identifier<T>> idComparator = new IdentifierComparator<>();
 
-		final Collection<String> dates = Lists.newArrayList();
-		final List<List<Author>> authors = Lists.newArrayList();
-
-		final Comparator<Identifier<T>> idComparator = new IdentifierComparator().reversed();
-
-		final List<T> entityList = Lists
+		final LinkedList<T> entityList = Lists
 			.newArrayList(entities)
 			.stream()
 			.map(t -> Identifier.newInstance(t._2()))
 			.sorted(idComparator)
 			.map(Identifier::getEntity)
-			.collect(Collectors.toList());
+			.collect(Collectors.toCollection(LinkedList::new));
+
+		final T entity = clazz.newInstance();
+		final T first = entityList.removeFirst();
+
+		BeanUtils.copyProperties(entity, first);
+
+		final Collection<String> dates = Lists.newArrayList();
+		final List<List<Author>> authors = Lists.newArrayList();
 
 		entityList
 			.forEach(
@@ -103,12 +103,11 @@ public class DedupRecordFactory {
 					entity.mergeFrom(duplicate);
 					if (ModelSupport.isSubClass(duplicate, Result.class)) {
 						Result r1 = (Result) duplicate;
-						if (r1.getAuthor() != null && !r1.getAuthor().isEmpty())
+						if (r1.getAuthor() != null && StringUtils.isNotBlank(r1.getDateofacceptance().getValue()))
 							authors.add(r1.getAuthor());
 						if (r1.getDateofacceptance() != null)
 							dates.add(r1.getDateofacceptance().getValue());
 					}
-
 				});
 
 		// set authors and date
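Note on the hunk above: entityMerger no longer starts from an empty instance. The duplicates are first ranked through IdentifierComparator, the best-ranked duplicate is copied wholesale into a fresh root record via BeanUtils.copyProperties (Commons BeanUtils copies (destination, source), which is also where the new checked InvocationTargetException comes from), and only the remaining duplicates are folded in through mergeFrom. A minimal standalone sketch of that control flow, with a simplified stand-in bean instead of the real OafEntity/Identifier hierarchy, so the ranking criterion and the merge step here are placeholders:

```java
import java.lang.reflect.InvocationTargetException;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.beanutils.BeanUtils;

public class MergeFlowSketch {

	// simplified stand-in for an OafEntity: a plain JavaBean, so BeanUtils can copy it
	public static class Entity {
		private String id;
		private int trust;

		public String getId() { return id; }
		public void setId(String id) { this.id = id; }
		public int getTrust() { return trust; }
		public void setTrust(int trust) { this.trust = trust; }
	}

	public static Entity merge(List<Entity> duplicates)
		throws IllegalAccessException, InvocationTargetException {

		// 1) rank the duplicates, best candidate first (the real code sorts
		//    Identifier wrappers with IdentifierComparator)
		final LinkedList<Entity> sorted = duplicates
			.stream()
			.sorted(Comparator.comparingInt(Entity::getTrust).reversed())
			.collect(Collectors.toCollection(LinkedList::new));

		// 2) seed the root from the best-ranked duplicate; note the argument
		//    order: BeanUtils.copyProperties(destination, source), and the
		//    checked InvocationTargetException it declares
		final Entity root = new Entity();
		BeanUtils.copyProperties(root, sorted.removeFirst());

		// 3) fold the remaining duplicates into the root; the real code calls
		//    entity.mergeFrom(duplicate) and collects authors/dates on the way
		for (Entity duplicate : sorted) {
			// root.mergeFrom(duplicate);
		}
		return root;
	}
}
```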
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java
index aa3722ce5..c9cfb8cb2 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java
@@ -7,6 +7,7 @@ import java.io.BufferedReader;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
 import java.nio.file.Paths;
 import java.util.*;
 import java.util.stream.Collectors;
@@ -54,7 +55,7 @@ class EntityMergerTest implements Serializable {
 	}
 
 	@Test
-	void softwareMergerTest() throws InstantiationException, IllegalAccessException {
+	void softwareMergerTest() throws InstantiationException, IllegalAccessException, InvocationTargetException {
 
 		List<Tuple2<String, Software>> softwares = readSample(
 			testEntityBasePath + "/software_merge.json", Software.class);
@@ -69,7 +70,7 @@ class EntityMergerTest implements Serializable {
 	}
 
 	@Test
-	void publicationMergerTest() throws InstantiationException, IllegalAccessException {
+	void publicationMergerTest() throws InstantiationException, IllegalAccessException, InvocationTargetException {
 
 		Publication pub_merged = DedupRecordFactory
 			.entityMerger(dedupId, publications.iterator(), 0, dataInfo, Publication.class);
@@ -134,7 +135,7 @@ class EntityMergerTest implements Serializable {
 	}
 
 	@Test
-	void publicationMergerTest2() throws InstantiationException, IllegalAccessException {
+	void publicationMergerTest2() throws InstantiationException, IllegalAccessException, InvocationTargetException {
 
 		Publication pub_merged = DedupRecordFactory
 			.entityMerger(dedupId, publications2.iterator(), 0, dataInfo, Publication.class);
@@ -146,7 +147,7 @@ class EntityMergerTest implements Serializable {
 	}
 
 	@Test
-	void publicationMergerTest3() throws InstantiationException, IllegalAccessException {
+	void publicationMergerTest3() throws InstantiationException, IllegalAccessException, InvocationTargetException {
 
 		Publication pub_merged = DedupRecordFactory
 			.entityMerger(dedupId, publications3.iterator(), 0, dataInfo, Publication.class);
@@ -156,7 +157,8 @@ class EntityMergerTest implements Serializable {
 	}
 
 	@Test
-	void publicationMergerTest4() throws InstantiationException, IllegalStateException, IllegalAccessException {
+	void publicationMergerTest4()
+		throws InstantiationException, IllegalStateException, IllegalAccessException, InvocationTargetException {
 
 		Publication pub_merged = DedupRecordFactory
 			.entityMerger(dedupId, publications4.iterator(), 0, dataInfo, Publication.class);
@@ -166,7 +168,8 @@ class EntityMergerTest implements Serializable {
 	}
 
 	@Test
-	void publicationMergerTest5() throws InstantiationException, IllegalStateException, IllegalAccessException {
+	void publicationMergerTest5()
+		throws InstantiationException, IllegalStateException, IllegalAccessException, InvocationTargetException {
 
 		System.out
 			.println(
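The test signatures above widen only because entityMerger now seeds the root record reflectively: BeanUtils.copyProperties declares InvocationTargetException, so every caller has to declare or wrap it. A hedged sketch of the wrapping alternative; the entityMerger call matches the signature introduced in this patch, while the mergeOrFail helper itself is hypothetical:

```java
import java.lang.reflect.InvocationTargetException;
import java.util.Iterator;

import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Publication;
import scala.Tuple2;

class MergeCaller {

	// hypothetical helper: converts the three checked reflection exceptions
	// into an unchecked one instead of widening the caller's throws clause
	static Publication mergeOrFail(String dedupId, Iterator<Tuple2<String, Publication>> dups, DataInfo dataInfo) {
		try {
			return DedupRecordFactory.entityMerger(dedupId, dups, 0, dataInfo, Publication.class);
		} catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
			throw new IllegalStateException("dedup record creation failed for " + dedupId, e);
		}
	}
}
```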
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupPublicationTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupPublicationTest.java
new file mode 100644
index 000000000..c657d1865
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupPublicationTest.java
@@ -0,0 +1,389 @@
+
+package eu.dnetlib.dhp.oa.dedup;
+
+import static java.nio.file.Files.createTempDirectory;
+
+import static org.apache.spark.sql.functions.count;
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.lenient;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Sets;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+
+@ExtendWith(MockitoExtension.class)
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class SparkDedupPublicationTest implements Serializable {
+
+	@Mock(serializable = true)
+	ISLookUpService isLookUpService;
+
+	private static SparkSession spark;
+	private static JavaSparkContext jsc;
+
+	private static String testGraphBasePath;
+	private static String testOutputBasePath;
+	private static String testDedupGraphBasePath;
+	private static final String testActionSetId = "test-orchestrator";
+
+	@BeforeAll
+	public static void cleanUp() throws IOException, URISyntaxException {
+
+		testGraphBasePath = Paths
+			.get(SparkDedupPublicationTest.class.getResource("/eu/dnetlib/dhp/dedup/entities2").toURI())
+			.toFile()
+			.getAbsolutePath();
+		testOutputBasePath = createTempDirectory(SparkDedupPublicationTest.class.getSimpleName() + "-")
+			.toAbsolutePath()
+			.toString();
+
+		testDedupGraphBasePath = createTempDirectory(SparkDedupPublicationTest.class.getSimpleName() + "-")
+			.toAbsolutePath()
+			.toString();
+
+		FileUtils.deleteDirectory(new File(testOutputBasePath));
+		FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
+
+		final SparkConf conf = new SparkConf();
+		conf.set("spark.sql.shuffle.partitions", "10");
+		spark = SparkSession
+			.builder()
+			.appName(SparkDedupPublicationTest.class.getSimpleName())
+			.master("local[*]")
+			.config(conf)
+			.getOrCreate();
+
+		jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+	}
+
+	@BeforeEach
+	public void setUp() throws IOException, ISLookUpException {
+
+		lenient()
+			.when(isLookUpService.getResourceProfileByQuery(Mockito.contains(testActionSetId)))
+			.thenReturn(
+				classPathResourceAsString("/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator_publication.xml"));
+
+		lenient()
+			.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("publication")))
+			.thenReturn(classPathResourceAsString("/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json"));
+	}
+
+	@Test
+	@Order(1)
+	void createSimRelsTest() throws Exception {
+
+		ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json"));
+
+		parser
+			.parseArgument(
+				new String[] {
+					"--graphBasePath", testGraphBasePath,
+					"--actionSetId", testActionSetId,
+					"--isLookUpUrl", "lookupurl",
+					"--workingPath", testOutputBasePath,
+					"--numPartitions", "5"
+				});
+
+		new SparkCreateSimRels(parser, spark).run(isLookUpService);
+
+		long pubs_simrel = spark
+			.read()
+			.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "publication"))
+			.count();
+
+		assertEquals(62, pubs_simrel);
+	}
+
+	@Test
+	@Order(2)
+	void cutMergeRelsTest() throws Exception {
+
+		ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"));
+
+		parser
+			.parseArgument(
+				new String[] {
+					"--graphBasePath", testGraphBasePath,
+					"--actionSetId", testActionSetId,
+					"--isLookUpUrl", "lookupurl",
+					"--workingPath", testOutputBasePath,
+					"--cutConnectedComponent", "3"
+				});
+
+		new SparkCreateMergeRels(parser, spark).run(isLookUpService);
+
+		long pubs_mergerel = spark
+			.read()
+			.load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel")
+			.as(Encoders.bean(Relation.class))
+			.filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase("merges"))
+			.groupBy("source")
+			.agg(count("target").alias("cnt"))
+			.select("source", "cnt")
+			.where("cnt > 3")
+			.count();
+
+		assertEquals(0, pubs_mergerel);
+
+		FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel"));
+	}
+
+	@Test
+	@Order(3)
+	void createMergeRelsTest() throws Exception {
+
+		ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"));
+
+		parser
+			.parseArgument(
+				new String[] {
+					"--graphBasePath", testGraphBasePath,
+					"--actionSetId", testActionSetId,
+					"--isLookUpUrl", "lookupurl",
+					"--workingPath", testOutputBasePath
+				});
+
+		new SparkCreateMergeRels(parser, spark).run(isLookUpService);
+
+		final Dataset<Relation> pubs = spark
+			.read()
+			.load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel")
+			.as(Encoders.bean(Relation.class));
+
+		final List<Relation> merges = pubs
+			.filter("source == '50|doi_dedup___::d5021b53204e4fdeab6ff5d5bc468032'")
+			.collectAsList();
+		assertEquals(3, merges.size());
+		Set<String> dups = Sets
+			.newHashSet(
+				"50|doi_________::3b1d0d8e8f930826665df9d6b82fbb73",
+				"50|doi_________::d5021b53204e4fdeab6ff5d5bc468032",
+				"50|arXiv_______::c93aeb433eb90ed7a86e29be00791b7c");
+		merges.forEach(r -> {
+			assertEquals(ModelConstants.RESULT_RESULT, r.getRelType());
+			assertEquals(ModelConstants.DEDUP, r.getSubRelType());
+			assertEquals(ModelConstants.MERGES, r.getRelClass());
+			assertTrue(dups.contains(r.getTarget()));
+		});
+
+		final List<Relation> mergedIn = pubs
+			.filter("target == '50|doi_dedup___::d5021b53204e4fdeab6ff5d5bc468032'")
+			.collectAsList();
+		assertEquals(3, mergedIn.size());
+		mergedIn.forEach(r -> {
+			assertEquals(ModelConstants.RESULT_RESULT, r.getRelType());
+			assertEquals(ModelConstants.DEDUP, r.getSubRelType());
+			assertEquals(ModelConstants.IS_MERGED_IN, r.getRelClass());
+			assertTrue(dups.contains(r.getSource()));
+		});
+
+		assertEquals(24, pubs.count());
+	}
+
+	@Test
+	@Order(4)
+	void createDedupRecordTest() throws Exception {
+
+		ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json"));
+		parser
+			.parseArgument(
+				new String[] {
+					"--graphBasePath", testGraphBasePath,
+					"--actionSetId", testActionSetId,
+					"--isLookUpUrl", "lookupurl",
+					"--workingPath", testOutputBasePath
+				});
+
+		new SparkCreateDedupRecord(parser, spark).run(isLookUpService);
+
+		final ObjectMapper mapper = new ObjectMapper()
+			.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+		final Dataset<Publication> roots = spark
+			.read()
+			.textFile(testOutputBasePath + "/" + testActionSetId + "/publication_deduprecord")
+			.map(
+				(MapFunction<String, Publication>) value -> mapper.readValue(value, Publication.class),
+				Encoders.bean(Publication.class));
+
+		assertEquals(2, roots.count());
+
+		final Dataset<Publication> pubs = spark
+			.read()
+			.textFile(DedupUtility.createEntityPath(testGraphBasePath, "publication"))
+			.map(
+				(MapFunction<String, Publication>) value -> mapper.readValue(value, Publication.class),
+				Encoders.bean(Publication.class));
+
+		verifyRoot_case_1(roots, pubs);
+		verifyRoot_case_2(roots, pubs);
+	}
+
+	private static void verifyRoot_case_1(Dataset<Publication> roots, Dataset<Publication> pubs) {
+		Publication root = roots
+			.filter("id = '50|doi_dedup___::d5021b53204e4fdeab6ff5d5bc468032'")
+			.first();
+		assertNotNull(root);
+
+		Publication crossref_duplicate = pubs
+			.filter("id = '50|doi_________::d5021b53204e4fdeab6ff5d5bc468032'")
+			.collectAsList()
+			.get(0);
+
+		assertEquals(crossref_duplicate.getJournal().getName(), root.getJournal().getName());
+		assertEquals(crossref_duplicate.getJournal().getIssnPrinted(), root.getJournal().getIssnPrinted());
+		assertEquals(crossref_duplicate.getPublisher().getValue(), root.getPublisher().getValue());
+
+		Set<String> rootPids = root
+			.getPid()
+			.stream()
+			.map(StructuredProperty::getValue)
+			.collect(Collectors.toCollection(HashSet::new));
+		Set<String> dupPids = crossref_duplicate
+			.getPid()
+			.stream()
+			.map(StructuredProperty::getValue)
+			.collect(Collectors.toCollection(HashSet::new));
+
+		assertFalse(Sets.intersection(rootPids, dupPids).isEmpty());
+		assertTrue(rootPids.contains("10.1109/jstqe.2022.3205716"));
+
+		Optional<Instance> instance_cr = root
+			.getInstance()
+			.stream()
+			.filter(i -> i.getCollectedfrom().getValue().equals("Crossref"))
+			.findFirst();
+		assertTrue(instance_cr.isPresent());
+		assertEquals("OPEN", instance_cr.get().getAccessright().getClassid());
+		assertEquals("Open Access", instance_cr.get().getAccessright().getClassname());
+		assertEquals(OpenAccessRoute.hybrid, instance_cr.get().getAccessright().getOpenAccessRoute());
+		assertEquals(
+			"IEEE Journal of Selected Topics in Quantum Electronics", instance_cr.get().getHostedby().getValue());
+		assertEquals("0001", instance_cr.get().getInstancetype().getClassid());
+		assertEquals("Article", instance_cr.get().getInstancetype().getClassname());
+	}
+
+	private void verifyRoot_case_2(Dataset<Publication> roots, Dataset<Publication> pubs)
+		throws JsonProcessingException {
+		Publication root = roots
+			.filter("id = '50|doi_dedup___::18aff3b55fb6876466a5d4bd82434885'")
+			.first();
+		assertNotNull(root);
+
+		Publication crossref_duplicate = pubs
+			.filter("id = '50|doi_________::18aff3b55fb6876466a5d4bd82434885'")
+			.first();
+
+		// System.err.println(new ObjectMapper().writeValueAsString(root));
+
+		assertEquals(crossref_duplicate.getJournal().getName(), root.getJournal().getName());
+		assertEquals(crossref_duplicate.getJournal().getIssnOnline(), root.getJournal().getIssnOnline());
+		assertEquals(crossref_duplicate.getJournal().getVol(), root.getJournal().getVol());
+
+		assertEquals(crossref_duplicate.getPublisher().getValue(), root.getPublisher().getValue());
+
+		Set<String> dups_cf = pubs
+			.collectAsList()
+			.stream()
+			.flatMap(p -> p.getCollectedfrom().stream())
+			.map(KeyValue::getValue)
+			.collect(Collectors.toCollection(HashSet::new));
+
+		Set<String> root_cf = root
+			.getCollectedfrom()
+			.stream()
+			.map(KeyValue::getValue)
+			.collect(Collectors.toCollection(HashSet::new));
+
+		assertTrue(Sets.difference(root_cf, dups_cf).isEmpty());
+	}
+
+	@Test
+	@Order(6)
+	void updateEntityTest() throws Exception {
+
+		ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json"));
+		parser
+			.parseArgument(
+				new String[] {
+					"-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath
+				});
+
+		new SparkUpdateEntity(parser, spark).run(isLookUpService);
+
+		long publications = jsc.textFile(testDedupGraphBasePath + "/publication").count();
+
+		long mergedPubs = spark
+			.read()
+			.load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel")
+			.as(Encoders.bean(Relation.class))
+			.where("relClass=='merges'")
+			.javaRDD()
+			.map(Relation::getTarget)
+			.distinct()
+			.count();
+
+		assertEquals(14, publications);
+
+		long deletedPubs = jsc
+			.textFile(testDedupGraphBasePath + "/publication")
+			.filter(this::isDeletedByInference)
+			.count();
+
+		assertEquals(mergedPubs, deletedPubs);
+	}
+
+	@AfterAll
+	public static void finalCleanUp() throws IOException {
+		FileUtils.deleteDirectory(new File(testOutputBasePath));
+		FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
+	}
+
+	public boolean isDeletedByInference(String s) {
+		return s.contains("\"deletedbyinference\":true");
+	}
+
+	private static String classPathResourceAsString(String path) throws IOException {
+		return IOUtils
+			.toString(
+				SparkDedupPublicationTest.class
+					.getResourceAsStream(path));
+	}
+
+}
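The root-record checks in verifyRoot_case_1/2 above lean on Guava set algebra: a non-empty Sets.intersection(rootPids, dupPids) means the root preserved at least one PID of the Crossref duplicate, and an empty Sets.difference(root_cf, dups_cf) means root_cf is a subset of dups_cf, i.e. the root introduces no collectedfrom entry that does not come from one of its duplicates. A small self-contained illustration with toy values (not the actual fixture data):

```java
import static org.junit.jupiter.api.Assertions.*;

import java.util.Set;

import com.google.common.collect.Sets;

class SetAlgebraSketch {

	void example() {
		// toy values, not the test fixtures
		Set<String> rootPids = Sets.newHashSet("10.1109/jstqe.2022.3205716", "some-other-pid");
		Set<String> dupPids = Sets.newHashSet("10.1109/jstqe.2022.3205716");

		// non-empty intersection: the root kept at least one PID of the duplicate
		assertFalse(Sets.intersection(rootPids, dupPids).isEmpty());

		// empty difference: every collectedfrom on the root originates
		// from one of the merged duplicates
		Set<String> rootCf = Sets.newHashSet("Crossref");
		Set<String> dupsCf = Sets.newHashSet("Crossref", "arXiv.org e-Print Archive");
		assertTrue(Sets.difference(rootCf, dupsCf).isEmpty());
	}
}
```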
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
index 9c9ec43d5..3de14f577 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
@@ -4,8 +4,7 @@ package eu.dnetlib.dhp.oa.dedup;
 import static java.nio.file.Files.createTempDirectory;
 
 import static org.apache.spark.sql.functions.count;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.*;
 import static org.mockito.Mockito.lenient;
 
 import java.io.File;
@@ -14,7 +13,11 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.net.URISyntaxException;
 import java.nio.file.Paths;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
@@ -35,10 +38,13 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 
+import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Sets;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.*;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import eu.dnetlib.pace.util.MapDocumentUtil;
@@ -105,57 +111,27 @@ public class SparkDedupTest implements Serializable {
 
 		lenient()
 			.when(isLookUpService.getResourceProfileByQuery(Mockito.contains(testActionSetId)))
-			.thenReturn(
-				IOUtils
-					.toString(
-						SparkDedupTest.class
-							.getResourceAsStream(
-								"/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml")));
+			.thenReturn(classPathResourceAsString("/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml"));
 
 		lenient()
 			.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("organization")))
-			.thenReturn(
-				IOUtils
-					.toString(
-						SparkDedupTest.class
-							.getResourceAsStream(
-								"/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
+			.thenReturn(classPathResourceAsString("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json"));
 
 		lenient()
 			.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("publication")))
-			.thenReturn(
-				IOUtils
-					.toString(
-						SparkDedupTest.class
-							.getResourceAsStream(
-								"/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json")));
+			.thenReturn(classPathResourceAsString("/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json"));
 
 		lenient()
 			.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("software")))
-			.thenReturn(
-				IOUtils
-					.toString(
-						SparkDedupTest.class
-							.getResourceAsStream(
-								"/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json")));
+			.thenReturn(classPathResourceAsString("/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json"));
 
 		lenient()
 			.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("dataset")))
-			.thenReturn(
-				IOUtils
-					.toString(
-						SparkDedupTest.class
-							.getResourceAsStream(
-								"/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json")));
+			.thenReturn(classPathResourceAsString("/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json"));
 
 		lenient()
 			.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("otherresearchproduct")))
-			.thenReturn(
-				IOUtils
-					.toString(
-						SparkDedupTest.class
-							.getResourceAsStream(
-								"/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json")));
+			.thenReturn(classPathResourceAsString("/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json"));
 	}
 
 	@Test
@@ -163,11 +139,7 @@ void createSimRelsTest() throws Exception {
 
 		ArgumentApplicationParser parser = new ArgumentApplicationParser(
-			IOUtils
-				.toString(
-					SparkCreateSimRels.class
-						.getResourceAsStream(
-							"/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json")));
+			classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json"));
 
 		parser
 			.parseArgument(
@@ -207,7 +179,7 @@ public class SparkDedupTest implements Serializable {
 			.count();
 
 		assertEquals(3076, orgs_simrel);
-		assertEquals(7040, pubs_simrel);
+		assertEquals(7046, pubs_simrel);
 		assertEquals(336, sw_simrel);
 		assertEquals(442, ds_simrel);
 		assertEquals(6784, orp_simrel);
@@ -223,11 +195,7 @@ void whitelistSimRelsTest() throws Exception {
 
 		ArgumentApplicationParser parser = new ArgumentApplicationParser(
-			IOUtils
-				.toString(
-					SparkWhitelistSimRels.class
-						.getResourceAsStream(
-							"/eu/dnetlib/dhp/oa/dedup/whitelistSimRels_parameters.json")));
+			classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/whitelistSimRels_parameters.json"));
 
 		parser
 			.parseArgument(
@@ -264,7 +232,7 @@ public class SparkDedupTest implements Serializable {
 
 		// entities simrels supposed to be equal to the number of previous step (no rels in whitelist)
 		assertEquals(3076, orgs_simrel);
-		assertEquals(7040, pubs_simrel);
+		assertEquals(7046, pubs_simrel);
 		assertEquals(442, ds_simrel);
 		assertEquals(6784, orp_simrel);
 		// System.out.println("orgs_simrel = " + orgs_simrel);
@@ -306,11 +274,7 @@ void cutMergeRelsTest() throws Exception {
 
 		ArgumentApplicationParser parser = new ArgumentApplicationParser(
-			IOUtils
-				.toString(
-					SparkCreateMergeRels.class
-						.getResourceAsStream(
-							"/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
+			classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"));
 
 		parser
 			.parseArgument(
@@ -402,11 +366,7 @@ void createMergeRelsTest() throws Exception {
 
 		ArgumentApplicationParser parser = new ArgumentApplicationParser(
-			IOUtils
-				.toString(
-					SparkCreateMergeRels.class
-						.getResourceAsStream(
-							"/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
+			classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"));
 
 		parser
 			.parseArgument(
@@ -427,10 +387,10 @@ public class SparkDedupTest implements Serializable {
 			.read()
 			.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
 			.count();
-		long pubs_mergerel = spark
+		final Dataset<Relation> pubs = spark
 			.read()
 			.load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel")
-			.count();
+			.as(Encoders.bean(Relation.class));
 		long sw_mergerel = spark
 			.read()
 			.load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")
 			.count();
@@ -445,8 +405,35 @@ public class SparkDedupTest implements Serializable {
 			.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel")
 			.count();
 
+		final List<Relation> merges = pubs
+			.filter("source == '50|doi_dedup___::d5021b53204e4fdeab6ff5d5bc468032'")
+			.collectAsList();
+		assertEquals(3, merges.size());
+		Set<String> dups = Sets
+			.newHashSet(
+				"50|doi_________::3b1d0d8e8f930826665df9d6b82fbb73",
+				"50|doi_________::d5021b53204e4fdeab6ff5d5bc468032",
+				"50|arXiv_______::c93aeb433eb90ed7a86e29be00791b7c");
+		merges.forEach(r -> {
+			assertEquals(ModelConstants.RESULT_RESULT, r.getRelType());
+			assertEquals(ModelConstants.DEDUP, r.getSubRelType());
+			assertEquals(ModelConstants.MERGES, r.getRelClass());
+			assertTrue(dups.contains(r.getTarget()));
+		});
+
+		final List<Relation> mergedIn = pubs
+			.filter("target == '50|doi_dedup___::d5021b53204e4fdeab6ff5d5bc468032'")
+			.collectAsList();
+		assertEquals(3, mergedIn.size());
+		mergedIn.forEach(r -> {
+			assertEquals(ModelConstants.RESULT_RESULT, r.getRelType());
+			assertEquals(ModelConstants.DEDUP, r.getSubRelType());
+			assertEquals(ModelConstants.IS_MERGED_IN, r.getRelClass());
+			assertTrue(dups.contains(r.getSource()));
+		});
+
 		assertEquals(1268, orgs_mergerel);
-		assertEquals(1444, pubs_mergerel);
+		assertEquals(1450, pubs.count());
 		assertEquals(286, sw_mergerel);
 		assertEquals(472, ds_mergerel);
 		assertEquals(738, orp_mergerel);
@@ -463,11 +450,7 @@ void createDedupRecordTest() throws Exception {
 
 		ArgumentApplicationParser parser = new ArgumentApplicationParser(
-			IOUtils
-				.toString(
-					SparkCreateDedupRecord.class
-						.getResourceAsStream(
-							"/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json")));
+			classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json"));
 		parser
 			.parseArgument(
 				new String[] {
@@ -483,12 +466,18 @@ public class SparkDedupTest implements Serializable {
 
 		new SparkCreateDedupRecord(parser, spark).run(isLookUpService);
 
+		final ObjectMapper mapper = new ObjectMapper()
+			.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+		final Dataset<Publication> pubs = spark
+			.read()
+			.textFile(testOutputBasePath + "/" + testActionSetId + "/publication_deduprecord")
+			.map(
+				(MapFunction<String, Publication>) value -> mapper.readValue(value, Publication.class),
+				Encoders.bean(Publication.class));
 		long orgs_deduprecord = jsc
 			.textFile(testOutputBasePath + "/" + testActionSetId + "/organization_deduprecord")
 			.count();
-		long pubs_deduprecord = jsc
-			.textFile(testOutputBasePath + "/" + testActionSetId + "/publication_deduprecord")
-			.count();
 		long sw_deduprecord = jsc
 			.textFile(testOutputBasePath + "/" + testActionSetId + "/software_deduprecord")
 			.count();
@@ -499,11 +488,13 @@ public class SparkDedupTest implements Serializable {
 			.count();
 
 		assertEquals(86, orgs_deduprecord);
-		assertEquals(67, pubs_deduprecord);
+		assertEquals(68, pubs.count());
 		assertEquals(49, sw_deduprecord);
 		assertEquals(97, ds_deduprecord);
 		assertEquals(92, orp_deduprecord);
 
+		verifyRoot_1(mapper, pubs);
+
 		// System.out.println("orgs_deduprecord = " + orgs_deduprecord);
 		// System.out.println("pubs_deduprecord = " + pubs_deduprecord);
 		// System.out.println("sw_deduprecord = " + sw_deduprecord);
@@ -511,16 +502,63 @@ public class SparkDedupTest implements Serializable {
 		// System.out.println("ds_deduprecord = " + ds_deduprecord);
 		// System.out.println("orp_deduprecord = " + orp_deduprecord);
 	}
 
+	private static void verifyRoot_1(ObjectMapper mapper, Dataset<Publication> pubs) {
+		Publication root = pubs
+			.filter("id = '50|doi_dedup___::d5021b53204e4fdeab6ff5d5bc468032'")
+			.first();
+		assertNotNull(root);
+
+		final Dataset<String> publication = spark
+			.read()
+			.textFile(DedupUtility.createEntityPath(testGraphBasePath, "publication"));
+
+		Publication crossref_duplicate = publication
+			.map(
+				(MapFunction<String, Publication>) value -> mapper.readValue(value, Publication.class),
+				Encoders.bean(Publication.class))
+			.filter("id = '50|doi_________::d5021b53204e4fdeab6ff5d5bc468032'")
+			.collectAsList()
+			.get(0);
+
+		assertEquals(crossref_duplicate.getJournal().getName(), root.getJournal().getName());
+		assertEquals(crossref_duplicate.getJournal().getIssnPrinted(), root.getJournal().getIssnPrinted());
+		assertEquals(crossref_duplicate.getPublisher().getValue(), root.getPublisher().getValue());
+
+		Set<String> rootPids = root
+			.getPid()
+			.stream()
+			.map(StructuredProperty::getValue)
+			.collect(Collectors.toCollection(HashSet::new));
+		Set<String> dupPids = crossref_duplicate
+			.getPid()
+			.stream()
+			.map(StructuredProperty::getValue)
+			.collect(Collectors.toCollection(HashSet::new));
+
+		assertFalse(Sets.intersection(rootPids, dupPids).isEmpty());
+		assertTrue(rootPids.contains("10.1109/jstqe.2022.3205716"));
+
+		Optional<Instance> instance_cr = root
+			.getInstance()
+			.stream()
+			.filter(i -> i.getCollectedfrom().getValue().equals("Crossref"))
+			.findFirst();
+		assertTrue(instance_cr.isPresent());
+		assertEquals("OPEN", instance_cr.get().getAccessright().getClassid());
+		assertEquals("Open Access", instance_cr.get().getAccessright().getClassname());
+		assertEquals(OpenAccessRoute.hybrid, instance_cr.get().getAccessright().getOpenAccessRoute());
+		assertEquals(
+			"IEEE Journal of Selected Topics in Quantum Electronics", instance_cr.get().getHostedby().getValue());
+		assertEquals("0001", instance_cr.get().getInstancetype().getClassid());
+		assertEquals("Article", instance_cr.get().getInstancetype().getClassname());
+	}
+
 	@Test
 	@Order(6)
 	void updateEntityTest() throws Exception {
 
 		ArgumentApplicationParser parser = new ArgumentApplicationParser(
-			IOUtils
-				.toString(
-					SparkUpdateEntity.class
-						.getResourceAsStream(
-							"/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json")));
+			classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json"));
 		parser
 			.parseArgument(
 				new String[] {
@@ -587,7 +625,7 @@ public class SparkDedupTest implements Serializable {
 			.distinct()
 			.count();
 
-		assertEquals(898, publications);
+		assertEquals(902, publications);
 		assertEquals(839, organizations);
 		assertEquals(100, projects);
 		assertEquals(100, datasource);
@@ -640,11 +678,7 @@ void propagateRelationTest() throws Exception {
 
 		ArgumentApplicationParser parser = new ArgumentApplicationParser(
-			IOUtils
-				.toString(
-					SparkPropagateRelation.class
-						.getResourceAsStream(
-							"/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json")));
+			classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json"));
 		parser
 			.parseArgument(
 				new String[] {
@@ -714,4 +748,12 @@ public class SparkDedupTest implements Serializable {
 	public boolean isDeletedByInference(String s) {
 		return s.contains("\"deletedbyinference\":true");
 	}
+
+	private static String classPathResourceAsString(String path) throws IOException {
+		return IOUtils
+			.toString(
+				SparkDedupTest.class
+					.getResourceAsStream(path));
+	}
+
 }
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/entities/publication/publication.gz b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/entities/publication/publication.gz
index cf513d730..a7457063b 100644
Binary files a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/entities/publication/publication.gz and b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/entities/publication/publication.gz differ
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/entities2/publication/publication.gz b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/entities2/publication/publication.gz
new file mode 100644
index 000000000..96dd21817
Binary files /dev/null and b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/entities2/publication/publication.gz differ
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator_publication.xml b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator_publication.xml
new file mode 100644
index 000000000..b47d99b92
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator_publication.xml
@@ -0,0 +1,24 @@
+<RESOURCE_PROFILE>
+    <HEADER>
+        <RESOURCE_IDENTIFIER value=""/>
+        <RESOURCE_TYPE value="DedupOrchestrationDSResourceType"/>
+        <RESOURCE_KIND value="DedupOrchestrationDSResources"/>
+        <RESOURCE_URI value=""/>
+        <DATE_OF_CREATION value="2001-12-31T12:00:00"/>
+    </HEADER>
+    <BODY>
+        <CONFIGURATION enabled="true">
+            <DEDUPLICATION>
+                <ENTITY code="50" label="Publication" name="publication"/>
+                <ACTION_SET id="test-orchestrator"/>
+                <SCAN_SEQUENCE>
+                    <SCAN id="publication"/>
+                </SCAN_SEQUENCE>
+            </DEDUPLICATION>
+        </CONFIGURATION>
+        <SECURITY_PARAMETERS>SECURITY_PARAMETERS</SECURITY_PARAMETERS>
+    </BODY>
+</RESOURCE_PROFILE>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/log4j.properties b/dhp-workflows/dhp-dedup-openaire/src/test/resources/log4j.properties
index ce37270c6..d3e717dfa 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/resources/log4j.properties
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/log4j.properties
@@ -19,7 +19,29 @@ log4j.logger.org.eclipse.jetty=WARN
 log4j.logger.org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory=WARN
 log4j.logger.org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter=WARN
-log4j.logger.org.apache.parquet.hadoop.ParquetOutputFormat=WARN
-log4j.logger.org.apache.parquet.hadoop.InternalParquetRecordWriter=WARN
+#log4j.logger.org.apache.parquet.hadoop.ParquetOutputFormat=WARN
+#log4j.logger.org.apache.parquet.hadoop.InternalParquetRecordWriter=WARN
 log4j.logger.org.apache.hadoop.io.compress.CodecPool=WARN
-log4j.logger.org.apache.parquet.hadoop.codec.CodecConfig=WARN
\ No newline at end of file
+#log4j.logger.org.apache.hadoop.io.compress=WARN
+#log4j.logger.org.apache.parquet.hadoop.codec.CodecConfig=WARN
+log4j.logger.parquet.hadoop.ColumnChunkPageWriteStore=ERROR
+log4j.logger.com.jayway.jsonpath.internal.path.CompiledPath=WARN
+log4j.logger.org.apache.parquet.hadoop.ParquetRecordReader=ERROR
+log4j.logger.parquet.hadoop=WARN
+log4j.logger.org.eclipse.jetty.server.handler.ContextHandlerCollection=WARN
+log4j.logger.org.spark_project.jetty.util.component.ContainerLifeCycle=WARN
+log4j.logger.org.apache.hadoop.mapred.FileInputFormat=WARN
+log4j.logger.org.spark_project.jetty.servlet.ServletHandler=WARN
+log4j.logger.org.apache.commons.beanutils.converters.BooleanConverter=WARN
+log4j.logger.org.apache.commons.beanutils.converters.StringConverter=WARN
+log4j.logger.org.apache.commons.beanutils.converters.LongConverter=WARN
+log4j.logger.org.apache.commons.beanutils.converters.ArrayConverter=WARN
+log4j.logger.org.apache.commons.beanutils.converters.FloatConverter=WARN
+log4j.logger.org.apache.commons.beanutils.converters.IntegerConverter=WARN
+log4j.logger.org.apache.commons.beanutils.converters.DoubleConverter=WARN
+log4j.logger.org.apache.commons.beanutils.converters.CharacterConverter=WARN
+log4j.logger.org.apache.commons.beanutils.converters.ByteConverter=WARN
+log4j.logger.org.apache.commons.beanutils.converters.BigIntegerConverter=WARN
+log4j.logger.org.apache.commons.beanutils.converters.BigDecimalConverter=WARN
+log4j.logger.org.apache.commons.beanutils.converters.ShortConverter=WARN
+log4j.logger.org.apache.commons.beanutils.BeanUtils=WARN