From 0051ebede575a5effa945e8ec9813c52f3ff6356 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Tue, 1 Dec 2020 12:43:03 +0100 Subject: [PATCH] extending test --- .../SparkAtomicActionScoreJobTest.java | 196 ++++++++++++++---- 1 file changed, 150 insertions(+), 46 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java index 64518234e3..3b360f6edc 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java @@ -16,6 +16,7 @@ import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; @@ -71,7 +72,7 @@ public class SparkAtomicActionScoreJobTest { } @Test - public void numberDistinctProjectTest() throws Exception { + public void matchOne() throws Exception { String bipScoresPath = getClass().getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores.json").getPath(); String inputPath = getClass() .getResource( @@ -102,59 +103,162 @@ public class SparkAtomicActionScoreJobTest { Assertions.assertTrue(tmp.count() == 1); -// Dataset verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Publication.class)); -// verificationDataset.createOrReplaceTempView("project"); + Dataset verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Publication.class)); + verificationDataset.createOrReplaceTempView("publication"); -// Dataset execverification = spark -// .sql( -// "SELECT id, class classification, h2020topiccode, h2020topicdescription FROM project LATERAL VIEW EXPLODE(h2020classification) c as class "); -// -// Assertions -// .assertEquals( -// "H2020-EU.3.4.7.", -// execverification -// .filter("id = '40|corda__h2020::2c7298913008865ba784e5c1350a0aa5'") -// .select("classification.h2020Programme.code") -// .collectAsList() -// .get(0) -// .getString(0)); + Dataset execVerification = spark.sql("Select p.id oaid, mes.id, mUnit.value from publication p " + + "lateral view explode(measures) m as mes " + + "lateral view explode(mes.unit) u as mUnit "); + + Assertions.assertEquals(2, execVerification.count()); + + Assertions.assertEquals("50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb", + execVerification.select("oaid").collectAsList().get(0).getString(0)); + + Assertions.assertEquals("1.47565045883e-08", + execVerification.filter("id = 'influence'").select("value").collectAsList().get(0).getString(0)); + + Assertions.assertEquals("0.227515392", + execVerification.filter("id = 'popularity'").select("value").collectAsList().get(0).getString(0)); + + } + + @Test + public void matchOneWithTwo() throws Exception { + String bipScoresPath = getClass().getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores.json").getPath(); + String inputPath = getClass() + .getResource( + "/eu/dnetlib/dhp/actionmanager/bipfinder/publication_2.json") + .getPath(); + + SparkAtomicActionScoreJob.main( + new String[] { + "-isSparkSessionManaged", + Boolean.FALSE.toString(), + "-inputPath", + inputPath, + "-bipScorePath", + bipScoresPath, + "-resultTableName", + "eu.dnetlib.dhp.schema.oaf.Publication", + "-outputPath", + workingDir.toString() + "/actionSet" + }); + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) + .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) + .map(aa -> ((Publication) aa.getPayload())); + + Assertions.assertTrue(tmp.count() == 1); + + Dataset verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Publication.class)); + verificationDataset.createOrReplaceTempView("publication"); + + Dataset execVerification = spark.sql("Select p.id oaid, mes.id, mUnit.value from publication p " + + "lateral view explode(measures) m as mes " + + "lateral view explode(mes.unit) u as mUnit "); + + Assertions.assertEquals(4, execVerification.count()); + + + Assertions.assertEquals("50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb", + execVerification.select("oaid").collectAsList().get(0).getString(0)); + + Assertions.assertEquals(2, + execVerification.filter("id = 'influence'").count()); + + Assertions.assertEquals(2, + execVerification.filter("id = 'popularity'").count()); + + List tmp_ds = execVerification.filter("id = 'influence'").select("value").collectAsList(); + String tmp_influence = tmp_ds.get(0).getString(0); + Assertions.assertTrue("1.47565045883e-08".equals(tmp_influence)|| + "1.98956540239e-08".equals(tmp_influence) ); + + tmp_influence = tmp_ds.get(1).getString(0); + Assertions.assertTrue("1.47565045883e-08".equals(tmp_influence)|| + "1.98956540239e-08".equals(tmp_influence) ); + + Assertions.assertTrue(!tmp_ds.get(0).getString(0).equals(tmp_ds.get(1).getString(0))); } - private static List getMeasure(BipScore value) { - return value.getScoreList() - .stream() - .map(score -> { - Measure m = new Measure(); - m.setId(score.getId()); - m.setUnit(score.getUnit().stream() - .map(unit -> { - eu.dnetlib.dhp.schema.oaf.KeyValue kv = new KeyValue(); - kv.setValue(unit.getValue()); - kv.setKey(unit.getKey()); - kv.setDataInfo(getDataInfo()); - return kv; - }).collect(Collectors.toList())); - return m; - }).collect(Collectors.toList()); - } + @Test + public void matchTwo() throws Exception { + String bipScoresPath = getClass().getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores.json").getPath(); + String inputPath = getClass() + .getResource( + "/eu/dnetlib/dhp/actionmanager/bipfinder/publication_3.json") + .getPath(); + + SparkAtomicActionScoreJob.main( + new String[] { + "-isSparkSessionManaged", + Boolean.FALSE.toString(), + "-inputPath", + inputPath, + "-bipScorePath", + bipScoresPath, + "-resultTableName", + "eu.dnetlib.dhp.schema.oaf.Publication", + "-outputPath", + workingDir.toString() + "/actionSet" + }); - private static DataInfo getDataInfo() { - DataInfo di = new DataInfo(); - di.setInferred(false); - di.setInvisible(false); - di.setDeletedbyinference(false); - di.setTrust(""); - Qualifier qualifier = new Qualifier(); - qualifier.setClassid("sysimport:actionset"); - qualifier.setClassname("Harvested"); - qualifier.setSchemename("dnet:provenanceActions"); - qualifier.setSchemeid("dnet:provenanceActions"); - di.setProvenanceaction(qualifier); - return di; + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) + .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) + .map(aa -> ((Publication) aa.getPayload())); + + Assertions.assertTrue(tmp.count() == 2); + + Dataset verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Publication.class)); + verificationDataset.createOrReplaceTempView("publication"); + + Dataset execVerification = spark.sql("Select p.id oaid, mes.id, mUnit.value from publication p " + + "lateral view explode(measures) m as mes " + + "lateral view explode(mes.unit) u as mUnit "); + + Assertions.assertEquals(4, execVerification.count()); + + + Assertions.assertEquals(2, + execVerification.filter("oaid = '50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb'").count()); + + Assertions.assertEquals(2, + execVerification.filter("oaid = '50|acm_________::faed5b7a1bd8f51118d13ed29cfaee09'").count()); + + + Assertions.assertEquals(2, + execVerification.filter("id = 'influence'").count()); + + Assertions.assertEquals(2, + execVerification.filter("id = 'popularity'").count()); + + Assertions.assertEquals("1.47565045883e-08" , + execVerification.filter("oaid = '50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb' " + + "and id = 'influence'").select("value").collectAsList().get(0).getString(0)); + + Assertions.assertEquals( + "1.98956540239e-08", execVerification.filter("oaid = '50|acm_________::faed5b7a1bd8f51118d13ed29cfaee09' " + + "and id = 'influence'").select("value").collectAsList().get(0).getString(0)); + + Assertions.assertEquals( + "0.282046161584", execVerification.filter("oaid = '50|acm_________::faed5b7a1bd8f51118d13ed29cfaee09' " + + "and id = 'popularity'").select("value").collectAsList().get(0).getString(0)); + + Assertions.assertEquals("0.227515392" , + execVerification.filter("oaid = '50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb' " + + "and id = 'popularity'").select("value").collectAsList().get(0).getString(0)); + } + }