forked from D-Net/dnet-hadoop
[BIP! Scores integration] merged missing classes from bipFinder branch
This commit is contained in:
parent 2a7a10809e
commit 41500669e2
@@ -0,0 +1,20 @@
[
  {
    "paramName": "issm",
    "paramLongName": "isSparkSessionManaged",
    "paramDescription": "when true will stop SparkSession after job execution",
    "paramRequired": false
  },
  {
    "paramName": "ip",
    "paramLongName": "inputPath",
    "paramDescription": "the path of the atomic actions to be collected",
    "paramRequired": true
  },
  {
    "paramName": "o",
    "paramLongName": "outputPath",
    "paramDescription": "the path of the new ActionSet",
    "paramRequired": true
  }
]
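This first file describes the CLI arguments of the CollectAndSave job (invoked by the collectandsave step of the workflow below). A minimal sketch of how such JSON definitions are typically consumed via dnet-hadoop's ArgumentApplicationParser; the class and its methods are assumed from their use elsewhere in the project, and the resource path is hypothetical since file names are suppressed in this diff:

import org.apache.commons.io.IOUtils;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;

public class ParameterParsingSketch {

	public static void main(String[] args) throws Exception {
		// Hypothetical resource path: the actual file names are not shown in this diff.
		String jsonConfiguration = IOUtils
			.toString(
				ParameterParsingSketch.class
					.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/bipfinder/input_parameters.json"));

		ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser.parseArgument(args);

		// Values are looked up by paramLongName; both "-ip <path>" and "--inputPath <path>" resolve here.
		String inputPath = parser.get("inputPath");
		String outputPath = parser.get("outputPath");
		System.out.println(inputPath + " -> " + outputPath);
	}
}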
@@ -0,0 +1,32 @@
[
  {
    "paramName": "issm",
    "paramLongName": "isSparkSessionManaged",
    "paramDescription": "when true will stop SparkSession after job execution",
    "paramRequired": false
  },
  {
    "paramName": "ip",
    "paramLongName": "inputPath",
    "paramDescription": "the input path of the resources to be extended",
    "paramRequired": true
  },
  {
    "paramName": "o",
    "paramLongName": "outputPath",
    "paramDescription": "the path of the new ActionSet",
    "paramRequired": true
  },
  {
    "paramName": "rtn",
    "paramLongName": "resultTableName",
    "paramDescription": "the fully qualified name of the result table class",
    "paramRequired": true
  },
  {
    "paramName": "bsp",
    "paramLongName": "bipScorePath",
    "paramDescription": "the path where to find the bipFinder scores",
    "paramRequired": true
  }
]
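This second file adds resultTableName and bipScorePath for SparkAtomicActionScoreJob. The workflow below passes fully qualified OAF class names (e.g. eu.dnetlib.dhp.schema.oaf.Publication) as --resultTableName; a plausible way the job resolves them, following the reflection pattern used elsewhere in dnet-hadoop (an assumption, since the job's body is not part of this diff), is:

	// Sketch (assumption): resolve the OAF result class named by --resultTableName
	// via reflection; Result is the common base type of Publication, Dataset, etc.
	@SuppressWarnings("unchecked")
	static Class<? extends eu.dnetlib.dhp.schema.oaf.Result> resultClazz(String resultTableName)
		throws ClassNotFoundException {
		return (Class<? extends eu.dnetlib.dhp.schema.oaf.Result>) Class.forName(resultTableName);
	}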
@@ -0,0 +1,171 @@
<workflow-app name="BipFinderScore" xmlns="uri:oozie:workflow:0.5">
    <parameters>
        <property>
            <name>inputPath</name>
            <description>the input path of the resources to be extended</description>
        </property>

        <property>
            <name>bipScorePath</name>
            <description>the path where to find the bipFinder scores</description>
        </property>
        <property>
            <name>outputPath</name>
            <description>the path where to store the actionset</description>
        </property>
    </parameters>

    <start to="deleteoutputpath"/>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <action name="deleteoutputpath">
        <fs>
            <delete path="${outputPath}"/>
            <mkdir path="${outputPath}"/>
            <delete path="${workingDir}"/>
            <mkdir path="${workingDir}"/>
        </fs>
        <ok to="atomicactions"/>
        <error to="Kill"/>
    </action>

    <fork name="atomicactions">
        <path start="atomicactions_publication"/>
        <path start="atomicactions_dataset"/>
        <path start="atomicactions_orp"/>
        <path start="atomicactions_software"/>
    </fork>

    <action name="atomicactions_publication">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <master>yarn</master>
            <mode>cluster</mode>
            <name>Produces the atomic action with the bip finder scores for publications</name>
            <class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
            <jar>dhp-aggregation-${projectVersion}.jar</jar>
            <spark-opts>
                --executor-memory=${sparkExecutorMemory}
                --executor-cores=${sparkExecutorCores}
                --driver-memory=${sparkDriverMemory}
                --conf spark.extraListeners=${spark2ExtraListeners}
                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
            </spark-opts>
            <arg>--inputPath</arg><arg>${inputPath}/publication</arg>
            <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
            <arg>--outputPath</arg><arg>${workingDir}/publication</arg>
            <arg>--bipScorePath</arg><arg>${bipScorePath}</arg>
        </spark>
        <ok to="join_aa"/>
        <error to="Kill"/>
    </action>

    <action name="atomicactions_dataset">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <master>yarn</master>
            <mode>cluster</mode>
            <name>Produces the atomic action with the bip finder scores for datasets</name>
            <class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
            <jar>dhp-aggregation-${projectVersion}.jar</jar>
            <spark-opts>
                --executor-memory=${sparkExecutorMemory}
                --executor-cores=${sparkExecutorCores}
                --driver-memory=${sparkDriverMemory}
                --conf spark.extraListeners=${spark2ExtraListeners}
                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
            </spark-opts>
            <arg>--inputPath</arg><arg>${inputPath}/dataset</arg>
            <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
            <arg>--outputPath</arg><arg>${workingDir}/dataset</arg>
            <arg>--bipScorePath</arg><arg>${bipScorePath}</arg>
        </spark>
        <ok to="join_aa"/>
        <error to="Kill"/>
    </action>

    <action name="atomicactions_orp">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <master>yarn</master>
            <mode>cluster</mode>
            <name>Produces the atomic action with the bip finder scores for orp</name>
            <class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
            <jar>dhp-aggregation-${projectVersion}.jar</jar>
            <spark-opts>
                --executor-memory=${sparkExecutorMemory}
                --executor-cores=${sparkExecutorCores}
                --driver-memory=${sparkDriverMemory}
                --conf spark.extraListeners=${spark2ExtraListeners}
                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
            </spark-opts>
            <arg>--inputPath</arg><arg>${inputPath}/otherresearchproduct</arg>
            <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
            <arg>--outputPath</arg><arg>${workingDir}/otherresearchproduct</arg>
            <arg>--bipScorePath</arg><arg>${bipScorePath}</arg>
        </spark>
        <ok to="join_aa"/>
        <error to="Kill"/>
    </action>

    <action name="atomicactions_software">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <master>yarn</master>
            <mode>cluster</mode>
            <name>Produces the atomic action with the bip finder scores for software</name>
            <class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
            <jar>dhp-aggregation-${projectVersion}.jar</jar>
            <spark-opts>
                --executor-memory=${sparkExecutorMemory}
                --executor-cores=${sparkExecutorCores}
                --driver-memory=${sparkDriverMemory}
                --conf spark.extraListeners=${spark2ExtraListeners}
                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
            </spark-opts>
            <arg>--inputPath</arg><arg>${inputPath}/software</arg>
            <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
            <arg>--outputPath</arg><arg>${workingDir}/software</arg>
            <arg>--bipScorePath</arg><arg>${bipScorePath}</arg>
        </spark>
        <ok to="join_aa"/>
        <error to="Kill"/>
    </action>

    <join name="join_aa" to="collectandsave"/>

    <action name="collectandsave">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <master>yarn</master>
            <mode>cluster</mode>
<name>saves all the aa produced for the several types of results in the as output path</name>
|
||||||
|
<class>eu.dnetlib.dhp.actionmanager.bipfinder.CollectAndSave</class>
|
||||||
|
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--inputPath</arg><arg>${workingDir}</arg>
|
||||||
|
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="End"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<end name="End"/>
|
||||||
|
</workflow-app>
|
|
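The fork runs SparkAtomicActionScoreJob once per result type, writing partial ActionSets under ${workingDir}/<type>; CollectAndSave then merges them into ${outputPath}. Its implementation is not shown in this diff; a minimal sketch of the merge it presumably performs, given its --inputPath/--outputPath arguments and the sequence-file format the test below reads back:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class CollectAndSaveSketch {

	// Union the per-type atomic-action sequence files into a single ActionSet.
	// Paths mirror the workflow arguments; the exact save format is an assumption.
	public static void collectAndSave(SparkSession spark, String inputPath, String outputPath) {
		JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
		sc
			.sequenceFile(inputPath + "/publication", Text.class, Text.class)
			.union(sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class))
			.union(sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class))
			.union(sc.sequenceFile(inputPath + "/software", Text.class, Text.class))
			.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
	}
}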
@@ -0,0 +1,323 @@

package eu.dnetlib.dhp.actionmanager.bipfinder;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.Publication;

public class SparkAtomicActionScoreJobTest {

	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

	private static SparkSession spark;

	private static Path workingDir;
	private static final Logger log = LoggerFactory
		.getLogger(SparkAtomicActionScoreJobTest.class);

	@BeforeAll
	public static void beforeAll() throws IOException {
		workingDir = Files
			.createTempDirectory(SparkAtomicActionScoreJobTest.class.getSimpleName());
		log.info("using work dir {}", workingDir);

		SparkConf conf = new SparkConf();
		conf.setAppName(SparkAtomicActionScoreJobTest.class.getSimpleName());

		conf.setMaster("local[*]");
		conf.set("spark.driver.host", "localhost");
		conf.set("hive.metastore.local", "true");
		conf.set("spark.ui.enabled", "false");
		conf.set("spark.sql.warehouse.dir", workingDir.toString());
		conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());

		spark = SparkSession
			.builder()
			.appName(SparkAtomicActionScoreJobTest.class.getSimpleName())
			.config(conf)
			.getOrCreate();
	}

	@AfterAll
	public static void afterAll() throws IOException {
		FileUtils.deleteDirectory(workingDir.toFile());
		spark.stop();
	}

	@Test
	public void matchOne() throws Exception {
		String bipScoresPath = getClass()
			.getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores.json")
			.getPath();
		String inputPath = getClass()
			.getResource(
				"/eu/dnetlib/dhp/actionmanager/bipfinder/publication.json")
			.getPath();

		SparkAtomicActionScoreJob
			.main(
				new String[] {
					"-isSparkSessionManaged",
					Boolean.FALSE.toString(),
					"-inputPath",
					inputPath,
					"-bipScorePath",
					bipScoresPath,
					"-resultTableName",
					"eu.dnetlib.dhp.schema.oaf.Publication",
					"-outputPath",
					workingDir.toString() + "/actionSet"
				});

		final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

		// read back the produced ActionSet and extract the Publication payloads
		JavaRDD<Publication> tmp = sc
			.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
			.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
			.map(aa -> ((Publication) aa.getPayload()));

		Assertions.assertEquals(1, tmp.count());

		Dataset<Publication> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Publication.class));
		verificationDataset.createOrReplaceTempView("publication");

		// flatten the measures so each (result id, measure id, unit value) becomes one row
		Dataset<Row> execVerification = spark
			.sql(
				"Select p.id oaid, mes.id, mUnit.value from publication p " +
					"lateral view explode(measures) m as mes " +
					"lateral view explode(mes.unit) u as mUnit ");

		Assertions.assertEquals(2, execVerification.count());

		Assertions
			.assertEquals(
				"50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb",
				execVerification.select("oaid").collectAsList().get(0).getString(0));

		Assertions
			.assertEquals(
				"1.47565045883e-08",
				execVerification.filter("id = 'influence'").select("value").collectAsList().get(0).getString(0));

		Assertions
			.assertEquals(
				"0.227515392",
				execVerification.filter("id = 'popularity'").select("value").collectAsList().get(0).getString(0));

	}

	@Test
	public void matchOneWithTwo() throws Exception {
		String bipScoresPath = getClass()
			.getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores.json")
			.getPath();
		String inputPath = getClass()
			.getResource(
				"/eu/dnetlib/dhp/actionmanager/bipfinder/publication_2.json")
			.getPath();

		SparkAtomicActionScoreJob
			.main(
				new String[] {
					"-isSparkSessionManaged",
					Boolean.FALSE.toString(),
					"-inputPath",
					inputPath,
					"-bipScorePath",
					bipScoresPath,
					"-resultTableName",
					"eu.dnetlib.dhp.schema.oaf.Publication",
					"-outputPath",
					workingDir.toString() + "/actionSet"
				});

		final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

		JavaRDD<Publication> tmp = sc
			.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
			.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
			.map(aa -> ((Publication) aa.getPayload()));

		Assertions.assertEquals(1, tmp.count());

		Dataset<Publication> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Publication.class));
		verificationDataset.createOrReplaceTempView("publication");

		Dataset<Row> execVerification = spark
			.sql(
				"Select p.id oaid, mes.id, mUnit.value from publication p " +
					"lateral view explode(measures) m as mes " +
					"lateral view explode(mes.unit) u as mUnit ");

		// one publication matched by two score records: two values per measure
		Assertions.assertEquals(4, execVerification.count());

		Assertions
			.assertEquals(
				"50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb",
				execVerification.select("oaid").collectAsList().get(0).getString(0));

		Assertions
			.assertEquals(
				2,
				execVerification.filter("id = 'influence'").count());

		Assertions
			.assertEquals(
				2,
				execVerification.filter("id = 'popularity'").count());

		List<Row> tmp_ds = execVerification.filter("id = 'influence'").select("value").collectAsList();
		String tmp_influence = tmp_ds.get(0).getString(0);
		Assertions
			.assertTrue(
				"1.47565045883e-08".equals(tmp_influence) ||
					"1.98956540239e-08".equals(tmp_influence));

		tmp_influence = tmp_ds.get(1).getString(0);
		Assertions
			.assertTrue(
				"1.47565045883e-08".equals(tmp_influence) ||
					"1.98956540239e-08".equals(tmp_influence));

		Assertions.assertNotEquals(tmp_ds.get(0).getString(0), tmp_ds.get(1).getString(0));

	}

	@Test
	public void matchTwo() throws Exception {
		String bipScoresPath = getClass()
			.getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores.json")
			.getPath();
		String inputPath = getClass()
			.getResource(
				"/eu/dnetlib/dhp/actionmanager/bipfinder/publication_3.json")
			.getPath();

		SparkAtomicActionScoreJob
			.main(
				new String[] {
					"-isSparkSessionManaged",
					Boolean.FALSE.toString(),
					"-inputPath",
					inputPath,
					"-bipScorePath",
					bipScoresPath,
					"-resultTableName",
					"eu.dnetlib.dhp.schema.oaf.Publication",
					"-outputPath",
					workingDir.toString() + "/actionSet"
				});

		final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

		JavaRDD<Publication> tmp = sc
			.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
			.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
			.map(aa -> ((Publication) aa.getPayload()));

		Assertions.assertEquals(2, tmp.count());

		Dataset<Publication> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Publication.class));
		verificationDataset.createOrReplaceTempView("publication");

		Dataset<Row> execVerification = spark
			.sql(
				"Select p.id oaid, mes.id, mUnit.value from publication p " +
					"lateral view explode(measures) m as mes " +
					"lateral view explode(mes.unit) u as mUnit ");

		// two matched publications, one influence and one popularity value each
		Assertions.assertEquals(4, execVerification.count());

		Assertions
			.assertEquals(
				2,
				execVerification.filter("oaid = '50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb'").count());

		Assertions
			.assertEquals(
				2,
				execVerification.filter("oaid = '50|acm_________::faed5b7a1bd8f51118d13ed29cfaee09'").count());

		Assertions
			.assertEquals(
				2,
				execVerification.filter("id = 'influence'").count());

		Assertions
			.assertEquals(
				2,
				execVerification.filter("id = 'popularity'").count());

		Assertions
			.assertEquals(
				"1.47565045883e-08",
				execVerification
					.filter(
						"oaid = '50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb' " +
							"and id = 'influence'")
					.select("value")
					.collectAsList()
					.get(0)
					.getString(0));

		Assertions
			.assertEquals(
				"1.98956540239e-08",
				execVerification
					.filter(
						"oaid = '50|acm_________::faed5b7a1bd8f51118d13ed29cfaee09' " +
							"and id = 'influence'")
					.select("value")
					.collectAsList()
					.get(0)
					.getString(0));

		Assertions
			.assertEquals(
				"0.282046161584",
				execVerification
					.filter(
						"oaid = '50|acm_________::faed5b7a1bd8f51118d13ed29cfaee09' " +
							"and id = 'popularity'")
					.select("value")
					.collectAsList()
					.get(0)
					.getString(0));

		Assertions
			.assertEquals(
				"0.227515392",
				execVerification
					.filter(
						"oaid = '50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb' " +
							"and id = 'popularity'")
					.select("value")
					.collectAsList()
					.get(0)
					.getString(0));

	}

}
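The verification SQL implies each Publication payload carries a list of measures, each with an id ('influence' or 'popularity') and unit entries holding the score value as a string. A hypothetical sketch of such a payload, assuming the Measure and KeyValue JavaBeans in eu.dnetlib.dhp.schema.oaf expose the field names the query explodes (not confirmed by this diff):

import java.util.Arrays;

import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Measure;
import eu.dnetlib.dhp.schema.oaf.Publication;

public class MeasureSketch {

	public static Publication scoredPublication() {
		// the unit value carries the score as a string, matching the asserted values above
		KeyValue unit = new KeyValue();
		unit.setValue("1.47565045883e-08");

		Measure influence = new Measure();
		influence.setId("influence");
		influence.setUnit(Arrays.asList(unit));

		Publication p = new Publication();
		p.setId("50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb");
		p.setMeasures(Arrays.asList(influence));
		return p;
	}
}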
File diff suppressed because it is too large
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long