From be320ba3c1c013d11da957acbce843b00bcae484 Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Mon, 17 Jul 2023 16:04:21 +0300 Subject: [PATCH] Indentation fixes --- .../dnetlib/dhp/actionmanager/Constants.java | 3 +- .../PrepareAffiliationRelations.java | 228 ++++++++++-------- .../AffiliationRelationDeserializer.java | 31 +-- .../model/AffiliationRelationModel.java | 11 +- .../bipaffiliations/job.properties | 11 +- .../bipaffiliations/oozie_app/workflow.xml | 4 +- .../PrepareAffiliationRelationsTest.java | 162 +++++++------ 7 files changed, 235 insertions(+), 215 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java index 8b0dab3a7..62556b16b 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java @@ -3,7 +3,6 @@ package eu.dnetlib.dhp.actionmanager; import java.util.Optional; -import eu.dnetlib.dhp.common.HdfsSupport; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -12,6 +11,7 @@ import org.apache.spark.sql.SparkSession; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import eu.dnetlib.dhp.schema.oaf.Subject; @@ -94,6 +94,7 @@ public class Constants { return s; } + public static void removeOutputDir(SparkSession spark, String path) { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java index 44870c0f8..9b5d4a2ca 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java @@ -8,12 +8,6 @@ import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; -import eu.dnetlib.dhp.actionmanager.Constants; -import eu.dnetlib.dhp.actionmanager.bipaffiliations.model.*; -import eu.dnetlib.dhp.actionmanager.ror.GenerateRorActionSetJob; -import eu.dnetlib.dhp.schema.action.AtomicAction; -import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; -import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.SequenceFileOutputFormat; @@ -22,17 +16,23 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.spark.sql.Dataset; import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.actionmanager.Constants; +import eu.dnetlib.dhp.actionmanager.bipaffiliations.model.*; +import eu.dnetlib.dhp.actionmanager.ror.GenerateRorActionSetJob; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; +import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import scala.Tuple2; @@ -41,125 +41,139 @@ import scala.Tuple2; */ public class PrepareAffiliationRelations implements Serializable { - private static final Logger log = LoggerFactory.getLogger(PrepareAffiliationRelations.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final String ID_PREFIX = "50|doi_________::"; - public static final String BIP_AFFILIATIONS_CLASSID = "result:organization:bipinference"; - public static final String BIP_AFFILIATIONS_CLASSNAME = "Affiliation relation inferred by BIP!"; - public static final String BIP_INFERENCE_PROVENANCE = "bip:affiliation:crossref"; + private static final Logger log = LoggerFactory.getLogger(PrepareAffiliationRelations.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String ID_PREFIX = "50|doi_________::"; + public static final String BIP_AFFILIATIONS_CLASSID = "result:organization:bipinference"; + public static final String BIP_AFFILIATIONS_CLASSNAME = "Affiliation relation inferred by BIP!"; + public static final String BIP_INFERENCE_PROVENANCE = "bip:affiliation:crossref"; - public static void main(String[] args) throws Exception { + public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - PrepareAffiliationRelations.class - .getResourceAsStream( - "/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json")); + String jsonConfiguration = IOUtils + .toString( + PrepareAffiliationRelations.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); - Boolean isSparkSessionManaged = Constants.isSparkSessionManaged(parser); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + Boolean isSparkSessionManaged = Constants.isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final String inputPath = parser.get("inputPath"); - log.info("inputPath {}: ", inputPath); + final String inputPath = parser.get("inputPath"); + log.info("inputPath {}: ", inputPath); - final String outputPath = parser.get("outputPath"); - log.info("outputPath {}: ", outputPath); + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}: ", outputPath); - SparkConf conf = new SparkConf(); + SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Constants.removeOutputDir(spark, outputPath); - prepareAffiliationRelations(spark, inputPath, outputPath); - }); - } + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Constants.removeOutputDir(spark, outputPath); + prepareAffiliationRelations(spark, inputPath, outputPath); + }); + } - private static void prepareAffiliationRelations(SparkSession spark, String inputPath, String outputPath) { + private static void prepareAffiliationRelations(SparkSession spark, String inputPath, + String outputPath) { - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - // load and parse affiliation relations from HDFS - JavaRDD affiliationRelationsDeserializeRDD = sc - .textFile(inputPath) - .map(item -> OBJECT_MAPPER.readValue(item, AffiliationRelationDeserializer.class)); + // load and parse affiliation relations from HDFS + JavaRDD affiliationRelationsDeserializeRDD = sc + .textFile(inputPath) + .map(item -> OBJECT_MAPPER.readValue(item, AffiliationRelationDeserializer.class)); - // convert affiliation to an internal representation - Dataset affiliationRelations = - spark.createDataset( - affiliationRelationsDeserializeRDD.flatMap(entry -> - entry.getMatchings().stream().flatMap(matching -> - matching.getRorId().stream().map( rorId -> new AffiliationRelationModel( - entry.getDoi(), - rorId, - matching.getConfidence() - ))).collect(Collectors.toList()) - .iterator()) - .rdd(), - Encoders.bean(AffiliationRelationModel.class)); + // convert affiliation to an internal representation + Dataset affiliationRelations = spark + .createDataset( + affiliationRelationsDeserializeRDD + .flatMap( + entry -> entry + .getMatchings() + .stream() + .flatMap( + matching -> matching + .getRorId() + .stream() + .map( + rorId -> new AffiliationRelationModel( + entry.getDoi(), + rorId, + matching.getConfidence()))) + .collect(Collectors.toList()) + .iterator()) + .rdd(), + Encoders.bean(AffiliationRelationModel.class)); - // prepare action sets for affiliation relations - affiliationRelations - .flatMap((FlatMapFunction) affRel -> { + // prepare action sets for affiliation relations + affiliationRelations + .flatMap((FlatMapFunction) affRel -> { - // DOI to OpenAIRE id - final String paperId = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", affRel.getDoi())); + // DOI to OpenAIRE id + final String paperId = ID_PREFIX + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", affRel.getDoi())); - // ROR id to OpenAIRE id - final String affId = GenerateRorActionSetJob.calculateOpenaireId(affRel.getRorId()); + // ROR id to OpenAIRE id + final String affId = GenerateRorActionSetJob.calculateOpenaireId(affRel.getRorId()); - Qualifier qualifier = OafMapperUtils.qualifier( - BIP_AFFILIATIONS_CLASSID, - BIP_AFFILIATIONS_CLASSNAME, - ModelConstants.DNET_PROVENANCE_ACTIONS, - ModelConstants.DNET_PROVENANCE_ACTIONS); + Qualifier qualifier = OafMapperUtils + .qualifier( + BIP_AFFILIATIONS_CLASSID, + BIP_AFFILIATIONS_CLASSNAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS); - // format data info; setting `confidence` into relation's `trust` - DataInfo dataInfo = OafMapperUtils.dataInfo( - false, - BIP_INFERENCE_PROVENANCE, - true, - false, - qualifier, - Double.toString(affRel.getConfidence())); + // format data info; setting `confidence` into relation's `trust` + DataInfo dataInfo = OafMapperUtils + .dataInfo( + false, + BIP_INFERENCE_PROVENANCE, + true, + false, + qualifier, + Double.toString(affRel.getConfidence())); - // return bi-directional relations - return getAffiliationRelationPair(paperId, affId, dataInfo).iterator(); + // return bi-directional relations + return getAffiliationRelationPair(paperId, affId, dataInfo).iterator(); - }, Encoders.bean(Relation.class)) - .toJavaRDD() - .map(p -> new AtomicAction(Relation.class, p)) - .mapToPair( - aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), - new Text(OBJECT_MAPPER.writeValueAsString(aa)))) - .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); + }, Encoders.bean(Relation.class)) + .toJavaRDD() + .map(p -> new AtomicAction(Relation.class, p)) + .mapToPair( + aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), + new Text(OBJECT_MAPPER.writeValueAsString(aa)))) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); - } + } - private static List getAffiliationRelationPair(String paperId, String affId, DataInfo dataInfo) { - return Arrays.asList( - OafMapperUtils.getRelation( - paperId, - affId, - ModelConstants.RESULT_ORGANIZATION, - ModelConstants.AFFILIATION, - ModelConstants.HAS_AUTHOR_INSTITUTION, - null, - dataInfo, - null), - OafMapperUtils.getRelation( - affId, - paperId, - ModelConstants.RESULT_ORGANIZATION, - ModelConstants.AFFILIATION, - ModelConstants.IS_AUTHOR_INSTITUTION_OF, - null, - dataInfo, - null) - ); - } + private static List getAffiliationRelationPair(String paperId, String affId, DataInfo dataInfo) { + return Arrays + .asList( + OafMapperUtils + .getRelation( + paperId, + affId, + ModelConstants.RESULT_ORGANIZATION, + ModelConstants.AFFILIATION, + ModelConstants.HAS_AUTHOR_INSTITUTION, + null, + dataInfo, + null), + OafMapperUtils + .getRelation( + affId, + paperId, + ModelConstants.RESULT_ORGANIZATION, + ModelConstants.AFFILIATION, + ModelConstants.IS_AUTHOR_INSTITUTION_OF, + null, + dataInfo, + null)); + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationDeserializer.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationDeserializer.java index 450a8c175..ef4f200d0 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationDeserializer.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationDeserializer.java @@ -1,26 +1,27 @@ -package eu.dnetlib.dhp.actionmanager.bipaffiliations.model; -import com.fasterxml.jackson.annotation.JsonProperty; -import lombok.Data; +package eu.dnetlib.dhp.actionmanager.bipaffiliations.model; import java.io.Serializable; import java.util.List; +import com.fasterxml.jackson.annotation.JsonProperty; + +import lombok.Data; + @Data public class AffiliationRelationDeserializer implements Serializable { - @JsonProperty("DOI") - private String doi; - @JsonProperty("Matchings") - private List matchings; + @JsonProperty("DOI") + private String doi; + @JsonProperty("Matchings") + private List matchings; - @Data - public static class Matching implements Serializable { - @JsonProperty("RORid") - private List rorId; - @JsonProperty("Confidence") - private double confidence; + @Data + public static class Matching implements Serializable { + @JsonProperty("RORid") + private List rorId; + @JsonProperty("Confidence") + private double confidence; - } + } } - diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationModel.java index 8689914ee..6509f56ac 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationModel.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationModel.java @@ -1,15 +1,16 @@ + package eu.dnetlib.dhp.actionmanager.bipaffiliations.model; +import java.io.Serializable; + import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; -import java.io.Serializable; - @Data @AllArgsConstructor public class AffiliationRelationModel implements Serializable { - private String doi; - private String rorId; - private double confidence; + private String doi; + private String rorId; + private double confidence; } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties index dce59a31f..43d86ee09 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties @@ -28,15 +28,8 @@ oozie.use.system.libpath=true spark2ExtraListeners=com.cloudera.spark.lineage.NavigatorAppListener spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListener -# I think this should be the oozie workflow directory -# oozieWorkflowPath=/user/ilias.kanellos/workflow_example/ - - -# The workflow application path -wfAppPath=${oozieTopWfApplicationPath} - # The following is needed as a property of a workflow oozie.wf.application.path=${oozieTopWfApplicationPath} -inputPath=/user/schatz/affiliations/data-v3.json -outputPath=/tmp/crossref-affiliations-output-v3 +inputPath=/user/schatz/affiliations/data-v3.1.json +outputPath=/tmp/crossref-affiliations-output-v3.1 diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml index 31f35adfd..9930cfe17 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml @@ -1,4 +1,4 @@ - + @@ -84,7 +84,7 @@ yarn cluster Produces the atomic action with the inferred by BIP! affiliation relations from Crossref - eu.dnetlib.dhp.actionmanager.bipaffiliations.PrepareAffiliationRelations/class> + eu.dnetlib.dhp.actionmanager.bipaffiliations.PrepareAffiliationRelations dhp-aggregation-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java index 7e2fc5a39..72aabde7f 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java @@ -1,3 +1,4 @@ + package eu.dnetlib.dhp.actionmanager.bipaffiliations; import static org.junit.jupiter.api.Assertions.*; @@ -6,10 +7,6 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; -import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; import org.apache.commons.io.FileUtils; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; @@ -29,107 +26,120 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; +import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; public class PrepareAffiliationRelationsTest { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static SparkSession spark; + private static SparkSession spark; - private static Path workingDir; - private static final String ID_PREFIX = "50|doi_________::"; - private static final Logger log = LoggerFactory - .getLogger(PrepareAffiliationRelationsTest.class); + private static Path workingDir; + private static final String ID_PREFIX = "50|doi_________::"; + private static final Logger log = LoggerFactory + .getLogger(PrepareAffiliationRelationsTest.class); - @BeforeAll - public static void beforeAll() throws IOException { - workingDir = Files.createTempDirectory(PrepareAffiliationRelationsTest.class.getSimpleName()); + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files.createTempDirectory(PrepareAffiliationRelationsTest.class.getSimpleName()); - log.info("Using work dir {}", workingDir); + log.info("Using work dir {}", workingDir); - SparkConf conf = new SparkConf(); - conf.setAppName(PrepareAffiliationRelationsTest.class.getSimpleName()); + SparkConf conf = new SparkConf(); + conf.setAppName(PrepareAffiliationRelationsTest.class.getSimpleName()); - conf.setMaster("local[*]"); - conf.set("spark.driver.host", "localhost"); - conf.set("hive.metastore.local", "true"); - conf.set("spark.ui.enabled", "false"); - conf.set("spark.sql.warehouse.dir", workingDir.toString()); - conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); - spark = SparkSession - .builder() - .appName(PrepareAffiliationRelationsTest.class.getSimpleName()) - .config(conf) - .getOrCreate(); - } + spark = SparkSession + .builder() + .appName(PrepareAffiliationRelationsTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } - @AfterAll - public static void afterAll() throws IOException { - FileUtils.deleteDirectory(workingDir.toFile()); - spark.stop(); - } + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } - @Test - void testMatch() throws Exception { + @Test + void testMatch() throws Exception { - String affiliationRelationsPath = getClass() - .getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json") - .getPath(); + String affiliationRelationsPath = getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json") + .getPath(); - String outputPath = workingDir.toString() + "/actionSet"; + String outputPath = workingDir.toString() + "/actionSet"; - PrepareAffiliationRelations - .main( - new String[] { - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-inputPath", affiliationRelationsPath, - "-outputPath", outputPath - }); + PrepareAffiliationRelations + .main( + new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-inputPath", affiliationRelationsPath, + "-outputPath", outputPath + }); - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - JavaRDD tmp = sc - .sequenceFile(outputPath, Text.class, Text.class) - .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) - .map(aa -> ((Relation) aa.getPayload())); + JavaRDD tmp = sc + .sequenceFile(outputPath, Text.class, Text.class) + .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) + .map(aa -> ((Relation) aa.getPayload())); // for (Relation r : tmp.collect()) { // System.out.println( // r.getSource() + "\t" + r.getTarget() + "\t" + r.getRelType() + "\t" + r.getRelClass() + "\t" + r.getSubRelType() + "\t" + r.getValidationDate() + "\t" + r.getDataInfo().getTrust() + "\t" + r.getDataInfo().getInferred() // ); // } - // count the number of relations - assertEquals(16, tmp.count()); + // count the number of relations + assertEquals(16, tmp.count()); - Dataset dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class)); - dataset.createOrReplaceTempView("result"); + Dataset dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class)); + dataset.createOrReplaceTempView("result"); - Dataset execVerification = spark.sql("select r.relType, r.relClass, r.source, r.target, r.dataInfo.trust from result r"); + Dataset execVerification = spark + .sql("select r.relType, r.relClass, r.source, r.target, r.dataInfo.trust from result r"); - // verify that we have equal number of bi-directional relations - Assertions.assertEquals(8, execVerification - .filter( - "relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION +"'") - .collectAsList() - .size()); + // verify that we have equal number of bi-directional relations + Assertions + .assertEquals( + 8, execVerification + .filter( + "relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'") + .collectAsList() + .size()); - Assertions.assertEquals(8, execVerification - .filter( - "relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF +"'") - .collectAsList() - .size()); + Assertions + .assertEquals( + 8, execVerification + .filter( + "relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'") + .collectAsList() + .size()); - // check confidence value of a specific relation - String sourceDOI = "10.1105/tpc.8.3.343"; + // check confidence value of a specific relation + String sourceDOI = "10.1105/tpc.8.3.343"; - final String sourceOpenaireId = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", sourceDOI)); + final String sourceOpenaireId = ID_PREFIX + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", sourceDOI)); - Assertions.assertEquals("0.7071067812", execVerification - .filter( - "source='" + sourceOpenaireId +"'") - .collectAsList().get(0).getString(4)); + Assertions + .assertEquals( + "0.7071067812", execVerification + .filter( + "source='" + sourceOpenaireId + "'") + .collectAsList() + .get(0) + .getString(4)); - } + } } -