2020-05-19 18:43:42 +02:00
2020-05-19 18:42:50 +02:00
package eu.dnetlib.dhp.actionmanager.project ;
2020-05-19 18:43:42 +02:00
import java.io.IOException ;
import java.nio.file.Files ;
import java.nio.file.Path ;
2020-10-01 15:40:28 +02:00
import java.util.List ;
2020-05-19 18:43:42 +02:00
import org.apache.commons.io.FileUtils ;
2020-05-28 10:26:12 +02:00
import org.apache.hadoop.io.Text ;
2020-05-19 18:43:42 +02:00
import org.apache.spark.SparkConf ;
import org.apache.spark.api.java.JavaRDD ;
import org.apache.spark.api.java.JavaSparkContext ;
2020-10-01 15:40:28 +02:00
import org.apache.spark.sql.Dataset ;
import org.apache.spark.sql.Encoders ;
import org.apache.spark.sql.Row ;
2020-05-19 18:43:42 +02:00
import org.apache.spark.sql.SparkSession ;
import org.junit.jupiter.api.AfterAll ;
import org.junit.jupiter.api.Assertions ;
import org.junit.jupiter.api.BeforeAll ;
import org.junit.jupiter.api.Test ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import com.fasterxml.jackson.databind.ObjectMapper ;
2020-05-29 10:45:18 +02:00
import eu.dnetlib.dhp.schema.action.AtomicAction ;
2020-05-19 18:43:42 +02:00
import eu.dnetlib.dhp.schema.oaf.Project ;
public class SparkUpdateProjectTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper ( ) ;
private static final ClassLoader cl = eu . dnetlib . dhp . actionmanager . project . SparkUpdateProjectTest . class
. getClassLoader ( ) ;
private static SparkSession spark ;
private static Path workingDir ;
private static final Logger log = LoggerFactory
. getLogger ( eu . dnetlib . dhp . actionmanager . project . SparkUpdateProjectTest . class ) ;
@BeforeAll
public static void beforeAll ( ) throws IOException {
workingDir = Files
. createTempDirectory ( eu . dnetlib . dhp . actionmanager . project . SparkUpdateProjectTest . class . getSimpleName ( ) ) ;
log . info ( " using work dir {} " , workingDir ) ;
SparkConf conf = new SparkConf ( ) ;
conf . setAppName ( eu . dnetlib . dhp . actionmanager . project . SparkUpdateProjectTest . class . getSimpleName ( ) ) ;
conf . setMaster ( " local[*] " ) ;
conf . set ( " spark.driver.host " , " localhost " ) ;
conf . set ( " hive.metastore.local " , " true " ) ;
conf . set ( " spark.ui.enabled " , " false " ) ;
conf . set ( " spark.sql.warehouse.dir " , workingDir . toString ( ) ) ;
conf . set ( " hive.metastore.warehouse.dir " , workingDir . resolve ( " warehouse " ) . toString ( ) ) ;
spark = SparkSession
. builder ( )
. appName ( SparkUpdateProjectTest . class . getSimpleName ( ) )
. config ( conf )
. getOrCreate ( ) ;
}
@AfterAll
public static void afterAll ( ) throws IOException {
FileUtils . deleteDirectory ( workingDir . toFile ( ) ) ;
spark . stop ( ) ;
}
@Test
public void numberDistinctProgrammeTest ( ) throws Exception {
SparkAtomicActionJob
. main (
new String [ ] {
" -isSparkSessionManaged " ,
Boolean . FALSE . toString ( ) ,
" -programmePath " ,
2020-05-22 15:29:41 +02:00
getClass ( )
2020-09-23 17:31:49 +02:00
. getResource (
" /eu/dnetlib/dhp/actionmanager/project/preparedProgramme_classification_whole.json.gz " )
2020-05-22 15:29:41 +02:00
. getPath ( ) ,
2020-05-19 18:43:42 +02:00
" -projectPath " ,
2020-05-20 13:53:32 +02:00
getClass ( ) . getResource ( " /eu/dnetlib/dhp/actionmanager/project/prepared_projects.json " ) . getPath ( ) ,
2020-10-01 15:40:28 +02:00
" -topicPath " ,
getClass ( ) . getResource ( " /eu/dnetlib/dhp/actionmanager/project/topic.json.gz " ) . getPath ( ) ,
2020-05-19 18:43:42 +02:00
" -outputPath " ,
workingDir . toString ( ) + " /actionSet "
} ) ;
final JavaSparkContext sc = new JavaSparkContext ( spark . sparkContext ( ) ) ;
JavaRDD < Project > tmp = sc
2020-05-29 10:45:18 +02:00
. sequenceFile ( workingDir . toString ( ) + " /actionSet " , Text . class , Text . class )
2020-05-28 10:26:12 +02:00
. map ( value - > OBJECT_MAPPER . readValue ( value . _2 ( ) . toString ( ) , AtomicAction . class ) )
2020-05-29 10:45:18 +02:00
. map ( aa - > ( ( Project ) aa . getPayload ( ) ) ) ;
2020-05-19 18:43:42 +02:00
2020-09-23 17:31:49 +02:00
Assertions . assertEquals ( 15 , tmp . count ( ) ) ;
2020-10-01 15:40:28 +02:00
Dataset < Project > verificationDataset = spark . createDataset ( tmp . rdd ( ) , Encoders . bean ( Project . class ) ) ;
verificationDataset . createOrReplaceTempView ( " project " ) ;
Dataset < Row > execverification = spark . sql ( " SELECT id, class classification, h2020topiccode, h2020topicdescription FROM project LATERAL VIEW EXPLODE(h2020classification) c as class " ) ;
Assertions . assertEquals ( " H2020-EU.3.4.7. " , execverification . filter ( " id = '40|corda__h2020::2c7298913008865ba784e5c1350a0aa5' " ) . select ( " classification.h2020Programme.code " ) . collectAsList ( ) . get ( 0 ) . getString ( 0 ) ) ;
Assertions . assertEquals ( " SESAR JU " , execverification . filter ( " id = '40|corda__h2020::2c7298913008865ba784e5c1350a0aa5' " ) . select ( " classification.h2020Programme.description " ) . collectAsList ( ) . get ( 0 ) . getString ( 0 ) ) ;
Assertions . assertEquals ( " Societal challenges " , execverification . filter ( " id = '40|corda__h2020::2c7298913008865ba784e5c1350a0aa5' " ) . select ( " classification.level1 " ) . collectAsList ( ) . get ( 0 ) . getString ( 0 ) ) ;
Assertions . assertEquals ( " Smart, Green And Integrated Transport " , execverification . filter ( " id = '40|corda__h2020::2c7298913008865ba784e5c1350a0aa5' " ) . select ( " classification.level2 " ) . collectAsList ( ) . get ( 0 ) . getString ( 0 ) ) ;
Assertions . assertEquals ( " SESAR JU " , execverification . filter ( " id = '40|corda__h2020::2c7298913008865ba784e5c1350a0aa5' " ) . select ( " classification.level3 " ) . collectAsList ( ) . get ( 0 ) . getString ( 0 ) ) ;
Assertions . assertEquals ( " Societal challenges | Smart, Green And Integrated Transport | SESAR JU " , execverification . filter ( " id = '40|corda__h2020::2c7298913008865ba784e5c1350a0aa5' " ) . select ( " classification.classification " ) . collectAsList ( ) . get ( 0 ) . getString ( 0 ) ) ;
Assertions . assertEquals ( " SESAR-ER4-31-2019 " , execverification . filter ( " id = '40|corda__h2020::2c7298913008865ba784e5c1350a0aa5' " ) . select ( " h2020topiccode " ) . collectAsList ( ) . get ( 0 ) . getString ( 0 ) ) ;
Assertions . assertEquals ( " U-space " , execverification . filter ( " id = '40|corda__h2020::2c7298913008865ba784e5c1350a0aa5' " ) . select ( " h2020topicdescription " ) . collectAsList ( ) . get ( 0 ) . getString ( 0 ) ) ;
Assertions . assertEquals ( " H2020-EU.1.3.2. " , execverification . filter ( " id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202' " ) . select ( " classification.h2020Programme.code " ) . collectAsList ( ) . get ( 0 ) . getString ( 0 ) ) ;
Assertions . assertEquals ( " Nurturing excellence by means of cross-border and cross-sector mobility " , execverification . filter ( " id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202' " ) . select ( " classification.h2020Programme.description " ) . collectAsList ( ) . get ( 0 ) . getString ( 0 ) ) ;
Assertions . assertEquals ( " Excellent science " , execverification . filter ( " id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202' " ) . select ( " classification.level1 " ) . collectAsList ( ) . get ( 0 ) . getString ( 0 ) ) ;
Assertions . assertEquals ( " Marie Skłodowska-Curie Actions " , execverification . filter ( " id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202' " ) . select ( " classification.level2 " ) . collectAsList ( ) . get ( 0 ) . getString ( 0 ) ) ;
Assertions . assertEquals ( " Nurturing excellence by means of cross-border and cross-sector mobility " , execverification . filter ( " id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202' " ) . select ( " classification.level3 " ) . collectAsList ( ) . get ( 0 ) . getString ( 0 ) ) ;
Assertions . assertEquals ( " Excellent science | Marie Skłodowska-Curie Actions | Nurturing excellence by means of cross-border and cross-sector mobility " , execverification . filter ( " id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202' " ) . select ( " classification.classification " ) . collectAsList ( ) . get ( 0 ) . getString ( 0 ) ) ;
Assertions . assertEquals ( " MSCA-IF-2019 " , execverification . filter ( " id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202' " ) . select ( " h2020topiccode " ) . collectAsList ( ) . get ( 0 ) . getString ( 0 ) ) ;
Assertions . assertEquals ( " Individual Fellowships " , execverification . filter ( " id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202' " ) . select ( " h2020topicdescription " ) . collectAsList ( ) . get ( 0 ) . getString ( 0 ) ) ;
Assertions . assertTrue ( execverification . filter ( " id = '40|corda__h2020::a657c271769fec90b60c1f2dbc25f4d5' " ) . select ( " classification.h2020Programme.code " ) . collectAsList ( ) . get ( 0 ) . getString ( 0 ) . equals ( " H2020-EU.2.1.4. " ) | |
execverification . filter ( " id = '40|corda__h2020::a657c271769fec90b60c1f2dbc25f4d5' " ) . select ( " classification.h2020Programme.code " ) . collectAsList ( ) . get ( 1 ) . getString ( 0 ) . equals ( " H2020-EU.2.1.4. " ) ) ;
Assertions . assertTrue ( execverification . filter ( " id = '40|corda__h2020::a657c271769fec90b60c1f2dbc25f4d5' " ) . select ( " classification.h2020Programme.code " ) . collectAsList ( ) . get ( 0 ) . getString ( 0 ) . equals ( " H2020-EU.3.2.6. " ) | |
execverification . filter ( " id = '40|corda__h2020::a657c271769fec90b60c1f2dbc25f4d5' " ) . select ( " classification.h2020Programme.code " ) . collectAsList ( ) . get ( 1 ) . getString ( 0 ) . equals ( " H2020-EU.3.2.6. " ) ) ;
Assertions . assertEquals ( " Biotechnology " , execverification . filter ( " id = '40|corda__h2020::a657c271769fec90b60c1f2dbc25f4d5' and classification.h2020Programme.code = 'H2020-EU.2.1.4.' " ) . select ( " classification.h2020Programme.description " ) . collectAsList ( ) . get ( 0 ) . getString ( 0 ) ) ;
Assertions . assertEquals ( " Bio-based Industries Joint Technology Initiative (BBI-JTI) " , execverification . filter ( " id = '40|corda__h2020::a657c271769fec90b60c1f2dbc25f4d5' and classification.h2020Programme.code = 'H2020-EU.3.2.6.' " ) . select ( " classification.h2020Programme.description " ) . collectAsList ( ) . get ( 0 ) . getString ( 0 ) ) ;
Assertions . assertEquals ( " BBI-2019-SO3-D4 " , execverification . filter ( " id = '40|corda__h2020::a657c271769fec90b60c1f2dbc25f4d5' " ) . select ( " h2020topiccode " ) . collectAsList ( ) . get ( 0 ) . getString ( 0 ) ) ;
Assertions . assertEquals ( " Demonstrate bio-based pesticides and/or biostimulant agents for sustainable increase in agricultural productivity " , execverification . filter ( " id = '40|corda__h2020::a657c271769fec90b60c1f2dbc25f4d5' " ) . select ( " h2020topicdescription " ) . collectAsList ( ) . get ( 0 ) . getString ( 0 ) ) ;
2020-05-19 18:43:42 +02:00
}
2020-05-19 18:42:50 +02:00
}