diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java index 1cb04d02f7..e5c8a4606e 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java @@ -5,29 +5,29 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; -import eu.dnetlib.pace.config.DedupConfig; import java.util.Collection; import java.util.Iterator; -import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Tuple2; public class DedupRecordFactory { + private static final Logger log = LoggerFactory.getLogger(DedupRecordFactory.class); + protected static final ObjectMapper OBJECT_MAPPER = - new com.fasterxml.jackson.databind.ObjectMapper() - .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); public static Dataset createDedupRecord( final SparkSession spark, final String mergeRelsInputPath, final String entitiesInputPath, - final Class clazz, - final DedupConfig dedupConf) { + final Class clazz) { long ts = System.currentTimeMillis(); @@ -54,40 +54,39 @@ public class DedupRecordFactory { r -> new Tuple2<>(r.getSource(), r.getTarget()), Encoders.tuple(Encoders.STRING(), Encoders.STRING())); - // return mergeRels - .joinWith(entities, mergeRels.col("_1").equalTo(entities.col("_1")), "left_outer") - .filter( - (FilterFunction, Tuple2>>) - value -> value._2() != null) + .joinWith(entities, mergeRels.col("_2").equalTo(entities.col("_1")), "inner") .map( - (MapFunction, Tuple2>, T>) - value -> value._2()._2(), - Encoders.kryo(clazz)) - .groupByKey((MapFunction) value -> value.getId(), Encoders.STRING()) + (MapFunction< + Tuple2, Tuple2>, + Tuple2>) + value -> new Tuple2<>(value._1()._1(), value._2()._2()), + Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz))) + .groupByKey( + (MapFunction, String>) entity -> entity._1(), + Encoders.STRING()) .mapGroups( - (MapGroupsFunction) + (MapGroupsFunction, T>) (key, values) -> entityMerger(key, values, ts, clazz), Encoders.bean(clazz)); } private static T entityMerger( - String id, Iterator entities, final long ts, Class clazz) { + String id, Iterator> entities, long ts, Class clazz) { try { T entity = clazz.newInstance(); entity.setId(id); - if (entity.getDataInfo() == null) { - entity.setDataInfo(new DataInfo()); - } + entity.setDataInfo(new DataInfo()); entity.getDataInfo().setTrust("0.9"); entity.setLastupdatetimestamp(ts); final Collection dates = Lists.newArrayList(); entities.forEachRemaining( - e -> { - entity.mergeFrom(e); - if (ModelSupport.isSubClass(e, Result.class)) { - Result r1 = (Result) e; + t -> { + T duplicate = t._2(); + entity.mergeFrom(duplicate); + if (ModelSupport.isSubClass(duplicate, Result.class)) { + Result r1 = (Result) duplicate; Result er = (Result) entity; er.setAuthor(DedupUtility.mergeAuthor(er.getAuthor(), r1.getAuthor())); diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java index 127a19139e..d442d73a89 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java @@ -71,7 +71,7 @@ public class SparkCreateDedupRecord extends AbstractSparkAction { Class clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity)); - DedupRecordFactory.createDedupRecord(spark, mergeRelPath, entityPath, clazz, dedupConf) + DedupRecordFactory.createDedupRecord(spark, mergeRelPath, entityPath, clazz) .map( (MapFunction) value -> OBJECT_MAPPER.writeValueAsString(value), diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java index ce0be4f915..8fd7a0ec9c 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java @@ -1,10 +1,19 @@ package eu.dnetlib.dhp.oa.dedup; +import static java.nio.file.Files.createTempDirectory; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.lenient; + import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.pace.util.MapDocumentUtil; +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.net.URISyntaxException; +import java.nio.file.Paths; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; @@ -23,16 +32,6 @@ import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import scala.Tuple2; -import java.io.File; -import java.io.IOException; -import java.io.Serializable; -import java.net.URISyntaxException; -import java.nio.file.Paths; - -import static java.nio.file.Files.createTempDirectory; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.lenient; - @ExtendWith(MockitoExtension.class) @TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class SparkDedupTest implements Serializable { @@ -46,65 +45,104 @@ public class SparkDedupTest implements Serializable { private static String testGraphBasePath; private static String testOutputBasePath; private static String testDedupGraphBasePath; - private final static String testActionSetId = "test-orchestrator"; + private static final String testActionSetId = "test-orchestrator"; @BeforeAll - private static void cleanUp() throws IOException, URISyntaxException { + public static void cleanUp() throws IOException, URISyntaxException { - testGraphBasePath = Paths.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/entities").toURI()).toFile().getAbsolutePath(); + testGraphBasePath = + Paths.get( + SparkDedupTest.class + .getResource("/eu/dnetlib/dhp/dedup/entities") + .toURI()) + .toFile() + .getAbsolutePath(); - testOutputBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-").toAbsolutePath().toString(); - testDedupGraphBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-").toAbsolutePath().toString(); + testOutputBasePath = + createTempDirectory(SparkDedupTest.class.getSimpleName() + "-") + .toAbsolutePath() + .toString(); + testDedupGraphBasePath = + createTempDirectory(SparkDedupTest.class.getSimpleName() + "-") + .toAbsolutePath() + .toString(); FileUtils.deleteDirectory(new File(testOutputBasePath)); FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); - spark = SparkSession - .builder() - .appName(SparkCreateSimRels.class.getSimpleName()) - .master("local[*]") - .config(new SparkConf()) - .getOrCreate(); + spark = + SparkSession.builder() + .appName(SparkCreateSimRels.class.getSimpleName()) + .master("local[*]") + .config(new SparkConf()) + .getOrCreate(); jsc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - } @BeforeEach - private void setUp() throws IOException, ISLookUpException { + public void setUp() throws IOException, ISLookUpException { - lenient().when(isLookUpService.getResourceProfileByQuery(Mockito.contains(testActionSetId))) - .thenReturn(IOUtils.toString(SparkDedupTest.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml"))); + lenient() + .when(isLookUpService.getResourceProfileByQuery(Mockito.contains(testActionSetId))) + .thenReturn( + IOUtils.toString( + SparkDedupTest.class.getResourceAsStream( + "/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml"))); - lenient().when(isLookUpService.getResourceProfileByQuery(Mockito.contains("organization"))) - .thenReturn(IOUtils.toString(SparkDedupTest.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json"))); + lenient() + .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("organization"))) + .thenReturn( + IOUtils.toString( + SparkDedupTest.class.getResourceAsStream( + "/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json"))); - lenient().when(isLookUpService.getResourceProfileByQuery(Mockito.contains("publication"))) - .thenReturn(IOUtils.toString(SparkDedupTest.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json"))); - - lenient().when(isLookUpService.getResourceProfileByQuery(Mockito.contains("software"))) - .thenReturn(IOUtils.toString(SparkDedupTest.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json"))); + lenient() + .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("publication"))) + .thenReturn( + IOUtils.toString( + SparkDedupTest.class.getResourceAsStream( + "/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json"))); + lenient() + .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("software"))) + .thenReturn( + IOUtils.toString( + SparkDedupTest.class.getResourceAsStream( + "/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json"))); } @Test @Order(1) public void createSimRelsTest() throws Exception { - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils.toString( - SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json"))); - parser.parseArgument(new String[]{ - "-i", testGraphBasePath, - "-asi", testActionSetId, - "-la", "lookupurl", - "-w", testOutputBasePath}); + ArgumentApplicationParser parser = + new ArgumentApplicationParser( + IOUtils.toString( + SparkCreateSimRels.class.getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json"))); + parser.parseArgument( + new String[] { + "-i", testGraphBasePath, + "-asi", testActionSetId, + "-la", "lookupurl", + "-w", testOutputBasePath + }); new SparkCreateSimRels(parser, spark).run(isLookUpService); - long orgs_simrel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel").count(); - long pubs_simrel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel").count(); - long sw_simrel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/software_simrel").count(); + long orgs_simrel = + spark.read() + .load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel") + .count(); + long pubs_simrel = + spark.read() + .load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel") + .count(); + long sw_simrel = + spark.read() + .load(testOutputBasePath + "/" + testActionSetId + "/software_simrel") + .count(); assertEquals(3432, orgs_simrel); assertEquals(7260, pubs_simrel); @@ -115,20 +153,33 @@ public class SparkDedupTest implements Serializable { @Order(2) public void createMergeRelsTest() throws Exception { - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils.toString( - SparkCreateMergeRels.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"))); - parser.parseArgument(new String[]{ - "-i", testGraphBasePath, - "-asi", testActionSetId, - "-la", "lookupurl", - "-w", testOutputBasePath}); + ArgumentApplicationParser parser = + new ArgumentApplicationParser( + IOUtils.toString( + SparkCreateMergeRels.class.getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"))); + parser.parseArgument( + new String[] { + "-i", testGraphBasePath, + "-asi", testActionSetId, + "-la", "lookupurl", + "-w", testOutputBasePath + }); new SparkCreateMergeRels(parser, spark).run(isLookUpService); - long orgs_mergerel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel").count(); - long pubs_mergerel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel").count(); - long sw_mergerel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel").count(); + long orgs_mergerel = + spark.read() + .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") + .count(); + long pubs_mergerel = + spark.read() + .load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel") + .count(); + long sw_mergerel = + spark.read() + .load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel") + .count(); assertEquals(1276, orgs_mergerel); assertEquals(1460, pubs_mergerel); @@ -139,20 +190,38 @@ public class SparkDedupTest implements Serializable { @Order(3) public void createDedupRecordTest() throws Exception { - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils.toString( - SparkCreateDedupRecord.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json"))); - parser.parseArgument(new String[]{ - "-i", testGraphBasePath, - "-asi", testActionSetId, - "-la", "lookupurl", - "-w", testOutputBasePath}); + ArgumentApplicationParser parser = + new ArgumentApplicationParser( + IOUtils.toString( + SparkCreateDedupRecord.class.getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json"))); + parser.parseArgument( + new String[] { + "-i", testGraphBasePath, + "-asi", testActionSetId, + "-la", "lookupurl", + "-w", testOutputBasePath + }); new SparkCreateDedupRecord(parser, spark).run(isLookUpService); - long orgs_deduprecord = jsc.textFile(testOutputBasePath + "/" + testActionSetId + "/organization_deduprecord").count(); - long pubs_deduprecord = jsc.textFile(testOutputBasePath + "/" + testActionSetId + "/publication_deduprecord").count(); - long sw_deduprecord = jsc.textFile(testOutputBasePath + "/" + testActionSetId + "/software_deduprecord").count(); + long orgs_deduprecord = + jsc.textFile( + testOutputBasePath + + "/" + + testActionSetId + + "/organization_deduprecord") + .count(); + long pubs_deduprecord = + jsc.textFile( + testOutputBasePath + + "/" + + testActionSetId + + "/publication_deduprecord") + .count(); + long sw_deduprecord = + jsc.textFile(testOutputBasePath + "/" + testActionSetId + "/software_deduprecord") + .count(); assertEquals(82, orgs_deduprecord); assertEquals(66, pubs_deduprecord); @@ -163,14 +232,17 @@ public class SparkDedupTest implements Serializable { @Order(4) public void updateEntityTest() throws Exception { - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils.toString( - SparkUpdateEntity.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json"))); - parser.parseArgument(new String[]{ - "-i", testGraphBasePath, - "-w", testOutputBasePath, - "-o", testDedupGraphBasePath - }); + ArgumentApplicationParser parser = + new ArgumentApplicationParser( + IOUtils.toString( + SparkUpdateEntity.class.getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json"))); + parser.parseArgument( + new String[] { + "-i", testGraphBasePath, + "-w", testOutputBasePath, + "-o", testDedupGraphBasePath + }); new SparkUpdateEntity(parser, spark).run(isLookUpService); @@ -180,19 +252,25 @@ public class SparkDedupTest implements Serializable { long datasource = jsc.textFile(testDedupGraphBasePath + "/datasource").count(); long softwares = jsc.textFile(testDedupGraphBasePath + "/software").count(); - long mergedOrgs = spark - .read().load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel").as(Encoders.bean(Relation.class)) - .where("relClass=='merges'") - .javaRDD() - .map(Relation::getTarget) - .distinct().count(); + long mergedOrgs = + spark.read() + .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") + .as(Encoders.bean(Relation.class)) + .where("relClass=='merges'") + .javaRDD() + .map(Relation::getTarget) + .distinct() + .count(); - long mergedPubs = spark - .read().load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel").as(Encoders.bean(Relation.class)) - .where("relClass=='merges'") - .javaRDD() - .map(Relation::getTarget) - .distinct().count(); + long mergedPubs = + spark.read() + .load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel") + .as(Encoders.bean(Relation.class)) + .where("relClass=='merges'") + .javaRDD() + .map(Relation::getTarget) + .distinct() + .count(); assertEquals(831, publications); assertEquals(835, organizations); @@ -200,10 +278,15 @@ public class SparkDedupTest implements Serializable { assertEquals(100, datasource); assertEquals(200, softwares); - long deletedOrgs = jsc.textFile(testDedupGraphBasePath + "/organization") - .filter(this::isDeletedByInference).count(); - long deletedPubs = jsc.textFile(testDedupGraphBasePath + "/publication") - .filter(this::isDeletedByInference).count(); + long deletedOrgs = + jsc.textFile(testDedupGraphBasePath + "/organization") + .filter(this::isDeletedByInference) + .count(); + + long deletedPubs = + jsc.textFile(testDedupGraphBasePath + "/publication") + .filter(this::isDeletedByInference) + .count(); assertEquals(mergedOrgs, deletedOrgs); assertEquals(mergedPubs, deletedPubs); @@ -213,14 +296,17 @@ public class SparkDedupTest implements Serializable { @Order(5) public void propagateRelationTest() throws Exception { - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils.toString( - SparkPropagateRelation.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json"))); - parser.parseArgument(new String[]{ - "-i", testGraphBasePath, - "-w", testOutputBasePath, - "-o", testDedupGraphBasePath - }); + ArgumentApplicationParser parser = + new ArgumentApplicationParser( + IOUtils.toString( + SparkPropagateRelation.class.getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json"))); + parser.parseArgument( + new String[] { + "-i", testGraphBasePath, + "-w", testOutputBasePath, + "-o", testDedupGraphBasePath + }); new SparkPropagateRelation(parser, spark).run(isLookUpService); @@ -228,22 +314,37 @@ public class SparkDedupTest implements Serializable { assertEquals(826, relations); - //check deletedbyinference - final Dataset mergeRels = spark.read().load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*")).as(Encoders.bean(Relation.class)); - final JavaPairRDD mergedIds = mergeRels - .where("relClass == 'merges'") - .select(mergeRels.col("target")) - .distinct() - .toJavaRDD() - .mapToPair((PairFunction) r -> new Tuple2(r.getString(0), "d")); + // check deletedbyinference + final Dataset mergeRels = + spark.read() + .load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*")) + .as(Encoders.bean(Relation.class)); + final JavaPairRDD mergedIds = + mergeRels + .where("relClass == 'merges'") + .select(mergeRels.col("target")) + .distinct() + .toJavaRDD() + .mapToPair( + (PairFunction) + r -> new Tuple2(r.getString(0), "d")); - JavaRDD toCheck = jsc.textFile(testDedupGraphBasePath + "/relation") - .mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json)) - .join(mergedIds) - .map(t -> t._2()._1()) - .mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.target", json), json)) - .join(mergedIds) - .map(t -> t._2()._1()); + JavaRDD toCheck = + jsc.textFile(testDedupGraphBasePath + "/relation") + .mapToPair( + json -> + new Tuple2<>( + MapDocumentUtil.getJPathString("$.source", json), + json)) + .join(mergedIds) + .map(t -> t._2()._1()) + .mapToPair( + json -> + new Tuple2<>( + MapDocumentUtil.getJPathString("$.target", json), + json)) + .join(mergedIds) + .map(t -> t._2()._1()); long deletedbyinference = toCheck.filter(this::isDeletedByInference).count(); long updated = toCheck.count(); @@ -258,6 +359,6 @@ public class SparkDedupTest implements Serializable { } public boolean isDeletedByInference(String s) { - return s.contains("\"deletedbyinference\":true"); + return s.contains("\"deletedbyinference\":true"); } -} \ No newline at end of file +}