forked from D-Net/dnet-hadoop
Merge pull request 'bipFinder at the level of the result' (#173) from bipFinder into beta
Reviewed-on: D-Net/dnet-hadoop#173
This commit is contained in:
commit
bf52a1847b
|
@ -1,86 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.actionmanager.bipfinder;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
|
||||
/**
|
||||
* Just collects all the atomic actions produced for the different results and saves them in
|
||||
* outputpath for the ActionSet
|
||||
*/
|
||||
public class CollectAndSave implements Serializable {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(CollectAndSave.class);
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
Objects
|
||||
.requireNonNull(
|
||||
CollectAndSave.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json")));
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
final String inputPath = parser.get("inputPath");
|
||||
log.info("inputPath {}: ", inputPath);
|
||||
|
||||
final String outputPath = parser.get("outputPath");
|
||||
log.info("outputPath {}: ", outputPath);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
removeOutputDir(spark, outputPath);
|
||||
collectAndSave(spark, inputPath, outputPath);
|
||||
});
|
||||
}
|
||||
|
||||
private static void collectAndSave(SparkSession spark, String inputPath, String outputPath) {
|
||||
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
sc
|
||||
.sequenceFile(inputPath + "/publication", Text.class, Text.class)
|
||||
.union(sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class))
|
||||
.union(sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class))
|
||||
.union(sc.sequenceFile(inputPath + "/software", Text.class, Text.class))
|
||||
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
|
||||
}
|
||||
|
||||
private static void removeOutputDir(SparkSession spark, String path) {
|
||||
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
|
||||
}
|
||||
|
||||
}
|
|
@ -1,28 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.actionmanager.bipfinder;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
 * Minimal view of a generic result: only the pieces of information needed to create the
 * atomic action are kept.
 */
public class PreparedResult implements Serializable {

	/** The OpenAIRE identifier of the result. */
	private String id;

	/** The doi of the result. */
	private String value;

	/**
	 * @return the OpenAIRE identifier of the result
	 */
	public String getId() {
		return this.id;
	}

	/**
	 * @return the doi of the result
	 */
	public String getValue() {
		return this.value;
	}

	/**
	 * @param id the OpenAIRE identifier of the result
	 */
	public void setId(final String id) {
		this.id = id;
	}

