From 5b3ed70808422c8fc34cf20562f01b3f9cbb3d9e Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Tue, 1 Dec 2020 14:31:34 +0100
Subject: [PATCH] refactoring

---
 .../bipfinder/BipDeserialize.java             |  22 +-
 .../dhp/actionmanager/bipfinder/BipScore.java |  29 +-
 .../dhp/actionmanager/bipfinder/KeyValue.java |  29 +-
 .../bipfinder/PreparedResult.java             |  29 +-
 .../dhp/actionmanager/bipfinder/Score.java    |  29 +-
 .../bipfinder/SparkAtomicActionScoreJob.java  | 306 ++++++------
 .../SparkAtomicActionScoreJobTest.java        | 437 ++++++++++--------
 7 files changed, 480 insertions(+), 401 deletions(-)

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/BipDeserialize.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/BipDeserialize.java
index e68b912b2..2ec3679ce 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/BipDeserialize.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/BipDeserialize.java
@@ -1,3 +1,4 @@
+
 package eu.dnetlib.dhp.actionmanager.bipfinder;
 
 import java.io.Serializable;
@@ -5,19 +6,18 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 
+public class BipDeserialize extends HashMap<String, List<Score>> implements Serializable {
-public class BipDeserialize extends HashMap<String, List<Score>> implements Serializable {
+    public BipDeserialize() {
+        super();
+    }
-    public BipDeserialize(){
-        super();
-    }
+    public List<Score> get(String key) {
-    public List<Score> get(String key) {
-
-        if (super.get(key) == null) {
-            return new ArrayList<>();
-        }
-        return super.get(key);
-    }
+
+        if (super.get(key) == null) {
+            return new ArrayList<>();
+        }
+        return super.get(key);
+    }
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/BipScore.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/BipScore.java
index b1e381955..3aa2488cd 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/BipScore.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/BipScore.java
@@ -1,25 +1,26 @@
+
 package eu.dnetlib.dhp.actionmanager.bipfinder;
 
 import java.io.Serializable;
 import java.util.List;
 
 public class BipScore implements Serializable {
-    private String id;
-    private List<Score> scoreList;
+    private String id;
+    private List<Score> scoreList;
-    public String getId() {
-        return id;
-    }
+    public String getId() {
+        return id;
+    }
-    public void setId(String id) {
-        this.id = id;
-    }
+    public void setId(String id) {
+        this.id = id;
+    }
-    public List<Score> getScoreList() {
-        return scoreList;
-    }
+    public List<Score> getScoreList() {
+        return scoreList;
+    }
-    public void setScoreList(List<Score> scoreList) {
-        this.scoreList = scoreList;
-    }
+    public void setScoreList(List<Score> scoreList) {
+        this.scoreList = scoreList;
+    }
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/KeyValue.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/KeyValue.java
index 1fadae6fe..6909a9634 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/KeyValue.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/KeyValue.java
@@ -1,25 +1,26 @@
+
 package eu.dnetlib.dhp.actionmanager.bipfinder;
 
 import java.io.Serializable;
 
 public class KeyValue implements Serializable {
-    private String key;
-    private String value;
+    private String key;
+    private String value;
-    public String getKey() {
-        return key;
-    }
+    public String getKey() {
+        return key;
+    }
-    public void setKey(String key) {
-        this.key = key;
-    }
+    public void setKey(String key) {
+        this.key = key;
+    }
-    public String getValue() {
-        return value;
-    }
+    public String getValue() {
+        return value;
+    }
-    public void setValue(String value) {
-        this.value = value;
-    }
+    public void setValue(String value) {
+        this.value = value;
+    }
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/PreparedResult.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/PreparedResult.java
index bb7648f0c..3e55ec6bc 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/PreparedResult.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/PreparedResult.java
@@ -1,24 +1,25 @@
+
 package eu.dnetlib.dhp.actionmanager.bipfinder;
 
 import java.io.Serializable;
 
 public class PreparedResult implements Serializable {
-    private String id; //openaire id
-    private String value; //doi
+    private String id; // openaire id
+    private String value; // doi
-    public String getId() {
-        return id;
-    }
+    public String getId() {
+        return id;
+    }
-    public void setId(String id) {
-        this.id = id;
-    }
+    public void setId(String id) {
+        this.id = id;
+    }
-    public String getValue() {
-        return value;
-    }
+    public String getValue() {
+        return value;
+    }
-    public void setValue(String value) {
-        this.value = value;
-    }
+    public void setValue(String value) {
+        this.value = value;
+    }
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/Score.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/Score.java
index b14d971cd..17423fe96 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/Score.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/Score.java
@@ -1,3 +1,4 @@
+
 package eu.dnetlib.dhp.actionmanager.bipfinder;
 
 import java.io.Serializable;
@@ -5,22 +6,22 @@ import java.util.List;
 
 public class Score implements Serializable {
-    private String id;
-    private List<KeyValue> unit;
+    private String id;
+    private List<KeyValue> unit;
-    public String getId() {
-        return id;
-    }
+    public String getId() {
+        return id;
+    }
-    public void setId(String id) {
-        this.id = id;
-    }
+    public void setId(String id) {
+        this.id = id;
+    }
-    public List<KeyValue> getUnit() {
-        return unit;
-    }
+    public List<KeyValue> getUnit() {
+        return unit;
+    }
-    public void setUnit(List<KeyValue> unit) {
-        this.unit = unit;
-    }
+    public void setUnit(List<KeyValue> unit) {
+        this.unit = unit;
+    }
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java
index 658555e05..1628fca39 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java
@@ -1,11 +1,13 @@
+
 package eu.dnetlib.dhp.actionmanager.bipfinder;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.schema.action.AtomicAction;
-import eu.dnetlib.dhp.schema.oaf.*;
-import eu.dnetlib.dhp.schema.oaf.KeyValue;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
 import org.apache.hadoop.io.Text;
@@ -22,182 +24,188 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.schema.action.AtomicAction;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
 import scala.Tuple2;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
-
 public class SparkAtomicActionScoreJob implements Serializable {
-    private static String DOI = "doi";
-    private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionScoreJob.class);
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static String DOI = "doi";
+    private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionScoreJob.class);
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-    public static <I extends Result> void main(String[] args) throws Exception {
+    public static <I extends Result> void main(String[] args) throws Exception {
-        String jsonConfiguration = IOUtils
-            .toString(
-                SparkAtomicActionScoreJob.class
-                    .getResourceAsStream(
-                        "/eu/dnetlib/dhp/actionmanager/bipfinder/input_parameters.json"));
+        String jsonConfiguration = IOUtils
+            .toString(
+                SparkAtomicActionScoreJob.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/actionmanager/bipfinder/input_parameters.json"));
-        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
-        parser.parseArgument(args);
+        parser.parseArgument(args);
-        Boolean isSparkSessionManaged = Optional
-            .ofNullable(parser.get("isSparkSessionManaged"))
-            .map(Boolean::valueOf)
-            .orElse(Boolean.TRUE);
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
-        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
-        final String inputPath = parser.get("inputPath");
-        log.info("inputPath {}: ", inputPath);
+        final String inputPath = parser.get("inputPath");
+        log.info("inputPath {}: ", inputPath);
-        final String outputPath = parser.get("outputPath");
-        log.info("outputPath {}: ", outputPath);
+        final String outputPath = parser.get("outputPath");
+        log.info("outputPath {}: ", outputPath);
-        final String bipScorePath = parser.get("bipScorePath");
-        log.info("bipScorePath: {}", bipScorePath);
+        final String bipScorePath = parser.get("bipScorePath");
+        log.info("bipScorePath: {}", bipScorePath);
-        final String resultClassName = parser.get("resultTableName");
-        log.info("resultTableName: {}", resultClassName);
+        final String resultClassName = parser.get("resultTableName");
+        log.info("resultTableName: {}", resultClassName);
-        Class<I> inputClazz = (Class<I>) Class.forName(resultClassName);
+        Class<I> inputClazz = (Class<I>) Class.forName(resultClassName);
-        SparkConf conf = new SparkConf();
+        SparkConf conf = new SparkConf();
-        runWithSparkSession(
-            conf,
-            isSparkSessionManaged,
-            spark -> {
-                removeOutputDir(spark, outputPath);
-                prepareResults(spark, inputPath, outputPath, bipScorePath, inputClazz);
-            });
-    }
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                removeOutputDir(spark, outputPath);
+                prepareResults(spark, inputPath, outputPath, bipScorePath, inputClazz);
+            });
+    }
-    private static <I extends Result> void prepareResults(SparkSession spark, String inputPath, String outputPath,
-        String bipScorePath, Class<I> inputClazz) {
+    private static <I extends Result> void prepareResults(SparkSession spark, String inputPath, String outputPath,
+        String bipScorePath, Class<I> inputClazz) {
-        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
-        JavaRDD<BipDeserialize> bipDeserializeJavaRDD = sc.textFile(bipScorePath)
-            .map(item -> OBJECT_MAPPER.readValue(item, BipDeserialize.class));
+        JavaRDD<BipDeserialize> bipDeserializeJavaRDD = sc
+            .textFile(bipScorePath)
+            .map(item -> OBJECT_MAPPER.readValue(item, BipDeserialize.class));
+        Dataset<BipScore> bipScores = spark
+            .createDataset(bipDeserializeJavaRDD.flatMap(entry -> entry.keySet().stream().map(key -> {
+                BipScore bs = new BipScore();
+                bs.setId(key);
+                bs.setScoreList(entry.get(key));
+                return bs;
+            }).collect(Collectors.toList()).iterator()).rdd(), Encoders.bean(BipScore.class));
-        Dataset<BipScore> bipScores = spark.createDataset(bipDeserializeJavaRDD.flatMap(entry ->
-            entry.keySet().stream().map(key -> {
-                BipScore bs = new BipScore();
-                bs.setId(key);
-                bs.setScoreList(entry.get(key));
-                return bs;
-            }).collect(Collectors.toList()).iterator()
-        ).rdd(), Encoders.bean(BipScore.class));
+        System.out.println(bipScores.count());
-        System.out.println(bipScores.count());
+        Dataset<I> results = readPath(spark, inputPath, inputClazz);
-        Dataset<I> results = readPath(spark, inputPath, inputClazz);
+        results.createOrReplaceTempView("result");
-        results.createOrReplaceTempView("result");
+        Dataset<PreparedResult> preparedResult = spark
+            .sql(
+                "select pIde.value value, id " +
+                    "from result " +
+                    "lateral view explode (pid) p as pIde " +
+                    "where dataInfo.deletedbyinference = false and pIde.qualifier.classid = '" + DOI + "'")
+            .as(Encoders.bean(PreparedResult.class));
-        Dataset<PreparedResult> preparedResult = spark.sql("select pIde.value value, id " +
-            "from result " +
-            "lateral view explode (pid) p as pIde " +
-            "where dataInfo.deletedbyinference = false and pIde.qualifier.classid = '" + DOI + "'")
-            .as(Encoders.bean(PreparedResult.class));
+        Dataset<BipScore> tmp = bipScores
+            .joinWith(
+                preparedResult, bipScores.col("id").equalTo(preparedResult.col("value")),
+                "inner")
+            .map((MapFunction<Tuple2<BipScore, PreparedResult>, BipScore>) value -> {
+                BipScore ret = value._1();
+                ret.setId(value._2().getId());
+                return ret;
+            }, Encoders.bean(BipScore.class));
-        Dataset<BipScore> tmp = bipScores.joinWith(preparedResult, bipScores.col("id").equalTo(preparedResult.col("value")),
-            "inner")
-            .map((MapFunction<Tuple2<BipScore, PreparedResult>, BipScore>) value -> {
-                BipScore ret = value._1();
-                ret.setId(value._2().getId());
-                return ret;
-            }, Encoders.bean(BipScore.class));
+        tmp
+            .groupByKey((MapFunction<BipScore, String>) value -> value.getId(), Encoders.STRING())
+            .mapGroups((MapGroupsFunction<String, BipScore, I>) (k, it) -> {
+                Result ret = inputClazz.newInstance();
+                BipScore first = it.next();
+                ret.setId(first.getId());
-        tmp.groupByKey((MapFunction<BipScore, String>) value -> value.getId(), Encoders.STRING())
-            .mapGroups((MapGroupsFunction<String, BipScore, I>) (k, it) ->
-            {
-                Result ret = inputClazz.newInstance();
-                BipScore first = it.next();
-                ret.setId(first.getId());
+                ret.setMeasures(getMeasure(first));
+                it.forEachRemaining(value -> ret.getMeasures().addAll(getMeasure(value)));
-                ret.setMeasures(getMeasure(first));
-                it.forEachRemaining(value -> ret.getMeasures().addAll(getMeasure(value)));
+                return (I) ret;
+            }, Encoders.bean(inputClazz))
+            .toJavaRDD()
+            .map(p -> new AtomicAction(inputClazz, p))
+            .mapToPair(
+                aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
+                    new Text(OBJECT_MAPPER.writeValueAsString(aa))))
+            .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
+        ;
+    }
-                return (I)ret;
-            }, Encoders.bean(inputClazz))
-            .toJavaRDD()
-            .map(p -> new AtomicAction(inputClazz, p))
-            .mapToPair(
-                aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
-                    new Text(OBJECT_MAPPER.writeValueAsString(aa))))
-            .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
-        ;
-    }
+    public static Dataset<BipScore> getBipScoreDataset(Dataset<BipDeserialize> bipdeserialized) {
+        return bipdeserialized
+            .flatMap((FlatMapFunction<BipDeserialize, BipScore>) bip -> bip.keySet().stream().map(key -> {
+                BipScore bs = new BipScore();
+                bs.setId(key);
+                bs.setScoreList(bip.get(key));
+                return bs;
+            }).collect(Collectors.toList()).iterator(), Encoders.bean(BipScore.class));
+    }
-    public static Dataset<BipScore> getBipScoreDataset(Dataset<BipDeserialize> bipdeserialized){
-        return bipdeserialized.flatMap((FlatMapFunction<BipDeserialize, BipScore>) bip ->
-            bip.keySet().stream().map(key -> {
-                BipScore bs = new BipScore();
-                bs.setId(key);
-                bs.setScoreList(bip.get(key));
-                return bs;
-            }).collect(Collectors.toList()).iterator()
-            , Encoders.bean(BipScore.class));
-    }
+    private static List<Measure> getMeasure(BipScore value) {
+        return value
+            .getScoreList()
+            .stream()
+            .map(score -> {
+                Measure m = new Measure();
+                m.setId(score.getId());
+                m
+                    .setUnit(
+                        score
+                            .getUnit()
+                            .stream()
+                            .map(unit -> {
+                                KeyValue kv = new KeyValue();
+                                kv.setValue(unit.getValue());
+                                kv.setKey(unit.getKey());
+                                kv.setDataInfo(getDataInfo());
+                                return kv;
+                            })
+                            .collect(Collectors.toList()));
+                return m;
+            })
+            .collect(Collectors.toList());
+    }
-    private static List<Measure> getMeasure(BipScore value) {
-        return value.getScoreList()
-            .stream()
-            .map(score -> {
-                Measure m = new Measure();
-                m.setId(score.getId());
-                m.setUnit(score.getUnit().stream()
-                    .map(unit -> {
-                        KeyValue kv = new KeyValue();
-                        kv.setValue(unit.getValue());
-                        kv.setKey(unit.getKey());
-                        kv.setDataInfo(getDataInfo());
-                        return kv;
-                    }).collect(Collectors.toList()));
-                return m;
-            }).collect(Collectors.toList());
-    }
+    private static DataInfo getDataInfo() {
+        DataInfo di = new DataInfo();
+        di.setInferred(false);
+        di.setInvisible(false);
+        di.setDeletedbyinference(false);
+        di.setTrust("");
+        Qualifier qualifier = new Qualifier();
+        qualifier.setClassid("sysimport:actionset");
+        qualifier.setClassname("Harvested");
+        qualifier.setSchemename("dnet:provenanceActions");
+        qualifier.setSchemeid("dnet:provenanceActions");
+        di.setProvenanceaction(qualifier);
+        return di;
+    }
+    private static void removeOutputDir(SparkSession spark, String path) {
+        HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+    }
-
-    private static DataInfo getDataInfo() {
-        DataInfo di = new DataInfo();
-        di.setInferred(false);
-        di.setInvisible(false);
-        di.setDeletedbyinference(false);
-        di.setTrust("");
-        Qualifier qualifier = new Qualifier();
-        qualifier.setClassid("sysimport:actionset");
-        qualifier.setClassname("Harvested");
-        qualifier.setSchemename("dnet:provenanceActions");
-        qualifier.setSchemeid("dnet:provenanceActions");
-        di.setProvenanceaction(qualifier);
-        return di;
-    }
-
-    private static void removeOutputDir(SparkSession spark, String path) {
-        HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
-    }
-
-    public static <R> Dataset<R> readPath(
-        SparkSession spark, String inputPath, Class<R> clazz) {
-        return spark
-            .read()
-            .textFile(inputPath)
-            .map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
-    }
-
+    public static <R> Dataset<R> readPath(
+        SparkSession spark, String inputPath, Class<R> clazz) {
+        return spark
+            .read()
+            .textFile(inputPath)
+            .map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
+    }
 }
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java
index 3b360f6ed..4b02e7485 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java
@@ -1,9 +1,12 @@
+
 package eu.dnetlib.dhp.actionmanager.bipfinder;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.schema.action.AtomicAction;
-import eu.dnetlib.dhp.schema.oaf.*;
-import eu.dnetlib.dhp.schema.oaf.KeyValue;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.stream.Collectors;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@@ -24,241 +27,305 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Tuple2;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.List;
-import java.util.stream.Collectors;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.schema.action.AtomicAction;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
+import scala.Tuple2;
 
 public class SparkAtomicActionScoreJobTest {
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-    private static SparkSession spark;
+    private static SparkSession spark;
-    private static Path workingDir;
-    private static final Logger log = LoggerFactory
-        .getLogger(SparkAtomicActionScoreJobTest.class);
+    private static Path workingDir;
+    private static final Logger log = LoggerFactory
+        .getLogger(SparkAtomicActionScoreJobTest.class);
-    @BeforeAll
-    public static void beforeAll() throws IOException {
-        workingDir = Files
-            .createTempDirectory(SparkAtomicActionScoreJobTest.class.getSimpleName());
-        log.info("using work dir {}", workingDir);
+    @BeforeAll
+    public static void beforeAll() throws IOException {
+        workingDir = Files
+            .createTempDirectory(SparkAtomicActionScoreJobTest.class.getSimpleName());
+        log.info("using work dir {}", workingDir);
-        SparkConf conf = new SparkConf();
-        conf.setAppName(SparkAtomicActionScoreJobTest.class.getSimpleName());
+        SparkConf conf = new SparkConf();
+        conf.setAppName(SparkAtomicActionScoreJobTest.class.getSimpleName());
-        conf.setMaster("local[*]");
-        conf.set("spark.driver.host", "localhost");
-        conf.set("hive.metastore.local", "true");
-        conf.set("spark.ui.enabled", "false");
-        conf.set("spark.sql.warehouse.dir", workingDir.toString());
-        conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
+        conf.setMaster("local[*]");
+        conf.set("spark.driver.host", "localhost");
+        conf.set("hive.metastore.local", "true");
+        conf.set("spark.ui.enabled", "false");
+        conf.set("spark.sql.warehouse.dir", workingDir.toString());
+        conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
-        spark = SparkSession
-            .builder()
-            .appName(SparkAtomicActionScoreJobTest.class.getSimpleName())
-            .config(conf)
-            .getOrCreate();
-    }
+        spark = SparkSession
+            .builder()
+            .appName(SparkAtomicActionScoreJobTest.class.getSimpleName())
+            .config(conf)
+            .getOrCreate();
+    }
-    @AfterAll
-    public static void afterAll() throws IOException {
-        FileUtils.deleteDirectory(workingDir.toFile());
-        spark.stop();
-    }
+    @AfterAll
+    public static void afterAll() throws IOException {
+        FileUtils.deleteDirectory(workingDir.toFile());
+        spark.stop();
+    }
-    @Test
-    public void matchOne() throws Exception {
-        String bipScoresPath = getClass().getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores.json").getPath();
-        String inputPath = getClass()
-            .getResource(
-                "/eu/dnetlib/dhp/actionmanager/bipfinder/publication.json")
-            .getPath();
+    @Test
+    public void matchOne() throws Exception {
+        String bipScoresPath = getClass()
+            .getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores.json")
+            .getPath();
+        String inputPath = getClass()
+            .getResource(
+                "/eu/dnetlib/dhp/actionmanager/bipfinder/publication.json")
+            .getPath();
-        SparkAtomicActionScoreJob.main(
-            new String[] {
-                "-isSparkSessionManaged",
-                Boolean.FALSE.toString(),
-                "-inputPath",
-                inputPath,
-                "-bipScorePath",
-                bipScoresPath,
-                "-resultTableName",
-                "eu.dnetlib.dhp.schema.oaf.Publication",
-                "-outputPath",
-                workingDir.toString() + "/actionSet"
-            });
+        SparkAtomicActionScoreJob
+            .main(
+                new String[] {
+                    "-isSparkSessionManaged",
+                    Boolean.FALSE.toString(),
+                    "-inputPath",
+                    inputPath,
+                    "-bipScorePath",
+                    bipScoresPath,
+                    "-resultTableName",
+                    "eu.dnetlib.dhp.schema.oaf.Publication",
+                    "-outputPath",
+                    workingDir.toString() + "/actionSet"
+                });
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
-        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+        JavaRDD<Publication> tmp = sc
+            .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
+            .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
+            .map(aa -> ((Publication) aa.getPayload()));
-        JavaRDD<Publication> tmp = sc
-            .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
-            .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
-            .map(aa -> ((Publication) aa.getPayload()));
+        Assertions.assertTrue(tmp.count() == 1);
-        Assertions.assertTrue(tmp.count() == 1);
+        Dataset<Publication> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Publication.class));
+        verificationDataset.createOrReplaceTempView("publication");
-        Dataset<Publication> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Publication.class));
-        verificationDataset.createOrReplaceTempView("publication");
+        Dataset<Row> execVerification = spark
+            .sql(
+                "Select p.id oaid, mes.id, mUnit.value from publication p " +
"lateral view explode(measures) m as mes " + + "lateral view explode(mes.unit) u as mUnit "); - Dataset execVerification = spark.sql("Select p.id oaid, mes.id, mUnit.value from publication p " + - "lateral view explode(measures) m as mes " + - "lateral view explode(mes.unit) u as mUnit "); + Assertions.assertEquals(2, execVerification.count()); - Assertions.assertEquals(2, execVerification.count()); + Assertions + .assertEquals( + "50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb", + execVerification.select("oaid").collectAsList().get(0).getString(0)); - Assertions.assertEquals("50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb", - execVerification.select("oaid").collectAsList().get(0).getString(0)); + Assertions + .assertEquals( + "1.47565045883e-08", + execVerification.filter("id = 'influence'").select("value").collectAsList().get(0).getString(0)); - Assertions.assertEquals("1.47565045883e-08", - execVerification.filter("id = 'influence'").select("value").collectAsList().get(0).getString(0)); + Assertions + .assertEquals( + "0.227515392", + execVerification.filter("id = 'popularity'").select("value").collectAsList().get(0).getString(0)); - Assertions.assertEquals("0.227515392", - execVerification.filter("id = 'popularity'").select("value").collectAsList().get(0).getString(0)); + } - } + @Test + public void matchOneWithTwo() throws Exception { + String bipScoresPath = getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores.json") + .getPath(); + String inputPath = getClass() + .getResource( + "/eu/dnetlib/dhp/actionmanager/bipfinder/publication_2.json") + .getPath(); - @Test - public void matchOneWithTwo() throws Exception { - String bipScoresPath = getClass().getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores.json").getPath(); - String inputPath = getClass() - .getResource( - "/eu/dnetlib/dhp/actionmanager/bipfinder/publication_2.json") - .getPath(); + SparkAtomicActionScoreJob + .main( + new String[] { + "-isSparkSessionManaged", + Boolean.FALSE.toString(), + "-inputPath", + inputPath, + "-bipScorePath", + bipScoresPath, + "-resultTableName", + "eu.dnetlib.dhp.schema.oaf.Publication", + "-outputPath", + workingDir.toString() + "/actionSet" + }); - SparkAtomicActionScoreJob.main( - new String[] { - "-isSparkSessionManaged", - Boolean.FALSE.toString(), - "-inputPath", - inputPath, - "-bipScorePath", - bipScoresPath, - "-resultTableName", - "eu.dnetlib.dhp.schema.oaf.Publication", - "-outputPath", - workingDir.toString() + "/actionSet" - }); + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + JavaRDD tmp = sc + .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) + .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) + .map(aa -> ((Publication) aa.getPayload())); - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + Assertions.assertTrue(tmp.count() == 1); - JavaRDD tmp = sc - .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) - .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) - .map(aa -> ((Publication) aa.getPayload())); + Dataset verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Publication.class)); + verificationDataset.createOrReplaceTempView("publication"); - Assertions.assertTrue(tmp.count() == 1); + Dataset execVerification = spark + .sql( + "Select p.id oaid, mes.id, mUnit.value from publication p " + + "lateral view explode(measures) m as mes " + + "lateral view 
explode(mes.unit) u as mUnit "); - Dataset verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Publication.class)); - verificationDataset.createOrReplaceTempView("publication"); + Assertions.assertEquals(4, execVerification.count()); - Dataset execVerification = spark.sql("Select p.id oaid, mes.id, mUnit.value from publication p " + - "lateral view explode(measures) m as mes " + - "lateral view explode(mes.unit) u as mUnit "); + Assertions + .assertEquals( + "50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb", + execVerification.select("oaid").collectAsList().get(0).getString(0)); - Assertions.assertEquals(4, execVerification.count()); + Assertions + .assertEquals( + 2, + execVerification.filter("id = 'influence'").count()); + Assertions + .assertEquals( + 2, + execVerification.filter("id = 'popularity'").count()); - Assertions.assertEquals("50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb", - execVerification.select("oaid").collectAsList().get(0).getString(0)); + List tmp_ds = execVerification.filter("id = 'influence'").select("value").collectAsList(); + String tmp_influence = tmp_ds.get(0).getString(0); + Assertions + .assertTrue( + "1.47565045883e-08".equals(tmp_influence) || + "1.98956540239e-08".equals(tmp_influence)); - Assertions.assertEquals(2, - execVerification.filter("id = 'influence'").count()); + tmp_influence = tmp_ds.get(1).getString(0); + Assertions + .assertTrue( + "1.47565045883e-08".equals(tmp_influence) || + "1.98956540239e-08".equals(tmp_influence)); - Assertions.assertEquals(2, - execVerification.filter("id = 'popularity'").count()); + Assertions.assertTrue(!tmp_ds.get(0).getString(0).equals(tmp_ds.get(1).getString(0))); - List tmp_ds = execVerification.filter("id = 'influence'").select("value").collectAsList(); - String tmp_influence = tmp_ds.get(0).getString(0); - Assertions.assertTrue("1.47565045883e-08".equals(tmp_influence)|| - "1.98956540239e-08".equals(tmp_influence) ); + } - tmp_influence = tmp_ds.get(1).getString(0); - Assertions.assertTrue("1.47565045883e-08".equals(tmp_influence)|| - "1.98956540239e-08".equals(tmp_influence) ); + @Test + public void matchTwo() throws Exception { + String bipScoresPath = getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores.json") + .getPath(); + String inputPath = getClass() + .getResource( + "/eu/dnetlib/dhp/actionmanager/bipfinder/publication_3.json") + .getPath(); - Assertions.assertTrue(!tmp_ds.get(0).getString(0).equals(tmp_ds.get(1).getString(0))); + SparkAtomicActionScoreJob + .main( + new String[] { + "-isSparkSessionManaged", + Boolean.FALSE.toString(), + "-inputPath", + inputPath, + "-bipScorePath", + bipScoresPath, + "-resultTableName", + "eu.dnetlib.dhp.schema.oaf.Publication", + "-outputPath", + workingDir.toString() + "/actionSet" + }); + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - } + JavaRDD tmp = sc + .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) + .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) + .map(aa -> ((Publication) aa.getPayload())); - @Test - public void matchTwo() throws Exception { - String bipScoresPath = getClass().getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores.json").getPath(); - String inputPath = getClass() - .getResource( - "/eu/dnetlib/dhp/actionmanager/bipfinder/publication_3.json") - .getPath(); + Assertions.assertTrue(tmp.count() == 2); - SparkAtomicActionScoreJob.main( - new String[] { - "-isSparkSessionManaged", - 
-                Boolean.FALSE.toString(),
-                "-inputPath",
-                inputPath,
-                "-bipScorePath",
-                bipScoresPath,
-                "-resultTableName",
-                "eu.dnetlib.dhp.schema.oaf.Publication",
-                "-outputPath",
-                workingDir.toString() + "/actionSet"
-            });
+        Dataset<Publication> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Publication.class));
+        verificationDataset.createOrReplaceTempView("publication");
+        Dataset<Row> execVerification = spark
+            .sql(
+                "Select p.id oaid, mes.id, mUnit.value from publication p " +
+                    "lateral view explode(measures) m as mes " +
+                    "lateral view explode(mes.unit) u as mUnit ");
-        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+        Assertions.assertEquals(4, execVerification.count());
-        JavaRDD<Publication> tmp = sc
-            .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
-            .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
-            .map(aa -> ((Publication) aa.getPayload()));
+        Assertions
+            .assertEquals(
+                2,
+                execVerification.filter("oaid = '50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb'").count());
-        Assertions.assertTrue(tmp.count() == 2);
+        Assertions
+            .assertEquals(
+                2,
+                execVerification.filter("oaid = '50|acm_________::faed5b7a1bd8f51118d13ed29cfaee09'").count());
-        Dataset<Publication> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Publication.class));
-        verificationDataset.createOrReplaceTempView("publication");
+        Assertions
+            .assertEquals(
+                2,
+                execVerification.filter("id = 'influence'").count());
-        Dataset<Row> execVerification = spark.sql("Select p.id oaid, mes.id, mUnit.value from publication p " +
-            "lateral view explode(measures) m as mes " +
-            "lateral view explode(mes.unit) u as mUnit ");
+        Assertions
+            .assertEquals(
+                2,
+                execVerification.filter("id = 'popularity'").count());
-        Assertions.assertEquals(4, execVerification.count());
+        Assertions
+            .assertEquals(
+                "1.47565045883e-08",
+                execVerification
+                    .filter(
+                        "oaid = '50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb' " +
+                            "and id = 'influence'")
+                    .select("value")
+                    .collectAsList()
+                    .get(0)
+                    .getString(0));
-        Assertions.assertEquals(2,
-            execVerification.filter("oaid = '50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb'").count());
+        Assertions
+            .assertEquals(
+                "1.98956540239e-08",
+                execVerification
+                    .filter(
+                        "oaid = '50|acm_________::faed5b7a1bd8f51118d13ed29cfaee09' " +
+                            "and id = 'influence'")
+                    .select("value")
+                    .collectAsList()
+                    .get(0)
+                    .getString(0));
-        Assertions.assertEquals(2,
-            execVerification.filter("oaid = '50|acm_________::faed5b7a1bd8f51118d13ed29cfaee09'").count());
+        Assertions
+            .assertEquals(
+                "0.282046161584",
+                execVerification
+                    .filter(
+                        "oaid = '50|acm_________::faed5b7a1bd8f51118d13ed29cfaee09' " +
+                            "and id = 'popularity'")
+                    .select("value")
+                    .collectAsList()
+                    .get(0)
+                    .getString(0));
-        Assertions.assertEquals(2,
-            execVerification.filter("id = 'influence'").count());
+        Assertions
+            .assertEquals(
+                "0.227515392",
+                execVerification
+                    .filter(
+                        "oaid = '50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb' " +
+                            "and id = 'popularity'")
+                    .select("value")
+                    .collectAsList()
+                    .get(0)
+                    .getString(0));
-        Assertions.assertEquals(2,
-            execVerification.filter("id = 'popularity'").count());
-        Assertions.assertEquals("1.47565045883e-08" ,
-            execVerification.filter("oaid = '50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb' " +
-                "and id = 'influence'").select("value").collectAsList().get(0).getString(0));
-        Assertions.assertEquals(
-            "1.98956540239e-08", execVerification.filter("oaid = '50|acm_________::faed5b7a1bd8f51118d13ed29cfaee09' " +
-                "and id = 'influence'").select("value").collectAsList().get(0).getString(0));
-        Assertions.assertEquals(
-            "0.282046161584", execVerification.filter("oaid = '50|acm_________::faed5b7a1bd8f51118d13ed29cfaee09' " +
-                "and id = 'popularity'").select("value").collectAsList().get(0).getString(0));
-        Assertions.assertEquals("0.227515392" ,
-            execVerification.filter("oaid = '50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb' " +
-                "and id = 'popularity'").select("value").collectAsList().get(0).getString(0));
-    }
+    }
 }