diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java index f9fc8a21a1..82bf87ccaf 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java @@ -112,7 +112,7 @@ public class DedupRecordFactory { // set authors and date if (ModelSupport.isSubClass(entity, Result.class)) { - ((Result) entity).setDateofacceptance(DatePicker.pick(dates)); + // ((Result) entity).setDateofacceptance(DatePicker.pick(dates)); ((Result) entity).setAuthor(AuthorMerger.merge(authors)); } diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupPublicationTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkPublicationRootsTest.java similarity index 57% rename from dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupPublicationTest.java rename to dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkPublicationRootsTest.java index 773de65faf..3cff836eb4 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupPublicationTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkPublicationRootsTest.java @@ -11,16 +11,17 @@ import java.io.File; import java.io.IOException; import java.io.Serializable; import java.net.URISyntaxException; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.*; import java.util.stream.Collectors; +import org.apache.commons.cli.ParseException; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FilterFunction; -import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -31,7 +32,6 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Sets; @@ -44,48 +44,52 @@ import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; @ExtendWith(MockitoExtension.class) @TestMethodOrder(MethodOrderer.OrderAnnotation.class) -public class SparkDedupPublicationTest implements Serializable { +public class SparkPublicationRootsTest implements Serializable { @Mock(serializable = true) ISLookUpService isLookUpService; private static SparkSession spark; - private static JavaSparkContext jsc; + private static String workingPath; - private static String testGraphBasePath; - private static String testOutputBasePath; - private static String testDedupGraphBasePath; + private static String graphInputPath; + private static String graphOutputPath; private static final String testActionSetId = "test-orchestrator"; + private static Path testBaseTmpPath; + + private static final ObjectMapper MAPPER = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + @BeforeAll - public static void cleanUp() throws IOException, URISyntaxException { + public static void init() throws IOException, URISyntaxException { - testGraphBasePath = Paths - .get(SparkDedupPublicationTest.class.getResource("/eu/dnetlib/dhp/dedup/entities2").toURI()) - .toFile() - .getAbsolutePath(); - testOutputBasePath = createTempDirectory(SparkDedupPublicationTest.class.getSimpleName() + "-") - .toAbsolutePath() - .toString(); + testBaseTmpPath = createTempDirectory(SparkPublicationRootsTest.class.getSimpleName() + "-"); - testDedupGraphBasePath = createTempDirectory(SparkDedupPublicationTest.class.getSimpleName() + "-") - .toAbsolutePath() - .toString(); + final File entitiesSources = Paths + .get(SparkPublicationRootsTest.class.getResource("/eu/dnetlib/dhp/dedup/root").toURI()) + .toFile(); - FileUtils.deleteDirectory(new File(testOutputBasePath)); - FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); + FileUtils + .copyDirectory( + entitiesSources, + testBaseTmpPath.resolve("input").toFile()); + + workingPath = testBaseTmpPath.resolve("workingPath").toString(); + graphInputPath = testBaseTmpPath.resolve("input").resolve("entities").toString(); + graphOutputPath = testBaseTmpPath.resolve("output").toString(); + + FileUtils.deleteDirectory(new File(workingPath)); + FileUtils.deleteDirectory(new File(graphOutputPath)); final SparkConf conf = new SparkConf(); conf.set("spark.sql.shuffle.partitions", "10"); spark = SparkSession .builder() - .appName(SparkDedupPublicationTest.class.getSimpleName()) + .appName(SparkPublicationRootsTest.class.getSimpleName()) .master("local[*]") .config(conf) .getOrCreate(); - - jsc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - } @BeforeEach @@ -100,55 +104,51 @@ public class SparkDedupPublicationTest implements Serializable { .thenReturn(classPathResourceAsString("/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json")); } + @AfterAll + public static void tearDown() throws IOException { + FileUtils.deleteDirectory(testBaseTmpPath.toFile()); + spark.close(); + } + @Test @Order(1) void createSimRelsTest() throws Exception { - - ArgumentApplicationParser parser = new ArgumentApplicationParser( - classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json")); - - parser - .parseArgument( - new String[] { - "--graphBasePath", testGraphBasePath, - "--actionSetId", testActionSetId, - "--isLookUpUrl", "lookupurl", - "--workingPath", testOutputBasePath, - "--numPartitions", "5" - }); - - new SparkCreateSimRels(parser, spark).run(isLookUpService); + new SparkCreateSimRels(args( + "/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json", + new String[] { + "--graphBasePath", graphInputPath, + "--actionSetId", testActionSetId, + "--isLookUpUrl", "lookupurl", + "--workingPath", workingPath, + "--numPartitions", "5" + }), spark) + .run(isLookUpService); long pubs_simrel = spark .read() - .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "publication")) + .load(DedupUtility.createSimRelPath(workingPath, testActionSetId, "publication")) .count(); - assertEquals(62, pubs_simrel); + assertEquals(74, pubs_simrel); } @Test @Order(2) void cutMergeRelsTest() throws Exception { - - ArgumentApplicationParser parser = new ArgumentApplicationParser( - classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")); - - parser - .parseArgument( - new String[] { - "--graphBasePath", testGraphBasePath, - "--actionSetId", testActionSetId, - "--isLookUpUrl", "lookupurl", - "--workingPath", testOutputBasePath, - "--cutConnectedComponent", "3" - }); - - new SparkCreateMergeRels(parser, spark).run(isLookUpService); + new SparkCreateMergeRels(args( + "/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json", + new String[] { + "--graphBasePath", graphInputPath, + "--actionSetId", testActionSetId, + "--isLookUpUrl", "lookupurl", + "--workingPath", workingPath, + "--cutConnectedComponent", "3" + }), spark) + .run(isLookUpService); long pubs_mergerel = spark .read() - .load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel") + .load(workingPath + "/" + testActionSetId + "/publication_mergerel") .as(Encoders.bean(Relation.class)) .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges")) .groupBy("source") @@ -159,49 +159,44 @@ public class SparkDedupPublicationTest implements Serializable { assertEquals(0, pubs_mergerel); - FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel")); + FileUtils.deleteDirectory(new File(workingPath + "/" + testActionSetId + "/publication_mergerel")); } @Test @Order(3) void createMergeRelsTest() throws Exception { + new SparkCreateMergeRels(args( + "/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json", + new String[] { + "--graphBasePath", graphInputPath, + "--actionSetId", testActionSetId, + "--isLookUpUrl", "lookupurl", + "--workingPath", workingPath + }), spark) + .run(isLookUpService); - ArgumentApplicationParser parser = new ArgumentApplicationParser( - classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")); - - parser - .parseArgument( - new String[] { - "--graphBasePath", testGraphBasePath, - "--actionSetId", testActionSetId, - "--isLookUpUrl", "lookupurl", - "--workingPath", testOutputBasePath - }); - - new SparkCreateMergeRels(parser, spark).run(isLookUpService); - - final Dataset pubs = spark + final Dataset merges = spark .read() - .load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel") + .load(workingPath + "/" + testActionSetId + "/publication_mergerel") .as(Encoders.bean(Relation.class)); - final List merges = pubs + final List mergeList = merges .filter("source == '50|doi_dedup___::d5021b53204e4fdeab6ff5d5bc468032'") .collectAsList(); - assertEquals(3, merges.size()); + assertEquals(3, mergeList.size()); Set dups = Sets .newHashSet( "50|doi_________::3b1d0d8e8f930826665df9d6b82fbb73", "50|doi_________::d5021b53204e4fdeab6ff5d5bc468032", "50|arXiv_______::c93aeb433eb90ed7a86e29be00791b7c"); - merges.forEach(r -> { + mergeList.forEach(r -> { assertEquals(ModelConstants.RESULT_RESULT, r.getRelType()); assertEquals(ModelConstants.DEDUP, r.getSubRelType()); assertEquals(ModelConstants.MERGES, r.getRelClass()); assertTrue(dups.contains(r.getTarget())); }); - final List mergedIn = pubs + final List mergedIn = merges .filter("target == '50|doi_dedup___::d5021b53204e4fdeab6ff5d5bc468032'") .collectAsList(); assertEquals(3, mergedIn.size()); @@ -212,47 +207,37 @@ public class SparkDedupPublicationTest implements Serializable { assertTrue(dups.contains(r.getSource())); }); - assertEquals(24, pubs.count()); + assertEquals(32, merges.count()); } @Test @Order(4) void createDedupRecordTest() throws Exception { - - ArgumentApplicationParser parser = new ArgumentApplicationParser( - classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json")); - parser - .parseArgument( - new String[] { - "--graphBasePath", testGraphBasePath, - "--actionSetId", testActionSetId, - "--isLookUpUrl", "lookupurl", - "--workingPath", testOutputBasePath - }); - - new SparkCreateDedupRecord(parser, spark).run(isLookUpService); - - final ObjectMapper mapper = new ObjectMapper() - .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + new SparkCreateDedupRecord(args( + "/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json", + new String[] { + "--graphBasePath", graphInputPath, + "--actionSetId", testActionSetId, + "--isLookUpUrl", "lookupurl", + "--workingPath", workingPath + }), spark) + .run(isLookUpService); final Dataset roots = spark .read() - .textFile(testOutputBasePath + "/" + testActionSetId + "/publication_deduprecord") - .map( - (MapFunction) value -> mapper.readValue(value, Publication.class), - Encoders.bean(Publication.class)); + .textFile(workingPath + "/" + testActionSetId + "/publication_deduprecord") + .map(asEntity(Publication.class), Encoders.bean(Publication.class)); - assertEquals(2, roots.count()); + assertEquals(3, roots.count()); final Dataset pubs = spark .read() - .textFile(DedupUtility.createEntityPath(testGraphBasePath, "publication")) - .map( - (MapFunction) value -> mapper.readValue(value, Publication.class), - Encoders.bean(Publication.class)); + .textFile(DedupUtility.createEntityPath(graphInputPath, "publication")) + .map(asEntity(Publication.class), Encoders.bean(Publication.class)); verifyRoot_case_1(roots, pubs); verifyRoot_case_2(roots, pubs); + verifyRoot_case_3(roots, pubs); } private static void verifyRoot_case_1(Dataset roots, Dataset pubs) { @@ -299,8 +284,7 @@ public class SparkDedupPublicationTest implements Serializable { assertEquals("Article", instance_cr.get().getInstancetype().getClassname()); } - private void verifyRoot_case_2(Dataset roots, Dataset pubs) - throws JsonProcessingException { + private void verifyRoot_case_2(Dataset roots, Dataset pubs) { Publication root = roots .filter("id = '50|doi_dedup___::18aff3b55fb6876466a5d4bd82434885'") .first(); @@ -334,57 +318,86 @@ public class SparkDedupPublicationTest implements Serializable { assertTrue(Sets.difference(root_cf, dups_cf).isEmpty()); } + private void verifyRoot_case_3(Dataset roots, Dataset pubs) { + Publication root = roots + .filter("id = '50|dedup_wf_001::31ca734cc22181b704c4aa8fd050062a'") + .first(); + assertNotNull(root); + + Publication pivot_duplicate = pubs + .filter("id = '50|od_______166::31ca734cc22181b704c4aa8fd050062a'") + .first(); + + assertEquals(pivot_duplicate.getPublisher().getValue(), root.getPublisher().getValue()); + + Set dups_cf = pubs + .collectAsList() + .stream() + .flatMap(p -> p.getCollectedfrom().stream()) + .map(KeyValue::getValue) + .collect(Collectors.toCollection(HashSet::new)); + + Set root_cf = root + .getCollectedfrom() + .stream() + .map(KeyValue::getValue) + .collect(Collectors.toCollection(HashSet::new)); + + assertTrue(Sets.difference(root_cf, dups_cf).isEmpty()); + } + @Test @Order(6) void updateEntityTest() throws Exception { + new SparkUpdateEntity(args( + "/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json", + new String[] { + "--graphBasePath", graphInputPath, + "--workingPath", workingPath, + "--dedupGraphPath", graphOutputPath + }), spark) + .run(isLookUpService); - ArgumentApplicationParser parser = new ArgumentApplicationParser( - classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json")); - parser - .parseArgument( - new String[] { - "-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath - }); - - new SparkUpdateEntity(parser, spark).run(isLookUpService); - - long publications = jsc.textFile(testDedupGraphBasePath + "/publication").count(); + long publications = spark.read().textFile(graphOutputPath + "/publication").count(); long mergedPubs = spark .read() - .load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel") + .load(workingPath + "/" + testActionSetId + "/publication_mergerel") .as(Encoders.bean(Relation.class)) .where("relClass=='merges'") - .javaRDD() - .map(Relation::getTarget) + .map((MapFunction) Relation::getTarget, Encoders.STRING()) .distinct() .count(); - assertEquals(14, publications); + assertEquals(19, publications); // 16 originals + 3 roots - long deletedPubs = jsc - .textFile(testDedupGraphBasePath + "/publication") - .filter(this::isDeletedByInference) + long deletedPubs = spark + .read() + .textFile(graphOutputPath + "/publication") + .map(asEntity(Publication.class), Encoders.bean(Publication.class)) + .filter("datainfo.deletedbyinference == true") + .map((MapFunction) OafEntity::getId, Encoders.STRING()) + .distinct() .count(); assertEquals(mergedPubs, deletedPubs); } - @AfterAll - public static void finalCleanUp() throws IOException { - FileUtils.deleteDirectory(new File(testOutputBasePath)); - FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); - } - - public boolean isDeletedByInference(String s) { - return s.contains("\"deletedbyinference\":true"); - } - private static String classPathResourceAsString(String path) throws IOException { return IOUtils .toString( - SparkDedupPublicationTest.class + SparkPublicationRootsTest.class .getResourceAsStream(path)); } + private static MapFunction asEntity(Class clazz) { + return value -> MAPPER.readValue(value, clazz); + } + + private ArgumentApplicationParser args(String paramSpecs, String[] args) throws IOException, ParseException { + ArgumentApplicationParser parser = new ArgumentApplicationParser(classPathResourceAsString(paramSpecs)); + parser.parseArgument(args); + return parser; + } + } diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkPublicationRootsTest2.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkPublicationRootsTest2.java new file mode 100644 index 0000000000..9afe1e34b8 --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkPublicationRootsTest2.java @@ -0,0 +1,251 @@ + +package eu.dnetlib.dhp.oa.dedup; + +import static java.nio.file.Files.createTempDirectory; + +import static org.apache.spark.sql.functions.count; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.lenient; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.net.URISyntaxException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.commons.cli.ParseException; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Sets; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; + +@ExtendWith(MockitoExtension.class) +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +public class SparkPublicationRootsTest2 implements Serializable { + + @Mock(serializable = true) + ISLookUpService isLookUpService; + private static SparkSession spark; + + private static String workingPath; + + private static String graphInputPath; + + private static String graphOutputPath; + + private static final String testActionSetId = "test-orchestrator"; + + private static Path testBaseTmpPath; + + private static final ObjectMapper MAPPER = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + @BeforeAll + public static void init() throws IOException, URISyntaxException { + + testBaseTmpPath = createTempDirectory(SparkPublicationRootsTest2.class.getSimpleName() + "-"); + + final File entitiesSources = Paths + .get(SparkPublicationRootsTest2.class.getResource("/eu/dnetlib/dhp/dedup/root").toURI()) + .toFile(); + + FileUtils + .copyDirectory( + entitiesSources, + testBaseTmpPath.resolve("input").toFile()); + + FileUtils + .copyFileToDirectory( + Paths + .get( + SparkPublicationRootsTest2.class + .getResource( + "/eu/dnetlib/dhp/dedup/root/alterations/publication/publication_1.gz") + .toURI()) + .toFile(), + testBaseTmpPath.resolve("input").resolve("entities").resolve("publication").toFile()); + + workingPath = testBaseTmpPath.resolve("workingPath").toString(); + graphInputPath = testBaseTmpPath.resolve("input").resolve("entities").toString(); + graphOutputPath = testBaseTmpPath.resolve("output").toString(); + + final SparkConf conf = new SparkConf(); + conf.set("spark.sql.shuffle.partitions", "10"); + spark = SparkSession + .builder() + .appName(SparkPublicationRootsTest2.class.getSimpleName()) + .master("local[*]") + .config(conf) + .getOrCreate(); + } + + @BeforeEach + public void setUp() throws IOException, ISLookUpException { + + lenient() + .when(isLookUpService.getResourceProfileByQuery(Mockito.contains(testActionSetId))) + .thenReturn(classPathResourceAsString("/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator_publication.xml")); + + lenient() + .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("publication"))) + .thenReturn(classPathResourceAsString("/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json")); + } + + @AfterAll + public static void tearDown() throws IOException { + FileUtils.deleteDirectory(testBaseTmpPath.toFile()); + } + + @Test + @Order(7) + void dedupAlteredDatasetTest() throws Exception { + + new SparkCreateSimRels(args( + "/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json", + new String[] { + "--graphBasePath", graphInputPath, + "--actionSetId", testActionSetId, + "--isLookUpUrl", "lookupurl", + "--workingPath", workingPath, + "--numPartitions", "5" + }), spark) + .run(isLookUpService); + + new SparkCreateMergeRels(args( + "/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json", + new String[] { + "--graphBasePath", graphInputPath, + "--actionSetId", testActionSetId, + "--isLookUpUrl", "lookupurl", + "--workingPath", workingPath + }), spark) + .run(isLookUpService); + + final Dataset merges = spark + .read() + .load(workingPath + "/" + testActionSetId + "/publication_mergerel") + .as(Encoders.bean(Relation.class)); + + assertEquals( + 3, merges + .filter("relclass == 'isMergedIn'") + .map((MapFunction) Relation::getTarget, Encoders.STRING()) + .distinct() + .count()); + assertEquals( + 4, merges + .filter("source == '50|doi_dedup___::b3aec7985136e36827176aaa1dd5082d'") + .count()); + + new SparkCreateDedupRecord(args( + "/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json", + new String[] { + "--graphBasePath", graphInputPath, + "--actionSetId", testActionSetId, + "--isLookUpUrl", "lookupurl", + "--workingPath", workingPath + }), spark) + .run(isLookUpService); + + final Dataset roots = spark + .read() + .textFile(workingPath + "/" + testActionSetId + "/publication_deduprecord") + .map(asEntity(Publication.class), Encoders.bean(Publication.class)); + + assertEquals(3, roots.count()); + + final Dataset pubs = spark + .read() + .textFile(DedupUtility.createEntityPath(graphInputPath, "publication")) + .map(asEntity(Publication.class), Encoders.bean(Publication.class)); + + Publication root = roots + .filter("id = '50|doi_dedup___::b3aec7985136e36827176aaa1dd5082d'") + .first(); + assertNotNull(root); + + Publication crossref_duplicate = pubs + .filter("id = '50|doi_________::b3aec7985136e36827176aaa1dd5082d'") + .collectAsList() + .get(0); + + assertEquals(crossref_duplicate.getDateofacceptance().getValue(), root.getDateofacceptance().getValue()); + assertEquals(crossref_duplicate.getJournal().getName(), root.getJournal().getName()); + assertEquals(crossref_duplicate.getJournal().getIssnPrinted(), root.getJournal().getIssnPrinted()); + assertEquals(crossref_duplicate.getPublisher().getValue(), root.getPublisher().getValue()); + + Set rootPids = root + .getPid() + .stream() + .map(StructuredProperty::getValue) + .collect(Collectors.toCollection(HashSet::new)); + Set dupPids = crossref_duplicate + .getPid() + .stream() + .map(StructuredProperty::getValue) + .collect(Collectors.toCollection(HashSet::new)); + + assertFalse(Sets.intersection(rootPids, dupPids).isEmpty()); + assertTrue(rootPids.contains("10.1109/jstqe.2022.3205716")); + assertTrue(rootPids.contains("10.1109/jstqe.2023.9999999")); + + Optional instance_cr = root + .getInstance() + .stream() + .filter(i -> i.getCollectedfrom().getValue().equals("Crossref")) + .findFirst(); + assertTrue(instance_cr.isPresent()); + assertEquals("OPEN", instance_cr.get().getAccessright().getClassid()); + assertEquals("Open Access", instance_cr.get().getAccessright().getClassname()); + assertEquals(OpenAccessRoute.hybrid, instance_cr.get().getAccessright().getOpenAccessRoute()); + assertEquals( + "IEEE Journal of Selected Topics in Quantum Electronics", instance_cr.get().getHostedby().getValue()); + assertEquals("0001", instance_cr.get().getInstancetype().getClassid()); + assertEquals("Article", instance_cr.get().getInstancetype().getClassname()); + + } + + private static String classPathResourceAsString(String path) throws IOException { + return IOUtils + .toString( + SparkPublicationRootsTest2.class + .getResourceAsStream(path)); + } + + private static MapFunction asEntity(Class clazz) { + return value -> MAPPER.readValue(value, clazz); + } + + private ArgumentApplicationParser args(String paramSpecs, String[] args) throws IOException, ParseException { + ArgumentApplicationParser parser = new ArgumentApplicationParser(classPathResourceAsString(paramSpecs)); + parser.parseArgument(args); + return parser; + } + +} diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/entities2/publication/publication.gz b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/entities2/publication/publication.gz deleted file mode 100644 index 96dd218174..0000000000 Binary files a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/entities2/publication/publication.gz and /dev/null differ diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/root/alterations/publication/publication_1.gz b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/root/alterations/publication/publication_1.gz new file mode 100644 index 0000000000..89ed9671e1 Binary files /dev/null and b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/root/alterations/publication/publication_1.gz differ diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/root/entities/publication/publication_0.gz b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/root/entities/publication/publication_0.gz new file mode 100644 index 0000000000..85706bc31f Binary files /dev/null and b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/root/entities/publication/publication_0.gz differ