	/**
	 * @param value the doi of the result
	 */
	public void setValue(final String value) {
		this.value = value;
	}
}
|
|
@ -1,6 +1,7 @@
|
|||
|
||||
package eu.dnetlib.dhp.actionmanager.bipfinder;
|
||||
|
||||
import static eu.dnetlib.dhp.actionmanager.bipmodel.Constants.*;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
@ -15,7 +16,6 @@ import org.apache.spark.SparkConf;
|
|||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.api.java.function.MapGroupsFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
|
@ -24,11 +24,15 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.actionmanager.bipmodel.BipDeserialize;
|
||||
import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
||||
import scala.Tuple2;
|
||||
|
||||
/**
|
||||
|
@ -46,7 +50,7 @@ public class SparkAtomicActionScoreJob implements Serializable {
|
|||
.toString(
|
||||
SparkAtomicActionScoreJob.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/actionmanager/bipfinder/input_parameters.json"));
|
||||
"/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json"));
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
|
||||
|
@ -65,14 +69,6 @@ public class SparkAtomicActionScoreJob implements Serializable {
|
|||
final String outputPath = parser.get("outputPath");
|
||||
log.info("outputPath {}: ", outputPath);
|
||||
|
||||
final String bipScorePath = parser.get("bipScorePath");
|
||||
log.info("bipScorePath: {}", bipScorePath);
|
||||
|
||||
final String resultClassName = parser.get("resultTableName");
|
||||
log.info("resultTableName: {}", resultClassName);
|
||||
|
||||
Class<I> inputClazz = (Class<I>) Class.forName(resultClassName);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
runWithSparkSession(
|
||||
|
@ -80,12 +76,11 @@ public class SparkAtomicActionScoreJob implements Serializable {
|
|||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
removeOutputDir(spark, outputPath);
|
||||
prepareResults(spark, inputPath, outputPath, bipScorePath, inputClazz);
|
||||
prepareResults(spark, inputPath, outputPath);
|
||||
});
|
||||
}
|
||||
|
||||
private static <I extends Result> void prepareResults(SparkSession spark, String inputPath, String outputPath,
|
||||
String bipScorePath, Class<I> inputClazz) {
|
||||
private static <I extends Result> void prepareResults(SparkSession spark, String bipScorePath, String outputPath) {
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
|
@ -101,41 +96,19 @@ public class SparkAtomicActionScoreJob implements Serializable {
|
|||
return bs;
|
||||
}).collect(Collectors.toList()).iterator()).rdd(), Encoders.bean(BipScore.class));
|
||||
|
||||
Dataset<I> results = readPath(spark, inputPath, inputClazz);
|
||||
|
||||
results.createOrReplaceTempView("result");
|
||||
|
||||
Dataset<PreparedResult> preparedResult = spark
|
||||
.sql(
|
||||
"select pIde.value value, id " +
|
||||
"from result " +
|
||||
"lateral view explode (pid) p as pIde " +
|
||||
"where dataInfo.deletedbyinference = false and pIde.qualifier.classid = '" + DOI + "'")
|
||||
.as(Encoders.bean(PreparedResult.class));
|
||||
|
||||
bipScores
|
||||
.joinWith(
|
||||
preparedResult, bipScores.col("id").equalTo(preparedResult.col("value")),
|
||||
"inner")
|
||||
.map((MapFunction<Tuple2<BipScore, PreparedResult>, BipScore>) value -> {
|
||||
BipScore ret = value._1();
|
||||
ret.setId(value._2().getId());
|
||||
return ret;
|
||||
}, Encoders.bean(BipScore.class))
|
||||
.groupByKey((MapFunction<BipScore, String>) BipScore::getId, Encoders.STRING())
|
||||
.mapGroups((MapGroupsFunction<String, BipScore, Result>) (k, it) -> {
|
||||
Result ret = new Result();
|
||||
ret.setDataInfo(getDataInfo());
|
||||
BipScore first = it.next();
|
||||
ret.setId(first.getId());
|
||||
|
||||
ret.setMeasures(getMeasure(first));
|
||||
it.forEachRemaining(value -> ret.getMeasures().addAll(getMeasure(value)));
|
||||
.map((MapFunction<BipScore, Result>) bs -> {
|
||||
Result ret = new Result();
|
||||
|
||||
ret.setId(bs.getId());
|
||||
|
||||
ret.setMeasures(getMeasure(bs));
|
||||
|
||||
return ret;
|
||||
}, Encoders.bean(Result.class))
|
||||
.toJavaRDD()
|
||||
.map(p -> new AtomicAction(inputClazz, p))
|
||||
.map(p -> new AtomicAction(Result.class, p))
|
||||
.mapToPair(
|
||||
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
|
||||
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
|
||||
|
@ -159,7 +132,21 @@ public class SparkAtomicActionScoreJob implements Serializable {
|
|||
KeyValue kv = new KeyValue();
|
||||
kv.setValue(unit.getValue());
|
||||
kv.setKey(unit.getKey());
|
||||
kv.setDataInfo(getDataInfo());
|
||||
kv
|
||||
.setDataInfo(
|
||||
OafMapperUtils
|
||||
.dataInfo(
|
||||
false,
|
||||
UPDATE_DATA_INFO_TYPE,
|
||||
true,
|
||||
false,
|
||||
OafMapperUtils
|
||||
.qualifier(
|
||||
UPDATE_MEASURE_BIP_CLASS_ID,
|
||||
UPDATE_CLASS_NAME,
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS,
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS),
|
||||
""));
|
||||
return kv;
|
||||
})
|
||||
.collect(Collectors.toList()));
|
||||
|
@ -168,21 +155,6 @@ public class SparkAtomicActionScoreJob implements Serializable {
|
|||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private static DataInfo getDataInfo() {
|
||||
DataInfo di = new DataInfo();
|
||||
di.setInferred(false);
|
||||
di.setInvisible(false);
|
||||
di.setDeletedbyinference(false);
|
||||
di.setTrust("");
|
||||
Qualifier qualifier = new Qualifier();
|
||||
qualifier.setClassid("sysimport:actionset");
|
||||
qualifier.setClassname("Harvested");
|
||||
qualifier.setSchemename("dnet:provenanceActions");
|
||||
qualifier.setSchemeid("dnet:provenanceActions");
|
||||
di.setProvenanceaction(qualifier);
|
||||
return di;
|
||||
}
|
||||
|
||||
private static void removeOutputDir(SparkSession spark, String path) {
|
||||
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
|
||||
package eu.dnetlib.dhp.actionmanager.bipfinder;
|
||||
package eu.dnetlib.dhp.actionmanager.bipmodel;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
|
@ -1,5 +1,5 @@
|
|||
|
||||
package eu.dnetlib.dhp.actionmanager.bipfinder;
|
||||
package eu.dnetlib.dhp.actionmanager.bipmodel;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
|
@ -1,5 +1,5 @@
|
|||
|
||||
package eu.dnetlib.dhp.actionmanager.createunresolvedentities;
|
||||
package eu.dnetlib.dhp.actionmanager.bipmodel;
|
||||
|
||||
import java.util.Optional;
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
|
||||
package eu.dnetlib.dhp.actionmanager.bipfinder;
|
||||
package eu.dnetlib.dhp.actionmanager.bipmodel;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
|
||||
package eu.dnetlib.dhp.actionmanager.bipfinder;
|
||||
package eu.dnetlib.dhp.actionmanager.bipmodel;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
|
@ -1,8 +1,8 @@
|
|||
|
||||
package eu.dnetlib.dhp.actionmanager.createunresolvedentities;
|
||||
|
||||
import static eu.dnetlib.dhp.actionmanager.createunresolvedentities.Constants.*;
|
||||
import static eu.dnetlib.dhp.actionmanager.createunresolvedentities.Constants.UPDATE_CLASS_NAME;
|
||||
import static eu.dnetlib.dhp.actionmanager.bipmodel.Constants.*;
|
||||
import static eu.dnetlib.dhp.actionmanager.bipmodel.Constants.UPDATE_CLASS_NAME;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
@ -11,7 +11,6 @@ import java.util.Optional;
|
|||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.hdfs.client.HdfsUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
|
@ -24,8 +23,8 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.BipDeserialize;
|
||||
import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.BipScore;
|
||||
import eu.dnetlib.dhp.actionmanager.bipmodel.BipDeserialize;
|
||||
import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
|
@ -89,6 +88,7 @@ public class PrepareBipFinder implements Serializable {
|
|||
BipScore bs = new BipScore();
|
||||
bs.setId(key);
|
||||
bs.setScoreList(entry.get(key));
|
||||
|
||||
return bs;
|
||||
}).collect(Collectors.toList()).iterator()).rdd(), Encoders.bean(BipScore.class))
|
||||
.map((MapFunction<BipScore, Result>) v -> {
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
|
||||
package eu.dnetlib.dhp.actionmanager.createunresolvedentities;
|
||||
|
||||
import static eu.dnetlib.dhp.actionmanager.createunresolvedentities.Constants.*;
|
||||
import static eu.dnetlib.dhp.actionmanager.bipmodel.Constants.*;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
|
||||
package eu.dnetlib.dhp.actionmanager.createunresolvedentities;
|
||||
|
||||
import static eu.dnetlib.dhp.actionmanager.createunresolvedentities.Constants.*;
|
||||
import static eu.dnetlib.dhp.actionmanager.bipmodel.Constants.*;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
|
|
@ -1,28 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.actionmanager.createunresolvedentities.model;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Class that maps the model of the bipFinder! input data.
|
||||
* Only needed for deserialization purposes
|
||||
*/
|
||||
|
||||
public class BipDeserialize extends HashMap<String, List<Score>> implements Serializable {
|
||||
|
||||
public BipDeserialize() {
|
||||
super();
|
||||
}
|
||||
|
||||
public List<Score> get(String key) {
|
||||
|
||||
if (super.get(key) == null) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
return super.get(key);
|
||||
}
|
||||
|
||||
}
|
|
@ -1,30 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.actionmanager.createunresolvedentities.model;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Rewriting of the bipFinder input data by extracting the identifier of the result (doi)
|
||||
*/
|
||||
|
||||
public class BipScore implements Serializable {
|
||||
private String id; // doi
|
||||
private List<Score> scoreList; // unit as given in the inputfile
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public void setId(String id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public List<Score> getScoreList() {
|
||||
return scoreList;
|
||||
}
|
||||
|
||||
public void setScoreList(List<Score> scoreList) {
|
||||
this.scoreList = scoreList;
|
||||
}
|
||||
}
|
|
@ -1,26 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.actionmanager.createunresolvedentities.model;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
 * Serializable bean holding a key together with its value, both as strings.
 */
public class KeyValue implements Serializable {

