Merge pull request 'Creation of the action set to include the bipFinder! score' (#62) from miriam.baglioni/dnet-hadoop:bipFinder into master

This commit is contained in:
Claudio Atzori 2020-12-17 12:09:03 +01:00
commit add7e1693b
17 changed files with 2197 additions and 2 deletions

View File

@ -1,6 +1,7 @@
package eu.dnetlib.dhp.schema.oaf;
import java.io.Serializable;
import java.util.List;
import com.google.common.base.Objects;
@ -8,7 +9,7 @@ import com.google.common.base.Objects;
/**
* Represent a measure, must be further described by a system available resource providing name and descriptions.
*/
public class Measure {
public class Measure implements Serializable {
/**
* Unique measure identifier.
@ -16,7 +17,7 @@ public class Measure {
private String id;
/**
* List of units associated with this measure. KeyValue provides a pair to store the laber (key) and the value, plus
* List of units associated with this measure. KeyValue provides a pair to store the label (key) and the value, plus
* common provenance information.
*/
private List<KeyValue> unit;

View File

@ -0,0 +1,28 @@
package eu.dnetlib.dhp.actionmanager.bipfinder;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
* Class that maps the model of the bipFinder! input data.
* Only needed for deserialization purposes
*/
public class BipDeserialize extends HashMap<String, List<Score>> implements Serializable {
public BipDeserialize() {
super();
}
public List<Score> get(String key) {
if (super.get(key) == null) {
return new ArrayList<>();
}
return super.get(key);
}
}

View File

@ -0,0 +1,30 @@
package eu.dnetlib.dhp.actionmanager.bipfinder;
import java.io.Serializable;
import java.util.List;
/**
* Rewriting of the bipFinder input data by extracting the identifier of the result (doi)
*/
public class BipScore implements Serializable {
private String id; //doi
private List<Score> scoreList; //unit as given in the inputfile
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public List<Score> getScoreList() {
return scoreList;
}
public void setScoreList(List<Score> scoreList) {
this.scoreList = scoreList;
}
}

View File

@ -0,0 +1,85 @@
package eu.dnetlib.dhp.actionmanager.bipfinder;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.Result;
/**
* Just collects all the atomic actions produced for the different results and saves them in
* outputpath for the ActionSet
*/
public class CollectAndSave implements Serializable {
private static final Logger log = LoggerFactory.getLogger(CollectAndSave.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static <I extends Result> void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
CollectAndSave.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("inputPath");
log.info("inputPath {}: ", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath {}: ", outputPath);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
collectAndSave(spark, inputPath, outputPath);
});
}
private static void collectAndSave(SparkSession spark, String inputPath, String outputPath) {
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
sc
.sequenceFile(inputPath + "/publication", Text.class, Text.class)
.union(sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class))
.union(sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class))
.union(sc.sequenceFile(inputPath + "/software", Text.class, Text.class))
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
;
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
}

View File

@ -0,0 +1,26 @@
package eu.dnetlib.dhp.actionmanager.bipfinder;
import java.io.Serializable;
public class KeyValue implements Serializable {
private String key;
private String value;
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}

View File

@ -0,0 +1,28 @@
package eu.dnetlib.dhp.actionmanager.bipfinder;
import java.io.Serializable;
/**
* Subset of the information of the generic results that are needed to create the atomic action
*/
public class PreparedResult implements Serializable {
private String id; // openaire id
private String value; // doi
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}

View File

@ -0,0 +1,30 @@
package eu.dnetlib.dhp.actionmanager.bipfinder;
import java.io.Serializable;
import java.util.List;
/**
* represents the score in the input file
*/
public class Score implements Serializable {
private String id;
private List<KeyValue> unit;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public List<KeyValue> getUnit() {
return unit;
}
public void setUnit(List<KeyValue> unit) {
this.unit = unit;
}
}

View File

@ -0,0 +1,200 @@
package eu.dnetlib.dhp.actionmanager.bipfinder;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import scala.Tuple2;
/**
* created the Atomic Action for each tipe of results
*/
public class SparkAtomicActionScoreJob implements Serializable {
private static String DOI = "doi";
private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionScoreJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static <I extends Result> void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkAtomicActionScoreJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/bipfinder/input_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("inputPath");
log.info("inputPath {}: ", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath {}: ", outputPath);
final String bipScorePath = parser.get("bipScorePath");
log.info("bipScorePath: {}", bipScorePath);
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
Class<I> inputClazz = (Class<I>) Class.forName(resultClassName);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
prepareResults(spark, inputPath, outputPath, bipScorePath, inputClazz);
});
}
private static <I extends Result> void prepareResults(SparkSession spark, String inputPath, String outputPath,
String bipScorePath, Class<I> inputClazz) {
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<BipDeserialize> bipDeserializeJavaRDD = sc
.textFile(bipScorePath)
.map(item -> OBJECT_MAPPER.readValue(item, BipDeserialize.class));
Dataset<BipScore> bipScores = spark
.createDataset(bipDeserializeJavaRDD.flatMap(entry -> entry.keySet().stream().map(key -> {
BipScore bs = new BipScore();
bs.setId(key);
bs.setScoreList(entry.get(key));
return bs;
}).collect(Collectors.toList()).iterator()).rdd(), Encoders.bean(BipScore.class));
System.out.println(bipScores.count());
Dataset<I> results = readPath(spark, inputPath, inputClazz);
results.createOrReplaceTempView("result");
Dataset<PreparedResult> preparedResult = spark
.sql(
"select pIde.value value, id " +
"from result " +
"lateral view explode (pid) p as pIde " +
"where dataInfo.deletedbyinference = false and pIde.qualifier.classid = '" + DOI + "'")
.as(Encoders.bean(PreparedResult.class));
bipScores
.joinWith(
preparedResult, bipScores.col("id").equalTo(preparedResult.col("value")),
"inner")
.map((MapFunction<Tuple2<BipScore, PreparedResult>, BipScore>) value -> {
BipScore ret = value._1();
ret.setId(value._2().getId());
return ret;
}, Encoders.bean(BipScore.class))
.groupByKey((MapFunction<BipScore, String>) value -> value.getId(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, BipScore, I>) (k, it) -> {
Result ret = inputClazz.newInstance();
BipScore first = it.next();
ret.setId(first.getId());
ret.setMeasures(getMeasure(first));
it.forEachRemaining(value -> ret.getMeasures().addAll(getMeasure(value)));
return (I) ret;
}, Encoders.bean(inputClazz))
.toJavaRDD()
.map(p -> new AtomicAction(inputClazz, p))
.mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
}
private static List<Measure> getMeasure(BipScore value) {
return value
.getScoreList()
.stream()
.map(score -> {
Measure m = new Measure();
m.setId(score.getId());
m
.setUnit(
score
.getUnit()
.stream()
.map(unit -> {
KeyValue kv = new KeyValue();
kv.setValue(unit.getValue());
kv.setKey(unit.getKey());
kv.setDataInfo(getDataInfo());
return kv;
})
.collect(Collectors.toList()));
return m;
})
.collect(Collectors.toList());
}
private static DataInfo getDataInfo() {
DataInfo di = new DataInfo();
di.setInferred(false);
di.setInvisible(false);
di.setDeletedbyinference(false);
di.setTrust("");
Qualifier qualifier = new Qualifier();
qualifier.setClassid("sysimport:actionset");
qualifier.setClassname("Harvested");
qualifier.setSchemename("dnet:provenanceActions");
qualifier.setSchemeid("dnet:provenanceActions");
di.setProvenanceaction(qualifier);
return di;
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
public static <R> Dataset<R> readPath(
SparkSession spark, String inputPath, Class<R> clazz) {
return spark
.read()
.textFile(inputPath)
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
}
}

