diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJob.java
index d306afef0..c8876a415 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJob.java
@@ -9,6 +9,7 @@ import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.listKeyValues;
 import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.qualifier;
 import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.structuredProperty;
 
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -22,10 +23,12 @@ import java.util.stream.Collectors;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
@@ -57,17 +60,20 @@ public class GenerateRorActionSetJob {
 
 	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
-	private static final List<KeyValue> ROR_COLLECTED_FROM = listKeyValues("10|openaire____::993a7ae7a863813cf95028b50708e222", "ROR");
+	private static final List<KeyValue> ROR_COLLECTED_FROM = listKeyValues(
+		"10|openaire____::993a7ae7a863813cf95028b50708e222", "ROR");
 
-	private static final DataInfo ROR_DATA_INFO = dataInfo(false, "", false, false, ENTITYREGISTRY_PROVENANCE_ACTION, "0.92");
+	private static final DataInfo ROR_DATA_INFO = dataInfo(
+		false, "", false, false, ENTITYREGISTRY_PROVENANCE_ACTION, "0.92");
 
 	private static final Qualifier ROR_PID_TYPE = qualifier("ROR", "ROR", "dnet:pid_types", "dnet:pid_types");
 
 	public static void main(final String[] args) throws Exception {
 
 		final String jsonConfiguration = IOUtils
-			.toString(SparkAtomicActionJob.class
-				.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/ror/action_set_parameters.json"));
+			.toString(
+				SparkAtomicActionJob.class
+					.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/ror/action_set_parameters.json"));
 
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
@@ -100,16 +106,16 @@ public class GenerateRorActionSetJob {
 	private static void processRorOrganizations(final SparkSession spark,
 		final String inputPath,
-		final String outputPath) {
+		final String outputPath) throws Exception {
 
 		readInputPath(spark, inputPath)
 			.map(GenerateRorActionSetJob::convertRorOrg, Encoders.bean(Organization.class))
 			.toJavaRDD()
 			.map(o -> new AtomicAction<>(Organization.class, o))
-			.mapToPair(aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
-				new Text(OBJECT_MAPPER.writeValueAsString(aa))))
+			.mapToPair(
+				aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
+					new Text(OBJECT_MAPPER.writeValueAsString(aa))))
 			.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
-
 	}
 
 	protected static Organization convertRorOrg(final RorOrganization r) {
@@ -142,7 +148,11 @@ public class GenerateRorActionSetJob {
 		o.setEcsmevalidated(null);
 		o.setEcnutscode(null);
 		if (r.getCountry() != null) {
-			o.setCountry(qualifier(r.getCountry().getCountryCode(), r.getCountry().getCountryName(), COUNTRIES_VOC, COUNTRIES_VOC));
+			o
+				.setCountry(
+					qualifier(
+						r.getCountry().getCountryCode(), r.getCountry().getCountryName(), COUNTRIES_VOC,
+						COUNTRIES_VOC));
 		} else {
 			o.setCountry(null);
 		}
@@ -162,14 +172,24 @@ public class GenerateRorActionSetJob {
 			if (all == null) {
 				// skip
 			} else if (all instanceof String) {
-				pids.add(structuredProperty(all.toString(), qualifier(type, type, "dnet:pid_types", "dnet:pid_types"), ROR_DATA_INFO));
+				pids
+					.add(
+						structuredProperty(
+							all.toString(), qualifier(type, type, "dnet:pid_types", "dnet:pid_types"), ROR_DATA_INFO));
 			} else if (all instanceof Collection) {
 				for (final Object pid : (Collection<?>) all) {
-					pids.add(structuredProperty(pid.toString(), qualifier(type, type, "dnet:pid_types", "dnet:pid_types"), ROR_DATA_INFO));
+					pids
+						.add(
+							structuredProperty(
+								pid.toString(), qualifier(type, type, "dnet:pid_types", "dnet:pid_types"),
+								ROR_DATA_INFO));
 				}
 			} else if (all instanceof String[]) {
 				for (final String pid : (String[]) all) {
-					pids.add(structuredProperty(pid, qualifier(type, type, "dnet:pid_types", "dnet:pid_types"), ROR_DATA_INFO));
+					pids
+						.add(
+							structuredProperty(
+								pid, qualifier(type, type, "dnet:pid_types", "dnet:pid_types"), ROR_DATA_INFO));
 				}
 			} else {
 				log.warn("Invalid type for pid list: " + all.getClass());
@@ -185,16 +205,22 @@ public class GenerateRorActionSetJob {
 		names.addAll(r.getAcronyms());
 		r.getLabels().forEach(l -> names.add(l.getLabel()));
 
-		return names.stream().filter(StringUtils::isNotBlank).map(s -> field(s, ROR_DATA_INFO)).collect(Collectors.toList());
+		return names
+			.stream()
+			.filter(StringUtils::isNotBlank)
+			.map(s -> field(s, ROR_DATA_INFO))
+			.collect(Collectors.toList());
 	}
 
 	private static Dataset<RorOrganization> readInputPath(
 		final SparkSession spark,
-		final String inputPath) {
-		return spark
-			.read()
-			.textFile(inputPath)
-			.map((MapFunction<String, RorOrganization>) value -> OBJECT_MAPPER.readValue(value, RorOrganization.class), Encoders.bean(RorOrganization.class));
+		final String path) throws Exception {
+
+		try (final FileSystem fileSystem = FileSystem.get(new Configuration());
+			final InputStream is = fileSystem.open(new Path(path))) {
+			final RorOrganization[] arr = OBJECT_MAPPER.readValue(is, RorOrganization[].class);
+			return spark.createDataset(Arrays.asList(arr), Encoders.bean(RorOrganization.class));
+		}
 	}
 
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/ror/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/ror/oozie_app/workflow.xml
index 844f9ebb2..3d00b80a8 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/ror/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/ror/oozie_app/workflow.xml
@@ -1,11 +1,11 @@
 	<parameters>
 		<property>
-			<name>inputPath</name>
+			<name>rorJsonInputPath</name>
 			<description>the path of the json</description>
 		</property>
 		<property>
-			<name>outputPath</name>
+			<name>rorActionSetPath</name>
 			<description>path where to store the action set</description>
 		</property>
@@ -18,8 +18,8 @@
-			<delete path="${outputPath}"/>
-			<mkdir path="${outputPath}"/>
+			<delete path="${rorActionSetPath}"/>
+			<mkdir path="${rorActionSetPath}"/>
@@ -44,8 +44,8 @@
 			--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 			--conf spark.sql.shuffle.partitions=3840
 			</spark-opts>
-			<arg>--inputPath</arg><arg>${inputPath}</arg>
-			<arg>--outputPath</arg><arg>${outputPath}</arg>
+			<arg>--inputPath</arg><arg>${rorJsonInputPath}</arg>
+			<arg>--outputPath</arg><arg>${rorActionSetPath}</arg>
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJobTest.java
index af2bd1c9d..f16901cb4 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJobTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJobTest.java
@@ -1,3 +1,4 @@
+
 package eu.dnetlib.dhp.actionmanager.ror;
 
 import java.io.FileInputStream;
@@ -20,11 +21,13 @@ class GenerateRorActionSetJobTest {
 	private static final String local_file_path = "/Users/michele/Downloads/ror-data-2021-04-06.json";
 
 	@BeforeEach
-	void setUp() throws Exception {}
+	void setUp() throws Exception {
+	}
 
 	@Test
 	void testConvertRorOrg() throws Exception {
-		final RorOrganization r = mapper.readValue(IOUtils.toString(getClass().getResourceAsStream("ror_org.json")), RorOrganization.class);
+		final RorOrganization r = mapper
+			.readValue(IOUtils.toString(getClass().getResourceAsStream("ror_org.json")), RorOrganization.class);
 
 		final Organization org = GenerateRorActionSetJob.convertRorOrg(r);
 		System.out.println(mapper.writeValueAsString(org));
@@ -32,7 +35,8 @@ class GenerateRorActionSetJobTest {
 
 	@Test
 	void testConvertAllRorOrg() throws Exception {
-		final RorOrganization[] arr = mapper.readValue(IOUtils.toString(new FileInputStream(local_file_path)), RorOrganization[].class);
+		final RorOrganization[] arr = mapper
+			.readValue(IOUtils.toString(new FileInputStream(local_file_path)), RorOrganization[].class);
 
 		for (final RorOrganization r : arr) {
 			GenerateRorActionSetJob.convertRorOrg(r);
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
index bf4913056..a95e28c43 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
@@ -12,7 +12,6 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.net.URISyntaxException;
 import java.nio.file.Paths;
-import java.util.List;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
@@ -27,7 +26,13 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.Mockito;
@@ -97,8 +102,7 @@ public class SparkDedupTest implements Serializable {
 			IOUtils
 				.toString(
 					SparkDedupTest.class
-						.getResourceAsStream(
-							"/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml")));
+						.getResourceAsStream("/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml")));
 		lenient()
 			.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("organization")))
 			.thenReturn(
@@ -106,8 +110,7 @@
 				IOUtils
 					.toString(
 						SparkDedupTest.class
-							.getResourceAsStream(
-								"/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
+							.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
 		lenient()
 			.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("publication")))
 			.thenReturn(
@@ -115,8 +118,7 @@
 				IOUtils
 					.toString(
 						SparkDedupTest.class
-							.getResourceAsStream(
-								"/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json")));
+							.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json")));
 		lenient()
 			.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("software")))
 			.thenReturn(
@@ -124,8 +126,7 @@
 				IOUtils
 					.toString(
 						SparkDedupTest.class
-							.getResourceAsStream(
-								"/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json")));
+							.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json")));
 		lenient()
 			.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("dataset")))
 			.thenReturn(
@@ -133,8 +134,7 @@
 				IOUtils
 					.toString(
 						SparkDedupTest.class
-							.getResourceAsStream(
-								"/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json")));
+							.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json")));
 		lenient()
 			.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("otherresearchproduct")))
 			.thenReturn(
@@ -142,54 +142,51 @@
 				IOUtils
 					.toString(
 						SparkDedupTest.class
-							.getResourceAsStream(
-								"/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json")));
+							.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json")));
 	}
 
 	@Test
 	@Order(1)
 	public void createSimRelsTest() throws Exception {
-		ArgumentApplicationParser parser = new ArgumentApplicationParser(
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
 			IOUtils
 				.toString(
 					SparkCreateSimRels.class
-						.getResourceAsStream(
-							"/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json")));
+						.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json")));
 
 		parser
-			.parseArgument(
-				new String[] {
-					"-i", testGraphBasePath,
-					"-asi", testActionSetId,
-					"-la", "lookupurl",
-					"-w", testOutputBasePath,
-					"-np", "50"
-				});
+			.parseArgument(new String[] {
+				"-i", testGraphBasePath,
+				"-asi", testActionSetId,
+				"-la", "lookupurl",
+				"-w", testOutputBasePath,
+				"-np", "50"
+			});
 
 		new SparkCreateSimRels(parser, spark).run(isLookUpService);
 
-		long orgs_simrel = spark
+		final long orgs_simrel = spark
 			.read()
 			.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization"))
 			.count();
 
-		long pubs_simrel = spark
+		final long pubs_simrel = spark
 			.read()
 			.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "publication"))
 			.count();
 
-		long sw_simrel = spark
+		final long sw_simrel = spark
 			.read()
 			.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "software"))
 			.count();
 
