package eu.dnetlib.dhp.actionmanager.project ;
import java.io.IOException ;
import java.nio.file.Files ;
import java.nio.file.Path ;
import org.apache.commons.io.FileUtils ;
import org.apache.spark.SparkConf ;
import org.apache.spark.api.java.JavaRDD ;
import org.apache.spark.api.java.JavaSparkContext ;
import org.apache.spark.sql.Dataset ;
import org.apache.spark.sql.Encoders ;
import org.apache.spark.sql.SparkSession ;
import org.junit.jupiter.api.AfterAll ;
import org.junit.jupiter.api.Assertions ;
import org.junit.jupiter.api.BeforeAll ;
import org.junit.jupiter.api.Test ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import com.fasterxml.jackson.databind.ObjectMapper ;
import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProgramme ;
public class PrepareH2020ProgrammeTest {
2020-05-19 18:43:42 +02:00
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper ( ) ;
2020-09-23 17:31:49 +02:00
private static final ClassLoader cl = PrepareH2020ProgrammeTest . class
2020-05-19 18:43:42 +02:00
. getClassLoader ( ) ;
private static SparkSession spark ;
private static Path workingDir ;
private static final Logger log = LoggerFactory
2020-09-23 17:31:49 +02:00
. getLogger ( PrepareH2020ProgrammeTest . class ) ;
2020-05-19 18:43:42 +02:00
@BeforeAll
public static void beforeAll ( ) throws IOException {
workingDir = Files
2020-09-23 17:31:49 +02:00
. createTempDirectory ( PrepareH2020ProgrammeTest . class . getSimpleName ( ) ) ;
2020-05-19 18:43:42 +02:00
log . info ( " using work dir {} " , workingDir ) ;
SparkConf conf = new SparkConf ( ) ;
2020-09-23 17:31:49 +02:00
conf . setAppName ( PrepareH2020ProgrammeTest . class . getSimpleName ( ) ) ;
2020-05-19 18:43:42 +02:00
conf . setMaster ( " local[*] " ) ;
conf . set ( " spark.driver.host " , " localhost " ) ;
conf . set ( " hive.metastore.local " , " true " ) ;
conf . set ( " spark.ui.enabled " , " false " ) ;
conf . set ( " spark.sql.warehouse.dir " , workingDir . toString ( ) ) ;
conf . set ( " hive.metastore.warehouse.dir " , workingDir . resolve ( " warehouse " ) . toString ( ) ) ;
spark = SparkSession
. builder ( )
2020-09-23 17:31:49 +02:00
. appName ( PrepareH2020ProgrammeTest . class . getSimpleName ( ) )
2020-05-19 18:43:42 +02:00
. config ( conf )
. getOrCreate ( ) ;
}
@AfterAll
public static void afterAll ( ) throws IOException {
FileUtils . deleteDirectory ( workingDir . toFile ( ) ) ;
spark . stop ( ) ;
}
@Test
public void numberDistinctProgrammeTest ( ) throws Exception {
PrepareProgramme
. main (
new String [ ] {
" -isSparkSessionManaged " ,
Boolean . FALSE . toString ( ) ,
" -programmePath " ,
2020-05-20 13:53:32 +02:00
getClass ( ) . getResource ( " /eu/dnetlib/dhp/actionmanager/project/whole_programme.json.gz " ) . getPath ( ) ,
2020-05-19 18:43:42 +02:00
" -outputPath " ,
workingDir . toString ( ) + " /preparedProgramme "
} ) ;
final JavaSparkContext sc = new JavaSparkContext ( spark . sparkContext ( ) ) ;
JavaRDD < CSVProgramme > tmp = sc
. textFile ( workingDir . toString ( ) + " /preparedProgramme " )
. map ( item - > OBJECT_MAPPER . readValue ( item , CSVProgramme . class ) ) ;
Assertions . assertEquals ( 277 , tmp . count ( ) ) ;
Dataset < CSVProgramme > verificationDataset = spark . createDataset ( tmp . rdd ( ) , Encoders . bean ( CSVProgramme . class ) ) ;
2020-09-22 14:38:00 +02:00
Assertions . assertEquals ( 0 , verificationDataset . filter ( " title ='' " ) . count ( ) ) ;
Assertions . assertEquals ( 0 , verificationDataset . filter ( " classification = '' " ) . count ( ) ) ;
2020-09-23 17:31:49 +02:00
Assertions
. assertEquals (
" Societal challenges | Smart, Green And Integrated Transport | CLEANSKY2 | IADP Fast Rotorcraft " ,
verificationDataset
. filter ( " code = 'H2020-EU.3.4.5.3.' " )
. select ( " classification " )
. collectAsList ( )
. get ( 0 )
. getString ( 0 ) ) ;
Assertions
. assertEquals (
" Euratom | Indirect actions | European Fusion Development Agreement " ,
verificationDataset
. filter ( " code = 'H2020-Euratom-1.9.' " )
. select ( " classification " )
. collectAsList ( )
. get ( 0 )
. getString ( 0 ) ) ;
Assertions
. assertEquals (
" Industrial leadership | Leadership in enabling and industrial technologies | Advanced manufacturing and processing | New sustainable business models " ,
verificationDataset
. filter ( " code = 'H2020-EU.2.1.5.4.' " )
. select ( " classification " )
. collectAsList ( )
. get ( 0 )
. getString ( 0 ) ) ;
Assertions
. assertEquals (
" Excellent science | Future and Emerging Technologies (FET) | FET Open " ,
verificationDataset
. filter ( " code = 'H2020-EU.1.2.1.' " )
. select ( " classification " )
. collectAsList ( )
. get ( 0 )
. getString ( 0 ) ) ;
2020-09-22 14:38:00 +02:00
2020-05-19 18:43:42 +02:00
}
2020-05-19 18:42:50 +02:00
}