View File

@ -0,0 +1,20 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "ip",
"paramLongName": "inputPath",
"paramDescription": "the URL from where to get the programme file",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "outputPath",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
}
]

View File

@ -0,0 +1,32 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "ip",
"paramLongName": "inputPath",
"paramDescription": "the URL from where to get the programme file",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "outputPath",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
},
{
"paramName": "rtn",
"paramLongName": "resultTableName",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
},
{
"paramName": "bsp",
"paramLongName": "bipScorePath",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
}
]

View File

@ -0,0 +1,58 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hive_metastore_uris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
<property>
<name>sparkExecutorNumber</name>
<value>4</value>
</property>
<property>
<name>spark2EventLogDir</name>
<value>/user/spark/spark2ApplicationHistory</value>
</property>
<property>
<name>sparkDriverMemory</name>
<value>15G</value>
</property>
<property>
<name>sparkExecutorMemory</name>
<value>6G</value>
</property>
<property>
<name>sparkExecutorCores</name>
<value>1</value>
</property>
</configuration>

View File

@ -0,0 +1,171 @@
<workflow-app name="BipFinderScore" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>inputPath</name>
<description>the input path of the resources to be extended</description>
</property>
<property>
<name>bipScorePath</name>
<description>the path where to find the bipFinder scores</description>
</property>
<property>
<name>outputPath</name>
<description>the path where to store the actionset</description>
</property>
</parameters>
<start to="deleteoutputpath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="deleteoutputpath">
<fs>
<delete path='${outputPath}'/>
<mkdir path='${outputPath}'/>
<delete path='${workingDir}'/>
<mkdir path='${workingDir}'/>
</fs>
<ok to="atomicactions"/>
<error to="Kill"/>
</action>
<fork name="atomicactions">
<path start="atomicactions_publication"/>
<path start="atomicactions_dataset"/>
<path start="atomicactions_orp"/>
<path start="atomicactions_software"/>
</fork>
<action name="atomicactions_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Produces the atomic action with the bip finder scores for publications</name>
<class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/publication</arg>
<arg>--bipScorePath</arg><arg>${bipScorePath}</arg>
</spark>
<ok to="join_aa"/>
<error to="Kill"/>
</action>
<action name="atomicactions_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Produces the atomic action with the bip finder scores for datasets</name>
<class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/dataset</arg>
<arg>--bipScorePath</arg><arg>${bipScorePath}</arg>
</spark>
<ok to="join_aa"/>
<error to="Kill"/>
</action>
<action name="atomicactions_orp">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Produces the atomic action with the bip finder scores for orp</name>
<class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/otherresearchproduct</arg>
<arg>--bipScorePath</arg><arg>${bipScorePath}</arg>
</spark>
<ok to="join_aa"/>
<error to="Kill"/>
</action>
<action name="atomicactions_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Produces the atomic action with the bip finder scores for software</name>
<class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/software</arg>
<arg>--bipScorePath</arg><arg>${bipScorePath}</arg>
</spark>
<ok to="join_aa"/>
<error to="Kill"/>
</action>
<join name="join_aa" to="collectandsave"/>
<action name="collectandsave">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>saves all the aa produced for the several types of results in the as output path</name>
<class>eu.dnetlib.dhp.actionmanager.bipfinder.CollectAndSave</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,331 @@
package eu.dnetlib.dhp.actionmanager.bipfinder;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import scala.Tuple2;
public class SparkAtomicActionScoreJobTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static Path workingDir;
private static final Logger log = LoggerFactory
.getLogger(SparkAtomicActionScoreJobTest.class);
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files
.createTempDirectory(SparkAtomicActionScoreJobTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(SparkAtomicActionScoreJobTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(SparkAtomicActionScoreJobTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
public void matchOne() throws Exception {
String bipScoresPath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores.json")
.getPath();
String inputPath = getClass()
.getResource(
"/eu/dnetlib/dhp/actionmanager/bipfinder/publication.json")
.getPath();
SparkAtomicActionScoreJob
.main(
new String[] {
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-inputPath",
inputPath,
"-bipScorePath",
bipScoresPath,
"-resultTableName",
"eu.dnetlib.dhp.schema.oaf.Publication",
"-outputPath",
workingDir.toString() + "/actionSet"
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Publication> tmp = sc
.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Publication) aa.getPayload()));
Assertions.assertTrue(tmp.count() == 1);
Dataset<Publication> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Publication.class));
verificationDataset.createOrReplaceTempView("publication");
Dataset<Row> execVerification = spark
.sql(
"Select p.id oaid, mes.id, mUnit.value from publication p " +
"lateral view explode(measures) m as mes " +
"lateral view explode(mes.unit) u as mUnit ");
Assertions.assertEquals(2, execVerification.count());
Assertions
.assertEquals(
"50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb",
execVerification.select("oaid").collectAsList().get(0).getString(0));
Assertions
.assertEquals(
"1.47565045883e-08",
execVerification.filter("id = 'influence'").select("value").collectAsList().get(0).getString(0));
Assertions
.assertEquals(
"0.227515392",
execVerification.filter("id = 'popularity'").select("value").collectAsList().get(0).getString(0));
}
@Test
public void matchOneWithTwo() throws Exception {
String bipScoresPath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores.json")
.getPath();
String inputPath = getClass()
.getResource(
"/eu/dnetlib/dhp/actionmanager/bipfinder/publication_2.json")
.getPath();
SparkAtomicActionScoreJob
.main(
new String[] {
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-inputPath",
inputPath,
"-bipScorePath",
bipScoresPath,
"-resultTableName",
"eu.dnetlib.dhp.schema.oaf.Publication",
"-outputPath",
workingDir.toString() + "/actionSet"
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Publication> tmp = sc
.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Publication) aa.getPayload()));
Assertions.assertTrue(tmp.count() == 1);
Dataset<Publication> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Publication.class));
verificationDataset.createOrReplaceTempView("publication");
Dataset<Row> execVerification = spark
.sql(
"Select p.id oaid, mes.id, mUnit.value from publication p " +
"lateral view explode(measures) m as mes " +
"lateral view explode(mes.unit) u as mUnit ");
Assertions.assertEquals(4, execVerification.count());
Assertions
.assertEquals(
"50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb",
execVerification.select("oaid").collectAsList().get(0).getString(0));
Assertions
.assertEquals(
2,
execVerification.filter("id = 'influence'").count());
Assertions
.assertEquals(
2,
execVerification.filter("id = 'popularity'").count());
List<Row> tmp_ds = execVerification.filter("id = 'influence'").select("value").collectAsList();
String tmp_influence = tmp_ds.get(0).getString(0);
Assertions
.assertTrue(
"1.47565045883e-08".equals(tmp_influence) ||
"1.98956540239e-08".equals(tmp_influence));
tmp_influence = tmp_ds.get(1).getString(0);
Assertions
.assertTrue(
"1.47565045883e-08".equals(tmp_influence) ||
"1.98956540239e-08".equals(tmp_influence));
Assertions.assertTrue(!tmp_ds.get(0).getString(0).equals(tmp_ds.get(1).getString(0)));
}
@Test
public void matchTwo() throws Exception {
String bipScoresPath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores.json")
.getPath();
String inputPath = getClass()
.getResource(
"/eu/dnetlib/dhp/actionmanager/bipfinder/publication_3.json")
.getPath();
SparkAtomicActionScoreJob
.main(
new String[] {
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-inputPath",
inputPath,
"-bipScorePath",
bipScoresPath,
"-resultTableName",
"eu.dnetlib.dhp.schema.oaf.Publication",
"-outputPath",
workingDir.toString() + "/actionSet"
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Publication> tmp = sc
.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Publication) aa.getPayload()));
Assertions.assertTrue(tmp.count() == 2);
Dataset<Publication> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Publication.class));
verificationDataset.createOrReplaceTempView("publication");
Dataset<Row> execVerification = spark
.sql(
"Select p.id oaid, mes.id, mUnit.value from publication p " +
"lateral view explode(measures) m as mes " +
"lateral view explode(mes.unit) u as mUnit ");
Assertions.assertEquals(4, execVerification.count());
Assertions
.assertEquals(
2,
execVerification.filter("oaid = '50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb'").count());
Assertions
.assertEquals(
2,
execVerification.filter("oaid = '50|acm_________::faed5b7a1bd8f51118d13ed29cfaee09'").count());
Assertions
.assertEquals(
2,
execVerification.filter("id = 'influence'").count());
Assertions
.assertEquals(
2,
execVerification.filter("id = 'popularity'").count());
Assertions
.assertEquals(
"1.47565045883e-08",
execVerification
.filter(
"oaid = '50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb' " +
"and id = 'influence'")
.select("value")
.collectAsList()
.get(0)
.getString(0));
Assertions
.assertEquals(
"1.98956540239e-08",
execVerification
.filter(
"oaid = '50|acm_________::faed5b7a1bd8f51118d13ed29cfaee09' " +
"and id = 'influence'")
.select("value")
.collectAsList()
.get(0)
.getString(0));
Assertions
.assertEquals(
"0.282046161584",
execVerification
.filter(
"oaid = '50|acm_________::faed5b7a1bd8f51118d13ed29cfaee09' " +
"and id = 'popularity'")
.select("value")
.collectAsList()
.get(0)
.getString(0));
Assertions
.assertEquals(
"0.227515392",
execVerification
.filter(
"oaid = '50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb' " +
"and id = 'popularity'")
.select("value")
.collectAsList()
.get(0)
.getString(0));
}
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long