-		long ds_simrel = spark
+		final long ds_simrel = spark
 			.read()
 			.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "dataset"))
 			.count();
 
-		long orp_simrel = spark
+		final long orp_simrel = spark
 			.read()
 			.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "otherresearchproduct"))
 			.count();
@@ -205,31 +202,29 @@
 	@Order(2)
 	public void cutMergeRelsTest() throws Exception {
 
-		ArgumentApplicationParser parser = new ArgumentApplicationParser(
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
 			IOUtils
 				.toString(
 					SparkCreateMergeRels.class
-						.getResourceAsStream(
-							"/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
+						.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
 
 		parser
-			.parseArgument(
-				new String[] {
-					"-i",
-					testGraphBasePath,
-					"-asi",
-					testActionSetId,
-					"-la",
-					"lookupurl",
-					"-w",
-					testOutputBasePath,
-					"-cc",
- "3" - }); + .parseArgument(new String[] { + "-i", + testGraphBasePath, + "-asi", + testActionSetId, + "-la", + "lookupurl", + "-w", + testOutputBasePath, + "-cc", + "3" + }); new SparkCreateMergeRels(parser, spark).run(isLookUpService); - long orgs_mergerel = spark + final long orgs_mergerel = spark .read() .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") .as(Encoders.bean(Relation.class)) @@ -240,7 +235,7 @@ public class SparkDedupTest implements Serializable { .where("cnt > 3") .count(); - long pubs_mergerel = spark + final long pubs_mergerel = spark .read() .load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel") .as(Encoders.bean(Relation.class)) @@ -250,7 +245,7 @@ public class SparkDedupTest implements Serializable { .select("source", "cnt") .where("cnt > 3") .count(); - long sw_mergerel = spark + final long sw_mergerel = spark .read() .load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel") .as(Encoders.bean(Relation.class)) @@ -261,7 +256,7 @@ public class SparkDedupTest implements Serializable { .where("cnt > 3") .count(); - long ds_mergerel = spark + final long ds_mergerel = spark .read() .load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel") .as(Encoders.bean(Relation.class)) @@ -272,7 +267,7 @@ public class SparkDedupTest implements Serializable { .where("cnt > 3") .count(); - long orp_mergerel = spark + final long orp_mergerel = spark .read() .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel") .as(Encoders.bean(Relation.class)) @@ -301,46 +296,44 @@ public class SparkDedupTest implements Serializable { @Order(3) public void createMergeRelsTest() throws Exception { - ArgumentApplicationParser parser = new ArgumentApplicationParser( + final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils .toString( SparkCreateMergeRels.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"))); + .getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"))); parser - .parseArgument( - new String[] { - "-i", - testGraphBasePath, - "-asi", - testActionSetId, - "-la", - "lookupurl", - "-w", - testOutputBasePath - }); + .parseArgument(new String[] { + "-i", + testGraphBasePath, + "-asi", + testActionSetId, + "-la", + "lookupurl", + "-w", + testOutputBasePath + }); new SparkCreateMergeRels(parser, spark).run(isLookUpService); - long orgs_mergerel = spark + final long orgs_mergerel = spark .read() .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") .count(); - long pubs_mergerel = spark + final long pubs_mergerel = spark .read() .load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel") .count(); - long sw_mergerel = spark + final long sw_mergerel = spark .read() .load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel") .count(); - long ds_mergerel = spark + final long ds_mergerel = spark .read() .load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel") .count(); - long orp_mergerel = spark + final long orp_mergerel = spark .read() .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel") .count(); @@ -357,40 +350,39 @@ public class SparkDedupTest implements Serializable { @Order(4) public void createDedupRecordTest() throws Exception { - ArgumentApplicationParser parser = new ArgumentApplicationParser( + final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils .toString( SparkCreateDedupRecord.class - 
-						.getResourceAsStream(
-							"/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json")));
+						.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json")));
 
 		parser
-			.parseArgument(
-				new String[] {
-					"-i",
-					testGraphBasePath,
-					"-asi",
-					testActionSetId,
-					"-la",
-					"lookupurl",
-					"-w",
-					testOutputBasePath
-				});
+			.parseArgument(new String[] {
+				"-i",
+				testGraphBasePath,
+				"-asi",
+				testActionSetId,
+				"-la",
+				"lookupurl",
+				"-w",
+				testOutputBasePath
+			});
 
 		new SparkCreateDedupRecord(parser, spark).run(isLookUpService);
 
-		long orgs_deduprecord = jsc
+		final long orgs_deduprecord = jsc
 			.textFile(testOutputBasePath + "/" + testActionSetId + "/organization_deduprecord")
 			.count();
-		long pubs_deduprecord = jsc
+		final long pubs_deduprecord = jsc
 			.textFile(testOutputBasePath + "/" + testActionSetId + "/publication_deduprecord")
 			.count();
-		long sw_deduprecord = jsc
+		final long sw_deduprecord = jsc
 			.textFile(testOutputBasePath + "/" + testActionSetId + "/software_deduprecord")
 			.count();
-		long ds_deduprecord = jsc.textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_deduprecord").count();
-		long orp_deduprecord = jsc
-			.textFile(
-				testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_deduprecord")
+		final long ds_deduprecord = jsc
+			.textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_deduprecord")
+			.count();
+		final long orp_deduprecord = jsc
+			.textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_deduprecord")
 			.count();
 
 		assertEquals(85, orgs_deduprecord);
@@ -404,29 +396,27 @@
 	@Order(5)
 	public void updateEntityTest() throws Exception {
 
-		ArgumentApplicationParser parser = new ArgumentApplicationParser(
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
 			IOUtils
 				.toString(
 					SparkUpdateEntity.class
-						.getResourceAsStream(
-							"/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json")));
+						.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json")));
 
 		parser
-			.parseArgument(
-				new String[] {
-					"-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath
-				});
+			.parseArgument(new String[] {
+				"-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath
+			});
 
 		new SparkUpdateEntity(parser, spark).run(isLookUpService);
 
-		long organizations = jsc.textFile(testDedupGraphBasePath + "/organization").count();
-		long publications = jsc.textFile(testDedupGraphBasePath + "/publication").count();
-		long projects = jsc.textFile(testDedupGraphBasePath + "/project").count();
-		long datasource = jsc.textFile(testDedupGraphBasePath + "/datasource").count();
-		long softwares = jsc.textFile(testDedupGraphBasePath + "/software").count();
-		long dataset = jsc.textFile(testDedupGraphBasePath + "/dataset").count();
-		long otherresearchproduct = jsc.textFile(testDedupGraphBasePath + "/otherresearchproduct").count();
+		final long organizations = jsc.textFile(testDedupGraphBasePath + "/organization").count();
+		final long publications = jsc.textFile(testDedupGraphBasePath + "/publication").count();
+		final long projects = jsc.textFile(testDedupGraphBasePath + "/project").count();
+		final long datasource = jsc.textFile(testDedupGraphBasePath + "/datasource").count();
+		final long softwares = jsc.textFile(testDedupGraphBasePath + "/software").count();
+		final long dataset = jsc.textFile(testDedupGraphBasePath + "/dataset").count();
+		final long otherresearchproduct = jsc.textFile(testDedupGraphBasePath + "/otherresearchproduct").count();
"/otherresearchproduct").count(); - long mergedOrgs = spark + final long mergedOrgs = spark .read() .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") .as(Encoders.bean(Relation.class)) @@ -436,7 +426,7 @@ public class SparkDedupTest implements Serializable { .distinct() .count(); - long mergedPubs = spark + final long mergedPubs = spark .read() .load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel") .as(Encoders.bean(Relation.class)) @@ -446,7 +436,7 @@ public class SparkDedupTest implements Serializable { .distinct() .count(); - long mergedSw = spark + final long mergedSw = spark .read() .load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel") .as(Encoders.bean(Relation.class)) @@ -456,7 +446,7 @@ public class SparkDedupTest implements Serializable { .distinct() .count(); - long mergedDs = spark + final long mergedDs = spark .read() .load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel") .as(Encoders.bean(Relation.class)) @@ -466,7 +456,7 @@ public class SparkDedupTest implements Serializable { .distinct() .count(); - long mergedOrp = spark + final long mergedOrp = spark .read() .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel") .as(Encoders.bean(Relation.class)) @@ -484,27 +474,27 @@ public class SparkDedupTest implements Serializable { assertEquals(389, dataset); assertEquals(517, otherresearchproduct); - long deletedOrgs = jsc + final long deletedOrgs = jsc .textFile(testDedupGraphBasePath + "/organization") .filter(this::isDeletedByInference) .count(); - long deletedPubs = jsc + final long deletedPubs = jsc .textFile(testDedupGraphBasePath + "/publication") .filter(this::isDeletedByInference) .count(); - long deletedSw = jsc + final long deletedSw = jsc .textFile(testDedupGraphBasePath + "/software") .filter(this::isDeletedByInference) .count(); - long deletedDs = jsc + final long deletedDs = jsc .textFile(testDedupGraphBasePath + "/dataset") .filter(this::isDeletedByInference) .count(); - long deletedOrp = jsc + final long deletedOrp = jsc .textFile(testDedupGraphBasePath + "/otherresearchproduct") .filter(this::isDeletedByInference) .count(); @@ -520,21 +510,19 @@ public class SparkDedupTest implements Serializable { @Order(6) public void propagateRelationTest() throws Exception { - ArgumentApplicationParser parser = new ArgumentApplicationParser( + final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils .toString( SparkPropagateRelation.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json"))); + .getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json"))); parser - .parseArgument( - new String[] { - "-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath - }); + .parseArgument(new String[] { + "-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath + }); new SparkPropagateRelation(parser, spark).run(isLookUpService); - long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count(); + final long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count(); assertEquals(4862, relations); @@ -548,10 +536,9 @@ public class SparkDedupTest implements Serializable { .select(mergeRels.col("target")) .distinct() .toJavaRDD() - .mapToPair( - (PairFunction) r -> new Tuple2(r.getString(0), "d")); + .mapToPair((PairFunction) r -> new Tuple2<>(r.getString(0), "d")); - JavaRDD toCheck = jsc + final JavaRDD toCheck = jsc 
 			.textFile(testDedupGraphBasePath + "/relation")
 			.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json))
 			.join(mergedIds)
 			.map(t -> t._2()._2())
@@ -560,8 +547,8 @@
 			.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.target", json), json))
 			.join(mergedIds)
 			.map(t -> t._2()._1());
 
-		long deletedbyinference = toCheck.filter(this::isDeletedByInference).count();
-		long updated = toCheck.count();
+		final long deletedbyinference = toCheck.filter(this::isDeletedByInference).count();
+		final long updated = toCheck.count();
 
 		assertEquals(updated, deletedbyinference);
 	}
@@ -573,8 +560,8 @@
 		testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2);
 	}
 
-	private void testUniqueness(String path, int expected_total, int expected_unique) {
-		Dataset<Relation> rel = spark
+	private void testUniqueness(final String path, final int expected_total, final int expected_unique) {
+		final Dataset<Relation> rel = spark
 			.read()
 			.textFile(getClass().getResource(path).getPath())
 			.map(
@@ -591,7 +578,8 @@
 		FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
 	}
 
-	public boolean isDeletedByInference(String s) {
+	public boolean isDeletedByInference(final String s) {
 		return s.contains("\"deletedbyinference\":true");
 	}
+
 }