package eu.dnetlib.dhp.actionmanager.project;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProgramme;
import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProject;
public class PrepareProgrammeTest {
2020-05-19 18:43:42 +02:00
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper ( ) ;
private static final ClassLoader cl = eu . dnetlib . dhp . actionmanager . project . PrepareProgrammeTest . class
. getClassLoader ( ) ;
private static SparkSession spark ;
private static Path workingDir ;
private static final Logger log = LoggerFactory
. getLogger ( eu . dnetlib . dhp . actionmanager . project . PrepareProgrammeTest . class ) ;
@BeforeAll
public static void beforeAll ( ) throws IOException {
workingDir = Files
. createTempDirectory ( eu . dnetlib . dhp . actionmanager . project . PrepareProgrammeTest . class . getSimpleName ( ) ) ;
log . info ( " using work dir {} " , workingDir ) ;
SparkConf conf = new SparkConf ( ) ;
conf . setAppName ( eu . dnetlib . dhp . actionmanager . project . PrepareProgrammeTest . class . getSimpleName ( ) ) ;
conf . setMaster ( " local[*] " ) ;
conf . set ( " spark.driver.host " , " localhost " ) ;
conf . set ( " hive.metastore.local " , " true " ) ;
conf . set ( " spark.ui.enabled " , " false " ) ;
conf . set ( " spark.sql.warehouse.dir " , workingDir . toString ( ) ) ;
conf . set ( " hive.metastore.warehouse.dir " , workingDir . resolve ( " warehouse " ) . toString ( ) ) ;
spark = SparkSession
. builder ( )
. appName ( PrepareProgrammeTest . class . getSimpleName ( ) )
. config ( conf )
. getOrCreate ( ) ;
}
@AfterAll
public static void afterAll ( ) throws IOException {
FileUtils . deleteDirectory ( workingDir . toFile ( ) ) ;
spark . stop ( ) ;
}
@Test
public void numberDistinctProgrammeTest ( ) throws Exception {
PrepareProgramme
. main (
new String [ ] {
" -isSparkSessionManaged " ,
Boolean . FALSE . toString ( ) ,
" -programmePath " ,
2020-05-20 13:53:32 +02:00
getClass ( ) . getResource ( " /eu/dnetlib/dhp/actionmanager/project/whole_programme.json.gz " ) . getPath ( ) ,
2020-05-19 18:43:42 +02:00
" -outputPath " ,
workingDir . toString ( ) + " /preparedProgramme "
} ) ;
final JavaSparkContext sc = new JavaSparkContext ( spark . sparkContext ( ) ) ;
JavaRDD < CSVProgramme > tmp = sc
. textFile ( workingDir . toString ( ) + " /preparedProgramme " )
. map ( item - > OBJECT_MAPPER . readValue ( item , CSVProgramme . class ) ) ;
Assertions . assertEquals ( 277 , tmp . count ( ) ) ;
Dataset < CSVProgramme > verificationDataset = spark . createDataset ( tmp . rdd ( ) , Encoders . bean ( CSVProgramme . class ) ) ;
2020-09-22 14:38:00 +02:00
Assertions . assertEquals ( 0 , verificationDataset . filter ( " title ='' " ) . count ( ) ) ;
Assertions . assertEquals ( 0 , verificationDataset . filter ( " classification = '' " ) . count ( ) ) ;
Assertions . assertEquals ( " Societal challenges $ Smart, Green And Integrated Transport $ CLEANSKY2 $ IADP Fast Rotorcraft " ,
verificationDataset . filter ( " code = 'H2020-EU.3.4.5.3.' " ) . select ( " classification " ) . collectAsList ( )
. get ( 0 ) . getString ( 0 ) ) ;
Assertions . assertEquals ( " Euratom $ Indirect actions $ European Fusion Development Agreement " ,
verificationDataset . filter ( " code = 'H2020-Euratom-1.9.' " ) . select ( " classification " ) . collectAsList ( )
. get ( 0 ) . getString ( 0 ) ) ;
Assertions . assertEquals ( " Industrial leadership $ Leadership in enabling and industrial technologies $ Advanced manufacturing and processing $ New sustainable business models " ,
verificationDataset . filter ( " code = 'H2020-EU.2.1.5.4.' " ) . select ( " classification " ) . collectAsList ( )
. get ( 0 ) . getString ( 0 ) ) ;
Assertions . assertEquals ( " Excellent science $ Future and Emerging Technologies (FET) $ FET Open " ,
verificationDataset . filter ( " code = 'H2020-EU.1.2.1.' " ) . select ( " classification " ) . collectAsList ( )
. get ( 0 ) . getString ( 0 ) ) ;
2020-05-19 18:43:42 +02:00
}
2020-05-19 18:42:50 +02:00
}