forked from D-Net/dnet-hadoop
fixed test
This commit is contained in:
parent
3ea8c328ac
commit
69c253710b
|
@ -0,0 +1,52 @@
|
||||||
|
package eu.dnetlib.dhp.aggregation;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
|
||||||
|
import eu.dnetlib.dhp.transformation.TransformationFactory;
|
||||||
|
import eu.dnetlib.dhp.transformation.TransformationJobTest;
|
||||||
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||||
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.lenient;
|
||||||
|
|
||||||
|
public abstract class AbstractVocabularyTest {
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
protected ISLookUpService isLookUpService;
|
||||||
|
|
||||||
|
protected VocabularyGroup vocabularies;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
public void setUpVocabulary() throws ISLookUpException, IOException {
|
||||||
|
lenient().when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARIES_XQUERY)).thenReturn(vocs());
|
||||||
|
|
||||||
|
lenient()
|
||||||
|
.when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARY_SYNONYMS_XQUERY))
|
||||||
|
.thenReturn(synonyms());
|
||||||
|
vocabularies = VocabularyGroup.loadVocsFromIS(isLookUpService);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<String> vocs() throws IOException {
|
||||||
|
return IOUtils
|
||||||
|
.readLines(TransformationJobTest.class.getResourceAsStream("/eu/dnetlib/dhp/transform/terms.txt"));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<String> synonyms() throws IOException {
|
||||||
|
return IOUtils
|
||||||
|
.readLines(TransformationJobTest.class.getResourceAsStream("/eu/dnetlib/dhp/transform/synonyms.txt"));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void mockupTrasformationRule(final String trule, final String path) throws Exception {
|
||||||
|
final String trValue = IOUtils.toString(this.getClass().getResourceAsStream(path));
|
||||||
|
|
||||||
|
lenient()
|
||||||
|
.when(isLookUpService.quickSearchProfile(String.format(TransformationFactory.TRULE_XQUERY, trule)))
|
||||||
|
.thenReturn(Collections.singletonList(trValue));
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,44 +1,47 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.aggregation;
|
package eu.dnetlib.dhp.aggregation;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
|
||||||
|
import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob;
|
||||||
|
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
|
||||||
|
import eu.dnetlib.dhp.transformation.TransformSparkJobNode;
|
||||||
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.hadoop.io.IntWritable;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.apache.spark.api.java.function.FilterFunction;
|
||||||
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
import org.apache.spark.sql.Dataset;
|
||||||
|
import org.apache.spark.sql.Encoder;
|
||||||
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
import org.junit.jupiter.api.*;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.FileReader;
|
import java.io.FileReader;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import org.apache.commons.io.FileUtils;
|
import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.MDSTORE_DATA_PATH;
|
||||||
import org.apache.commons.io.IOUtils;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import org.apache.hadoop.io.IntWritable;
|
|
||||||
import org.apache.hadoop.io.Text;
|
|
||||||
import org.apache.spark.SparkConf;
|
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
|
||||||
import org.apache.spark.sql.Dataset;
|
|
||||||
import org.apache.spark.sql.Encoder;
|
|
||||||
import org.apache.spark.sql.Encoders;
|
|
||||||
import org.apache.spark.sql.SparkSession;
|
|
||||||
import org.junit.jupiter.api.*;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
|
|
||||||
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
|
|
||||||
import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob;
|
|
||||||
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
|
|
||||||
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
|
|
||||||
import eu.dnetlib.dhp.transformation.TransformSparkJobNode;
|
|
||||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
|
||||||
|
|
||||||
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
|
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
|
||||||
public class AggregationJobTest {
|
@ExtendWith(MockitoExtension.class)
|
||||||
|
public class AggregationJobTest extends AbstractVocabularyTest{
|
||||||
|
|
||||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
|
@ -55,6 +58,8 @@ public class AggregationJobTest {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(AggregationJobTest.class);
|
private static final Logger log = LoggerFactory.getLogger(AggregationJobTest.class);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@BeforeAll
|
@BeforeAll
|
||||||
public static void beforeAll() throws IOException {
|
public static void beforeAll() throws IOException {
|
||||||
provenance = IOUtils
|
provenance = IOUtils
|
||||||
|
@ -81,6 +86,8 @@ public class AggregationJobTest {
|
||||||
.getOrCreate();
|
.getOrCreate();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@AfterAll
|
@AfterAll
|
||||||
public static void afterAll() throws IOException {
|
public static void afterAll() throws IOException {
|
||||||
FileUtils.deleteDirectory(workingDir.toFile());
|
FileUtils.deleteDirectory(workingDir.toFile());
|
||||||
|
@ -149,19 +156,45 @@ public class AggregationJobTest {
|
||||||
@Order(3)
|
@Order(3)
|
||||||
public void testTransformSparkJob() throws Exception {
|
public void testTransformSparkJob() throws Exception {
|
||||||
|
|
||||||
|
setUpVocabulary();
|
||||||
|
|
||||||
MDStoreVersion mdStoreV2 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_2.json");
|
MDStoreVersion mdStoreV2 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_2.json");
|
||||||
MDStoreVersion mdStoreCleanedVersion = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreCleanedVersion.json");
|
MDStoreVersion mdStoreCleanedVersion = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreCleanedVersion.json");
|
||||||
|
|
||||||
TransformSparkJobNode.main(new String[] {
|
|
||||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
mockupTrasformationRule("simpleTRule", "/eu/dnetlib/dhp/transform/ext_simple.xsl");
|
||||||
"-dateOfTransformation", dateOfCollection,
|
|
||||||
"-mdstoreInputVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV2),
|
final Map<String, String> parameters = Stream.of(new String[][] {
|
||||||
"-mdstoreOutputVersion", OBJECT_MAPPER.writeValueAsString(mdStoreCleanedVersion),
|
{
|
||||||
"-transformationPlugin", "XSLT_TRANSFORM",
|
"dateOfTransformation", "1234"
|
||||||
"-isLookupUrl", "https://dev-openaire.d4science.org/is/services/isLookUp",
|
},
|
||||||
"-transformationRuleId",
|
{
|
||||||
"183dde52-a69b-4db9-a07e-1ef2be105294_VHJhbnNmb3JtYXRpb25SdWxlRFNSZXNvdXJjZXMvVHJhbnNmb3JtYXRpb25SdWxlRFNSZXNvdXJjZVR5cGU="
|
"transformationPlugin", "XSLT_TRANSFORM"
|
||||||
});
|
},
|
||||||
|
{
|
||||||
|
"transformationRuleId", "simpleTRule"
|
||||||
|
},
|
||||||
|
|
||||||
|
}).collect(Collectors.toMap(data -> data[0], data -> data[1]));
|
||||||
|
|
||||||
|
TransformSparkJobNode.transformRecords(parameters, isLookUpService, spark, mdStoreV2.getHdfsPath()+MDSTORE_DATA_PATH, mdStoreCleanedVersion.getHdfsPath());
|
||||||
|
|
||||||
|
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
|
||||||
|
final Dataset<MetadataRecord> mOutput = spark.read().format("parquet").load(mdStoreCleanedVersion.getHdfsPath()+MDSTORE_DATA_PATH).as(encoder);
|
||||||
|
|
||||||
|
final Long total = mOutput.count();
|
||||||
|
|
||||||
|
final long recordTs = mOutput
|
||||||
|
.filter((FilterFunction<MetadataRecord>) p -> p.getDateOfTransformation() == 1234)
|
||||||
|
.count();
|
||||||
|
|
||||||
|
final long recordNotEmpty = mOutput
|
||||||
|
.filter((FilterFunction<MetadataRecord>) p -> !StringUtils.isBlank(p.getBody()))
|
||||||
|
.count();
|
||||||
|
|
||||||
|
assertEquals(total, recordTs);
|
||||||
|
|
||||||
|
assertEquals(total, recordNotEmpty);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,20 +1,12 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.transformation;
|
package eu.dnetlib.dhp.transformation;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
|
||||||
import static org.mockito.Mockito.lenient;
|
import eu.dnetlib.dhp.collection.CollectionJobTest;
|
||||||
|
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
|
||||||
import java.io.IOException;
|
import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
|
||||||
import java.io.StringWriter;
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||||
import java.nio.file.Files;
|
|
||||||
import java.nio.file.Path;
|
|
||||||
import java.util.*;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
import java.util.stream.Stream;
|
|
||||||
|
|
||||||
import javax.xml.transform.stream.StreamSource;
|
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
|
@ -24,52 +16,42 @@ import org.apache.spark.sql.Encoder;
|
||||||
import org.apache.spark.sql.Encoders;
|
import org.apache.spark.sql.Encoders;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
import org.apache.spark.util.LongAccumulator;
|
import org.apache.spark.util.LongAccumulator;
|
||||||
import org.dom4j.Document;
|
|
||||||
import org.dom4j.Node;
|
|
||||||
import org.dom4j.io.SAXReader;
|
|
||||||
import org.junit.jupiter.api.*;
|
import org.junit.jupiter.api.*;
|
||||||
import org.junit.jupiter.api.extension.ExtendWith;
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
import org.junit.jupiter.api.io.TempDir;
|
import org.junit.jupiter.api.io.TempDir;
|
||||||
import org.mockito.Mock;
|
|
||||||
import org.mockito.junit.jupiter.MockitoExtension;
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
|
import java.io.IOException;
|
||||||
import eu.dnetlib.dhp.collection.CollectionJobTest;
|
import java.nio.file.Files;
|
||||||
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
|
import java.nio.file.Path;
|
||||||
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
|
import java.util.Collections;
|
||||||
import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
|
import java.util.Map;
|
||||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
import java.util.stream.Collectors;
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
import java.util.stream.Stream;
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
|
||||||
|
import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.MDSTORE_DATA_PATH;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.mockito.Mockito.lenient;
|
||||||
|
|
||||||
@ExtendWith(MockitoExtension.class)
|
@ExtendWith(MockitoExtension.class)
|
||||||
public class TransformationJobTest {
|
public class TransformationJobTest extends AbstractVocabularyTest {
|
||||||
|
|
||||||
private static SparkSession spark;
|
private static SparkSession spark;
|
||||||
|
|
||||||
@Mock
|
|
||||||
private ISLookUpService isLookUpService;
|
|
||||||
|
|
||||||
private VocabularyGroup vocabularies;
|
|
||||||
|
|
||||||
@BeforeEach
|
|
||||||
public void setUp() throws ISLookUpException, IOException {
|
|
||||||
lenient().when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARIES_XQUERY)).thenReturn(vocs());
|
|
||||||
|
|
||||||
lenient()
|
|
||||||
.when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARY_SYNONYMS_XQUERY))
|
|
||||||
.thenReturn(synonyms());
|
|
||||||
vocabularies = VocabularyGroup.loadVocsFromIS(isLookUpService);
|
|
||||||
}
|
|
||||||
|
|
||||||
@BeforeAll
|
@BeforeAll
|
||||||
public static void beforeAll() {
|
public static void beforeAll() throws IOException, ISLookUpException {
|
||||||
SparkConf conf = new SparkConf();
|
SparkConf conf = new SparkConf();
|
||||||
conf.setAppName(CollectionJobTest.class.getSimpleName());
|
conf.setAppName(CollectionJobTest.class.getSimpleName());
|
||||||
conf.setMaster("local");
|
conf.setMaster("local");
|
||||||
spark = SparkSession.builder().config(conf).getOrCreate();
|
spark = SparkSession.builder().config(conf).getOrCreate();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void setUp() throws IOException, ISLookUpException {
|
||||||
|
setUpVocabulary();
|
||||||
|
}
|
||||||
|
|
||||||
@AfterAll
|
@AfterAll
|
||||||
public static void afterAll() {
|
public static void afterAll() {
|
||||||
spark.stop();
|
spark.stop();
|
||||||
|
@ -101,8 +83,6 @@ public class TransformationJobTest {
|
||||||
|
|
||||||
mockupTrasformationRule("simpleTRule", "/eu/dnetlib/dhp/transform/ext_simple.xsl");
|
mockupTrasformationRule("simpleTRule", "/eu/dnetlib/dhp/transform/ext_simple.xsl");
|
||||||
|
|
||||||
// final String arguments = "-issm true -i %s -o %s -d 1 -w 1 -tp XSLT_TRANSFORM -tr simpleTRule";
|
|
||||||
|
|
||||||
final Map<String, String> parameters = Stream.of(new String[][] {
|
final Map<String, String> parameters = Stream.of(new String[][] {
|
||||||
{
|
{
|
||||||
"dateOfTransformation", "1234"
|
"dateOfTransformation", "1234"
|
||||||
|
@ -111,7 +91,7 @@ public class TransformationJobTest {
|
||||||
"transformationPlugin", "XSLT_TRANSFORM"
|
"transformationPlugin", "XSLT_TRANSFORM"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"transformationRuleTitle", "simpleTRule"
|
"transformationRuleId", "simpleTRule"
|
||||||
},
|
},
|
||||||
|
|
||||||
}).collect(Collectors.toMap(data -> data[0], data -> data[1]));
|
}).collect(Collectors.toMap(data -> data[0], data -> data[1]));
|
||||||
|
@ -121,7 +101,7 @@ public class TransformationJobTest {
|
||||||
// TODO introduce useful assertions
|
// TODO introduce useful assertions
|
||||||
|
|
||||||
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
|
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
|
||||||
final Dataset<MetadataRecord> mOutput = spark.read().format("parquet").load(mdstore_output).as(encoder);
|
final Dataset<MetadataRecord> mOutput = spark.read().format("parquet").load(mdstore_output+MDSTORE_DATA_PATH).as(encoder);
|
||||||
|
|
||||||
final Long total = mOutput.count();
|
final Long total = mOutput.count();
|
||||||
|
|
||||||
|
@ -151,13 +131,7 @@ public class TransformationJobTest {
|
||||||
Files.deleteIfExists(tempDirWithPrefix);
|
Files.deleteIfExists(tempDirWithPrefix);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void mockupTrasformationRule(final String trule, final String path) throws Exception {
|
|
||||||
final String trValue = IOUtils.toString(this.getClass().getResourceAsStream(path));
|
|
||||||
|
|
||||||
lenient()
|
|
||||||
.when(isLookUpService.quickSearchProfile(String.format(TransformationFactory.TRULE_XQUERY, trule)))
|
|
||||||
.thenReturn(Collections.singletonList(trValue));
|
|
||||||
}
|
|
||||||
|
|
||||||
private XSLTTransformationFunction loadTransformationRule(final String path) throws Exception {
|
private XSLTTransformationFunction loadTransformationRule(final String path) throws Exception {
|
||||||
final String trValue = IOUtils.toString(this.getClass().getResourceAsStream(path));
|
final String trValue = IOUtils.toString(this.getClass().getResourceAsStream(path));
|
||||||
|
@ -165,13 +139,5 @@ public class TransformationJobTest {
|
||||||
return new XSLTTransformationFunction(new AggregationCounter(la, la, la), trValue, 0, vocabularies);
|
return new XSLTTransformationFunction(new AggregationCounter(la, la, la), trValue, 0, vocabularies);
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<String> vocs() throws IOException {
|
|
||||||
return IOUtils
|
|
||||||
.readLines(TransformationJobTest.class.getResourceAsStream("/eu/dnetlib/dhp/transform/terms.txt"));
|
|
||||||
}
|
|
||||||
|
|
||||||
private List<String> synonyms() throws IOException {
|
|
||||||
return IOUtils
|
|
||||||
.readLines(TransformationJobTest.class.getResourceAsStream("/eu/dnetlib/dhp/transform/synonyms.txt"));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue