1
0
Fork 0

extending test

This commit is contained in:
Miriam Baglioni 2020-12-01 12:43:03 +01:00
parent 719da15f04
commit 0051ebede5
1 changed files with 150 additions and 46 deletions

View File

@ -16,6 +16,7 @@ import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@ -71,7 +72,7 @@ public class SparkAtomicActionScoreJobTest {
}
@Test
public <I extends Result> void numberDistinctProjectTest() throws Exception {
public void matchOne() throws Exception {
String bipScoresPath = getClass().getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores.json").getPath();
String inputPath = getClass()
.getResource(
@ -102,59 +103,162 @@ public class SparkAtomicActionScoreJobTest {
Assertions.assertTrue(tmp.count() == 1);
// Dataset<Publication> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Publication.class));
// verificationDataset.createOrReplaceTempView("project");
Dataset<Publication> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Publication.class));
verificationDataset.createOrReplaceTempView("publication");
// Dataset<Row> execverification = spark
// .sql(
// "SELECT id, class classification, h2020topiccode, h2020topicdescription FROM project LATERAL VIEW EXPLODE(h2020classification) c as class ");
//
// Assertions
// .assertEquals(
// "H2020-EU.3.4.7.",
// execverification
// .filter("id = '40|corda__h2020::2c7298913008865ba784e5c1350a0aa5'")
// .select("classification.h2020Programme.code")
// .collectAsList()
// .get(0)
// .getString(0));
Dataset<Row> execVerification = spark.sql("Select p.id oaid, mes.id, mUnit.value from publication p " +
"lateral view explode(measures) m as mes " +
"lateral view explode(mes.unit) u as mUnit ");
Assertions.assertEquals(2, execVerification.count());
Assertions.assertEquals("50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb",
execVerification.select("oaid").collectAsList().get(0).getString(0));
Assertions.assertEquals("1.47565045883e-08",
execVerification.filter("id = 'influence'").select("value").collectAsList().get(0).getString(0));
Assertions.assertEquals("0.227515392",
execVerification.filter("id = 'popularity'").select("value").collectAsList().get(0).getString(0));
}
@Test
public void matchOneWithTwo() throws Exception {
String bipScoresPath = getClass().getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores.json").getPath();
String inputPath = getClass()
.getResource(
"/eu/dnetlib/dhp/actionmanager/bipfinder/publication_2.json")
.getPath();
SparkAtomicActionScoreJob.main(
new String[] {
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-inputPath",
inputPath,
"-bipScorePath",
bipScoresPath,
"-resultTableName",
"eu.dnetlib.dhp.schema.oaf.Publication",
"-outputPath",
workingDir.toString() + "/actionSet"
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Publication> tmp = sc
.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Publication) aa.getPayload()));
Assertions.assertTrue(tmp.count() == 1);
Dataset<Publication> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Publication.class));
verificationDataset.createOrReplaceTempView("publication");
Dataset<Row> execVerification = spark.sql("Select p.id oaid, mes.id, mUnit.value from publication p " +
"lateral view explode(measures) m as mes " +
"lateral view explode(mes.unit) u as mUnit ");
Assertions.assertEquals(4, execVerification.count());
Assertions.assertEquals("50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb",
execVerification.select("oaid").collectAsList().get(0).getString(0));
Assertions.assertEquals(2,
execVerification.filter("id = 'influence'").count());
Assertions.assertEquals(2,
execVerification.filter("id = 'popularity'").count());
List<Row> tmp_ds = execVerification.filter("id = 'influence'").select("value").collectAsList();
String tmp_influence = tmp_ds.get(0).getString(0);
Assertions.assertTrue("1.47565045883e-08".equals(tmp_influence)||
"1.98956540239e-08".equals(tmp_influence) );
tmp_influence = tmp_ds.get(1).getString(0);
Assertions.assertTrue("1.47565045883e-08".equals(tmp_influence)||
"1.98956540239e-08".equals(tmp_influence) );
Assertions.assertTrue(!tmp_ds.get(0).getString(0).equals(tmp_ds.get(1).getString(0)));
}
private static List<Measure> getMeasure(BipScore value) {
return value.getScoreList()
.stream()
.map(score -> {
Measure m = new Measure();
m.setId(score.getId());
m.setUnit(score.getUnit().stream()
.map(unit -> {
eu.dnetlib.dhp.schema.oaf.KeyValue kv = new KeyValue();
kv.setValue(unit.getValue());
kv.setKey(unit.getKey());
kv.setDataInfo(getDataInfo());
return kv;
}).collect(Collectors.toList()));
return m;
}).collect(Collectors.toList());
}
@Test
public void matchTwo() throws Exception {
String bipScoresPath = getClass().getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores.json").getPath();
String inputPath = getClass()
.getResource(
"/eu/dnetlib/dhp/actionmanager/bipfinder/publication_3.json")
.getPath();
SparkAtomicActionScoreJob.main(
new String[] {
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-inputPath",
inputPath,
"-bipScorePath",
bipScoresPath,
"-resultTableName",
"eu.dnetlib.dhp.schema.oaf.Publication",
"-outputPath",
workingDir.toString() + "/actionSet"
});
private static DataInfo getDataInfo() {
DataInfo di = new DataInfo();
di.setInferred(false);
di.setInvisible(false);
di.setDeletedbyinference(false);
di.setTrust("");
Qualifier qualifier = new Qualifier();
qualifier.setClassid("sysimport:actionset");
qualifier.setClassname("Harvested");
qualifier.setSchemename("dnet:provenanceActions");
qualifier.setSchemeid("dnet:provenanceActions");
di.setProvenanceaction(qualifier);
return di;
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Publication> tmp = sc
.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Publication) aa.getPayload()));
Assertions.assertTrue(tmp.count() == 2);
Dataset<Publication> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Publication.class));
verificationDataset.createOrReplaceTempView("publication");
Dataset<Row> execVerification = spark.sql("Select p.id oaid, mes.id, mUnit.value from publication p " +
"lateral view explode(measures) m as mes " +
"lateral view explode(mes.unit) u as mUnit ");
Assertions.assertEquals(4, execVerification.count());
Assertions.assertEquals(2,
execVerification.filter("oaid = '50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb'").count());
Assertions.assertEquals(2,
execVerification.filter("oaid = '50|acm_________::faed5b7a1bd8f51118d13ed29cfaee09'").count());
Assertions.assertEquals(2,
execVerification.filter("id = 'influence'").count());
Assertions.assertEquals(2,
execVerification.filter("id = 'popularity'").count());
Assertions.assertEquals("1.47565045883e-08" ,
execVerification.filter("oaid = '50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb' " +
"and id = 'influence'").select("value").collectAsList().get(0).getString(0));
Assertions.assertEquals(
"1.98956540239e-08", execVerification.filter("oaid = '50|acm_________::faed5b7a1bd8f51118d13ed29cfaee09' " +
"and id = 'influence'").select("value").collectAsList().get(0).getString(0));
Assertions.assertEquals(
"0.282046161584", execVerification.filter("oaid = '50|acm_________::faed5b7a1bd8f51118d13ed29cfaee09' " +
"and id = 'popularity'").select("value").collectAsList().get(0).getString(0));
Assertions.assertEquals("0.227515392" ,
execVerification.filter("oaid = '50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb' " +
"and id = 'popularity'").select("value").collectAsList().get(0).getString(0));
}
}