	/** The key of the pair. */
	private String key;

	/** The value of the pair. */
	private String value;

	/**
	 * @return the key of the pair
	 */
	public String getKey() {
		return this.key;
	}

	/**
	 * @return the value of the pair
	 */
	public String getValue() {
		return this.value;
	}

	/**
	 * @param key the key of the pair
	 */
	public void setKey(final String key) {
		this.key = key;
	}

	/**
	 * @param value the value of the pair
	 */
	public void setValue(final String value) {
		this.value = value;
	}
}
|
|
@ -1,30 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.actionmanager.createunresolvedentities.model;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* represents the score in the input file
|
||||
*/
|
||||
public class Score implements Serializable {
|
||||
|
||||
private String id;
|
||||
private List<KeyValue> unit;
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public void setId(String id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public List<KeyValue> getUnit() {
|
||||
return unit;
|
||||
}
|
||||
|
||||
public void setUnit(List<KeyValue> unit) {
|
||||
this.unit = unit;
|
||||
}
|
||||
}
|
|
@ -1,32 +0,0 @@
|
|||
[
|
||||
{
|
||||
"paramName": "issm",
|
||||
"paramLongName": "isSparkSessionManaged",
|
||||
"paramDescription": "when true will stop SparkSession after job execution",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "ip",
|
||||
"paramLongName": "inputPath",
|
||||
"paramDescription": "the path of the results to which the bip finder scores have to be associated",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "o",
|
||||
"paramLongName": "outputPath",
|
||||
"paramDescription": "the path of the new ActionSet",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "rtn",
|
||||
"paramLongName": "resultTableName",
|
||||
"paramDescription": "the fully qualified name of the result class to process, e.g. eu.dnetlib.dhp.schema.oaf.Publication",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "bsp",
|
||||
"paramLongName": "bipScorePath",
|
||||
"paramDescription": "the path of the bip finder scores",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -0,0 +1,30 @@
|
|||
<configuration>
|
||||
<property>
|
||||
<name>jobTracker</name>
|
||||
<value>yarnRM</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>nameNode</name>
|
||||
<value>hdfs://nameservice1</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.use.system.libpath</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>hiveMetastoreUris</name>
|
||||
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>hiveJdbcUrl</name>
|
||||
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>hiveDbName</name>
|
||||
<value>openaire</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.launcher.mapreduce.user.classpath.first</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
</configuration>
|
|
@ -13,8 +13,61 @@
|
|||
<name>outputPath</name>
|
||||
<description>the path where to store the actionset</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorMemory</name>
|
||||
<description>memory for individual executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorCores</name>
|
||||
<description>number of cores used by single executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozieActionShareLibForSpark2</name>
|
||||
<description>oozie action sharelib for spark 2.*</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2ExtraListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
|
||||
<description>spark 2.* extra listeners classname</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2SqlQueryExecutionListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
|
||||
<description>spark 2.* sql query execution listeners classname</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2YarnHistoryServerAddress</name>
|
||||
<description>spark 2.* yarn history server address</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2EventLogDir</name>
|
||||
<description>spark 2.* event log dir location</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<global>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<configuration>
|
||||
<property>
|
||||
<name>mapreduce.job.queuename</name>
|
||||
<value>${queueName}</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.launcher.mapred.job.queue.name</name>
|
||||
<value>${oozieLauncherQueueName}</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>${oozieActionShareLibForSpark2}</value>
|
||||
</property>
|
||||
|
||||
</configuration>
|
||||
</global>
|
||||
<start to="deleteoutputpath"/>
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
|
@ -30,14 +83,8 @@
|
|||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<fork name="atomicactions">
|
||||
<path start="atomicactions_publication"/>
|
||||
<path start="atomicactions_dataset"/>
|
||||
<path start="atomicactions_orp"/>
|
||||
<path start="atomicactions_software"/>
|
||||
</fork>
|
||||
|
||||
<action name="atomicactions_publication">
|
||||
<action name="atomicactions">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
|
@ -54,118 +101,14 @@
|
|||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${inputPath}/publication</arg>
|
||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/publication</arg>
|
||||
<arg>--bipScorePath</arg><arg>${bipScorePath}</arg>
|
||||
</spark>
|
||||
<ok to="join_aa"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="atomicactions_dataset">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Produces the atomic action with the bip finder scores for datasets</name>
|
||||
<class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${inputPath}/dataset</arg>
|
||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/dataset</arg>
|
||||
<arg>--bipScorePath</arg><arg>${bipScorePath}</arg>
|
||||
</spark>
|
||||
<ok to="join_aa"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="atomicactions_orp">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Produces the atomic action with the bip finder scores for orp</name>
|
||||
<class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${inputPath}/otherresearchproduct</arg>
|
||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/otherresearchproduct</arg>
|
||||
<arg>--bipScorePath</arg><arg>${bipScorePath}</arg>
|
||||
</spark>
|
||||
<ok to="join_aa"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="atomicactions_software">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Produces the atomic action with the bip finder scores for software</name>
|
||||
<class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${inputPath}/software</arg>
|
||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/software</arg>
|
||||
<arg>--bipScorePath</arg><arg>${bipScorePath}</arg>
|
||||
</spark>
|
||||
<ok to="join_aa"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<join name="join_aa" to="collectandsave"/>
|
||||
|
||||
<action name="collectandsave">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>saves all the aa produced for the several types of results in the as output path</name>
|
||||
<class>eu.dnetlib.dhp.actionmanager.bipfinder.CollectAndSave</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${workingDir}</arg>
|
||||
<arg>--inputPath</arg><arg>${bipScorePath}</arg>
|
||||
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
|
@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
|
||||
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
|
||||
public class SparkAtomicActionScoreJobTest {
|
||||
|
||||
|
@ -69,13 +70,9 @@ public class SparkAtomicActionScoreJobTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
void matchOne() throws Exception {
|
||||
void testMatch() throws Exception {
|
||||
String bipScoresPath = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores.json")
|
||||
.getPath();
|
||||
String inputPath = getClass()
|
||||
.getResource(
|
||||
"/eu/dnetlib/dhp/actionmanager/bipfinder/publication.json")
|
||||
.getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores_oid.json")
|
||||
.getPath();
|
||||
|
||||
SparkAtomicActionScoreJob
|
||||
|
@ -84,234 +81,57 @@ public class SparkAtomicActionScoreJobTest {
|
|||
"-isSparkSessionManaged",
|
||||
Boolean.FALSE.toString(),
|
||||
"-inputPath",
|
||||
inputPath,
|
||||
"-bipScorePath",
|
||||
|
||||
bipScoresPath,
|
||||
"-resultTableName",
|
||||
"eu.dnetlib.dhp.schema.oaf.Publication",
|
||||
|
||||
"-outputPath",
|
||||
workingDir.toString() + "/actionSet"
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<Publication> tmp = sc
|
||||
JavaRDD<Result> tmp = sc
|
||||
.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
|
||||
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
|
||||
.map(aa -> ((Publication) aa.getPayload()));
|
||||
.map(aa -> ((Result) aa.getPayload()));
|
||||
|
||||
assertEquals(1, tmp.count());
|
||||
assertEquals(4, tmp.count());
|
||||
|
||||
Dataset<Publication> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Publication.class));
|
||||
verificationDataset.createOrReplaceTempView("publication");
|
||||
Dataset<Result> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Result.class));
|
||||
verificationDataset.createOrReplaceTempView("result");
|
||||
|
||||
Dataset<Row> execVerification = spark
|
||||
.sql(
|
||||
"Select p.id oaid, mes.id, mUnit.value from publication p " +
|
||||
"Select p.id oaid, mes.id, mUnit.value from result p " +
|
||||
"lateral view explode(measures) m as mes " +
|
||||
"lateral view explode(mes.unit) u as mUnit ");
|
||||
|
||||
Assertions.assertEquals(2, execVerification.count());
|
||||
|
||||
Assertions.assertEquals(12, execVerification.count());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb",
|
||||
execVerification.select("oaid").collectAsList().get(0).getString(0));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"1.47565045883e-08",
|
||||
execVerification.filter("id = 'influence'").select("value").collectAsList().get(0).getString(0));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"0.227515392",
|
||||
execVerification.filter("id = 'popularity'").select("value").collectAsList().get(0).getString(0));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
void matchOneWithTwo() throws Exception {
|
||||
String bipScoresPath = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores.json")
|
||||
.getPath();
|
||||
String inputPath = getClass()
|
||||
.getResource(
|
||||
"/eu/dnetlib/dhp/actionmanager/bipfinder/publication_2.json")
|
||||
.getPath();
|
||||
|
||||
SparkAtomicActionScoreJob
|
||||
.main(
|
||||
new String[] {
|
||||
"-isSparkSessionManaged",
|
||||
Boolean.FALSE.toString(),
|
||||
"-inputPath",
|
||||
inputPath,
|
||||
"-bipScorePath",
|
||||
bipScoresPath,
|
||||
"-resultTableName",
|
||||
"eu.dnetlib.dhp.schema.oaf.Publication",
|
||||
"-outputPath",
|
||||
workingDir.toString() + "/actionSet"
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<Publication> tmp = sc
|
||||
.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
|
||||
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
|
||||
.map(aa -> ((Publication) aa.getPayload()));
|
||||
|
||||
assertEquals(1, tmp.count());
|
||||
|
||||
Dataset<Publication> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Publication.class));
|
||||
verificationDataset.createOrReplaceTempView("publication");
|
||||
|
||||
Dataset<Row> execVerification = spark
|
||||
.sql(
|
||||
"Select p.id oaid, mes.id, mUnit.value from publication p " +
|
||||
"lateral view explode(measures) m as mes " +
|
||||
"lateral view explode(mes.unit) u as mUnit ");
|
||||
|
||||
Assertions.assertEquals(4, execVerification.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb",
|
||||
execVerification.select("oaid").collectAsList().get(0).getString(0));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
2,
|
||||
execVerification.filter("id = 'influence'").count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
2,
|
||||
execVerification.filter("id = 'popularity'").count());
|
||||
|
||||
List<Row> tmp_ds = execVerification.filter("id = 'influence'").select("value").collectAsList();
|
||||
String tmp_influence = tmp_ds.get(0).getString(0);
|
||||
assertTrue(
|
||||
"1.47565045883e-08".equals(tmp_influence) ||
|
||||
"1.98956540239e-08".equals(tmp_influence));
|
||||
|
||||
tmp_influence = tmp_ds.get(1).getString(0);
|
||||
assertTrue(
|
||||
"1.47565045883e-08".equals(tmp_influence) ||
|
||||
"1.98956540239e-08".equals(tmp_influence));
|
||||
|
||||
assertNotEquals(tmp_ds.get(1).getString(0), tmp_ds.get(0).getString(0));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
void matchTwo() throws Exception {
|
||||
String bipScoresPath = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores.json")
|
||||
.getPath();
|
||||
String inputPath = getClass()
|
||||
.getResource(
|
||||
"/eu/dnetlib/dhp/actionmanager/bipfinder/publication_3.json")
|
||||
.getPath();
|
||||
|
||||
SparkAtomicActionScoreJob
|
||||
.main(
|
||||
new String[] {
|
||||
"-isSparkSessionManaged",
|
||||
Boolean.FALSE.toString(),
|
||||
"-inputPath",
|
||||
inputPath,
|
||||
"-bipScorePath",
|
||||
bipScoresPath,
|
||||
"-resultTableName",
|
||||
"eu.dnetlib.dhp.schema.oaf.Publication",
|
||||
"-outputPath",
|
||||
workingDir.toString() + "/actionSet"
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<Publication> tmp = sc
|
||||
.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
|
||||
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
|
||||
.map(aa -> ((Publication) aa.getPayload()));
|
||||
|
||||
assertEquals(2, tmp.count());
|
||||
|
||||
Dataset<Publication> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Publication.class));
|
||||
verificationDataset.createOrReplaceTempView("publication");
|
||||
|
||||
Dataset<Row> execVerification = spark
|
||||
.sql(
|
||||
"Select p.id oaid, mes.id, mUnit.value from publication p " +
|
||||
"lateral view explode(measures) m as mes " +
|
||||
"lateral view explode(mes.unit) u as mUnit ");
|
||||
|
||||
Assertions.assertEquals(4, execVerification.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
2,
|
||||
execVerification.filter("oaid = '50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb'").count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
2,
|
||||
execVerification.filter("oaid = '50|acm_________::faed5b7a1bd8f51118d13ed29cfaee09'").count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
2,
|
||||
execVerification.filter("id = 'influence'").count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
2,
|
||||
execVerification.filter("id = 'popularity'").count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"1.47565045883e-08",
|
||||
execVerification
|
||||
"6.63451994567e-09", execVerification
|
||||
.filter(
|
||||
"oaid = '50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb' " +
|
||||
"oaid='50|arXiv_dedup_::4a2d5fd8d71daec016c176ec71d957b1' " +
|
||||
"and id = 'influence'")
|
||||
.select("value")
|
||||
.collectAsList()
|
||||
.get(0)
|
||||
.getString(0));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"1.98956540239e-08",
|
||||
execVerification
|
||||
"0.348694533145", execVerification
|
||||
.filter(
|
||||
"oaid = '50|acm_________::faed5b7a1bd8f51118d13ed29cfaee09' " +
|
||||
"and id = 'influence'")
|
||||
"oaid='50|arXiv_dedup_::4a2d5fd8d71daec016c176ec71d957b1' " +
|
||||
"and id = 'popularity_alt'")
|
||||
.select("value")
|
||||
.collectAsList()
|
||||
.get(0)
|
||||
.getString(0));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"0.282046161584",
|
||||
execVerification
|
||||
"2.16094680115e-09", execVerification
|
||||
.filter(
|
||||
"oaid = '50|acm_________::faed5b7a1bd8f51118d13ed29cfaee09' " +
|
||||
"and id = 'popularity'")
|
||||
.select("value")
|
||||
.collectAsList()
|
||||
.get(0)
|
||||
.getString(0));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"0.227515392",
|
||||
execVerification
|
||||
.filter(
|
||||
"oaid = '50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb' " +
|
||||
"oaid='50|arXiv_dedup_::4a2d5fd8d71daec016c176ec71d957b1' " +
|
||||
"and id = 'popularity'")
|
||||
.select("value")
|
||||
.collectAsList()
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,4 @@
|
|||
{"50|arXiv_dedup_::4a2d5fd8d71daec016c176ec71d957b1": [{"id": "influence", "unit": [{"value": "6.63451994567e-09", "key": "score"}]}, {"id": "popularity_alt", "unit": [{"value": "0.348694533145", "key": "score"}]}, {"id": "popularity", "unit": [{"value": "2.16094680115e-09", "key": "score"}]}]}
|
||||
{"50|dedup_wf_001::05b1f8ce98702f69d07aa5f0429de1e3": [{"id": "influence", "unit": [{"value": "6.25057357279e-09", "key": "score"}]}, {"id": "popularity_alt", "unit": [{"value": "7.0208", "key": "score"}]}, {"id": "popularity", "unit": [{"value": "2.40234462244e-08", "key": "score"}]}]}
|
||||
{"50|dedup_wf_001::08823c8f5c3ca2eae523817036cdda67": [{"id": "influence", "unit": [{"value": "5.54921449123e-09", "key": "score"}]}, {"id": "popularity_alt", "unit": [{"value": "0.0", "key": "score"}]}, {"id": "popularity", "unit": [{"value": "3.53012887452e-10", "key": "score"}]}]}
|
||||
{"50|dedup_wf_001::0e72b399325d6efcbe3271891a1dfe4c": [{"id": "influence", "unit": [{"value": "1.63466096315e-08", "key": "score"}]}, {"id": "popularity_alt", "unit": [{"value": "20.9870879741", "key": "score"}]}, {"id": "popularity", "unit": [{"value": "5.49501495323e-08", "key": "score"}]}]}
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue