improved unit tests in dhp-aggregation

This commit is contained in:
Claudio Atzori 2020-05-05 12:39:04 +02:00
parent 4a8487165c
commit 0825321d0b
6 changed files with 368 additions and 203 deletions

View File

@ -1,17 +1,21 @@
package eu.dnetlib.dhp.collection; package eu.dnetlib.dhp.collection;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import org.apache.commons.cli.*; import org.apache.commons.cli.*;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
@ -23,6 +27,8 @@ import org.apache.spark.util.LongAccumulator;
import org.dom4j.Document; import org.dom4j.Document;
import org.dom4j.Node; import org.dom4j.Node;
import org.dom4j.io.SAXReader; import org.dom4j.io.SAXReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
@ -35,6 +41,8 @@ import eu.dnetlib.message.MessageType;
public class GenerateNativeStoreSparkJob { public class GenerateNativeStoreSparkJob {
private static final Logger log = LoggerFactory.getLogger(GenerateNativeStoreSparkJob.class);
public static MetadataRecord parseRecord( public static MetadataRecord parseRecord(
final String input, final String input,
final String xpath, final String xpath,
@ -78,84 +86,90 @@ public class GenerateNativeStoreSparkJob {
final Provenance provenance = jsonMapper.readValue(parser.get("provenance"), Provenance.class); final Provenance provenance = jsonMapper.readValue(parser.get("provenance"), Provenance.class);
final long dateOfCollection = new Long(parser.get("dateOfCollection")); final long dateOfCollection = new Long(parser.get("dateOfCollection"));
final SparkSession spark = SparkSession Boolean isSparkSessionManaged = Optional
.builder() .ofNullable(parser.get("isSparkSessionManaged"))
.appName("GenerateNativeStoreSparkJob") .map(Boolean::valueOf)
.master(parser.get("master")) .orElse(Boolean.TRUE);
.getOrCreate(); log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final Map<String, String> ongoingMap = new HashMap<>(); final Map<String, String> ongoingMap = new HashMap<>();
final Map<String, String> reportMap = new HashMap<>(); final Map<String, String> reportMap = new HashMap<>();
final boolean test = parser.get("isTest") == null ? false : Boolean.valueOf(parser.get("isTest")); final boolean test = parser.get("isTest") == null ? false : Boolean.valueOf(parser.get("isTest"));
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
final JavaPairRDD<IntWritable, Text> inputRDD = sc final JavaPairRDD<IntWritable, Text> inputRDD = sc
.sequenceFile(parser.get("input"), IntWritable.class, Text.class); .sequenceFile(parser.get("input"), IntWritable.class, Text.class);
final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems"); final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems");
final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords");
final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords"); final MessageManager manager = new MessageManager(
parser.get("rabbitHost"),
parser.get("rabbitUser"),
parser.get("rabbitPassword"),
false,
false,
null);
final MessageManager manager = new MessageManager( final JavaRDD<MetadataRecord> mappeRDD = inputRDD
parser.get("rabbitHost"), .map(
parser.get("rabbitUser"), item -> parseRecord(
parser.get("rabbitPassword"), item._2().toString(),
false, parser.get("xpath"),
false, parser.get("encoding"),
null); provenance,
dateOfCollection,
totalItems,
invalidRecords))
.filter(Objects::nonNull)
.distinct();
final JavaRDD<MetadataRecord> mappeRDD = inputRDD ongoingMap.put("ongoing", "0");
.map( if (!test) {
item -> parseRecord( manager
item._2().toString(), .sendMessage(
parser.get("xpath"), new Message(
parser.get("encoding"), parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap),
provenance, parser.get("rabbitOngoingQueue"),
dateOfCollection, true,
totalItems, false);
invalidRecords)) }
.filter(Objects::nonNull)
.distinct();
ongoingMap.put("ongoing", "0"); final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
if (!test) { final Dataset<MetadataRecord> mdstore = spark.createDataset(mappeRDD.rdd(), encoder);
manager final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords");
.sendMessage( mdStoreRecords.add(mdstore.count());
new Message( ongoingMap.put("ongoing", "" + totalItems.value());
parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap), if (!test) {
parser.get("rabbitOngoingQueue"), manager
true, .sendMessage(
false); new Message(
} parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap),
parser.get("rabbitOngoingQueue"),
true,
false);
}
mdstore.write().format("parquet").save(parser.get("output"));
reportMap.put("inputItem", "" + totalItems.value());
reportMap.put("invalidRecords", "" + invalidRecords.value());
reportMap.put("mdStoreSize", "" + mdStoreRecords.value());
if (!test) {
manager
.sendMessage(
new Message(parser.get("workflowId"), "Collection", MessageType.REPORT, reportMap),
parser.get("rabbitReportQueue"),
true,
false);
manager.close();
}
});
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
final Dataset<MetadataRecord> mdstore = spark.createDataset(mappeRDD.rdd(), encoder);
final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords");
mdStoreRecords.add(mdstore.count());
ongoingMap.put("ongoing", "" + totalItems.value());
if (!test) {
manager
.sendMessage(
new Message(
parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap),
parser.get("rabbitOngoingQueue"),
true,
false);
}
mdstore.write().format("parquet").save(parser.get("output"));
reportMap.put("inputItem", "" + totalItems.value());
reportMap.put("invalidRecords", "" + invalidRecords.value());
reportMap.put("mdStoreSize", "" + mdStoreRecords.value());
if (!test) {
manager
.sendMessage(
new Message(parser.get("workflowId"), "Collection", MessageType.REPORT, reportMap),
parser.get("rabbitReportQueue"),
true,
false);
manager.close();
}
} }
} }

View File

@ -1,13 +1,17 @@
package eu.dnetlib.dhp.transformation; package eu.dnetlib.dhp.transformation;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import org.apache.commons.cli.*; import org.apache.commons.cli.*;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
@ -17,8 +21,11 @@ import org.dom4j.Document;
import org.dom4j.DocumentException; import org.dom4j.DocumentException;
import org.dom4j.Node; import org.dom4j.Node;
import org.dom4j.io.SAXReader; import org.dom4j.io.SAXReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord; import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.transformation.vocabulary.Vocabulary; import eu.dnetlib.dhp.transformation.vocabulary.Vocabulary;
import eu.dnetlib.dhp.transformation.vocabulary.VocabularyHelper; import eu.dnetlib.dhp.transformation.vocabulary.VocabularyHelper;
@ -29,6 +36,8 @@ import eu.dnetlib.message.MessageType;
public class TransformSparkJobNode { public class TransformSparkJobNode {
private static final Logger log = LoggerFactory.getLogger(TransformSparkJobNode.class);
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -40,12 +49,18 @@ public class TransformSparkJobNode {
parser.parseArgument(args); parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("input"); final String inputPath = parser.get("input");
final String outputPath = parser.get("output"); final String outputPath = parser.get("output");
final String workflowId = parser.get("workflowId"); final String workflowId = parser.get("workflowId");
final String trasformationRule = extractXSLTFromTR( final String trasformationRule = extractXSLTFromTR(
Objects.requireNonNull(DHPUtils.decompressString(parser.get("transformationRule")))); Objects.requireNonNull(DHPUtils.decompressString(parser.get("transformationRule"))));
final String master = parser.get("master");
final String rabbitUser = parser.get("rabbitUser"); final String rabbitUser = parser.get("rabbitUser");
final String rabbitPassword = parser.get("rabbitPassword"); final String rabbitPassword = parser.get("rabbitPassword");
final String rabbitHost = parser.get("rabbitHost"); final String rabbitHost = parser.get("rabbitHost");
@ -53,46 +68,48 @@ public class TransformSparkJobNode {
final long dateOfCollection = new Long(parser.get("dateOfCollection")); final long dateOfCollection = new Long(parser.get("dateOfCollection"));
final boolean test = parser.get("isTest") == null ? false : Boolean.valueOf(parser.get("isTest")); final boolean test = parser.get("isTest") == null ? false : Boolean.valueOf(parser.get("isTest"));
final SparkSession spark = SparkSession SparkConf conf = new SparkConf();
.builder() runWithSparkSession(
.appName("TransformStoreSparkJob") conf,
.master(master) isSparkSessionManaged,
.getOrCreate(); spark -> {
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
final Dataset<MetadataRecord> mdstoreInput = spark.read().format("parquet").load(inputPath).as(encoder);
final LongAccumulator totalItems = spark.sparkContext().longAccumulator("TotalItems");
final LongAccumulator errorItems = spark.sparkContext().longAccumulator("errorItems");
final LongAccumulator transformedItems = spark.sparkContext().longAccumulator("transformedItems");
final Map<String, Vocabulary> vocabularies = new HashMap<>();
vocabularies.put("dnet:languages", VocabularyHelper.getVocabularyFromAPI("dnet:languages"));
final TransformFunction transformFunction = new TransformFunction(
totalItems,
errorItems,
transformedItems,
trasformationRule,
dateOfCollection,
vocabularies);
mdstoreInput.map(transformFunction, encoder).write().format("parquet").save(outputPath);
if (rabbitHost != null) {
System.out.println("SEND FINAL REPORT");
final Map<String, String> reportMap = new HashMap<>();
reportMap.put("inputItem", "" + totalItems.value());
reportMap.put("invalidRecords", "" + errorItems.value());
reportMap.put("mdStoreSize", "" + transformedItems.value());
System.out.println(new Message(workflowId, "Transform", MessageType.REPORT, reportMap));
if (!test) {
final MessageManager manager = new MessageManager(rabbitHost, rabbitUser, rabbitPassword, false,
false,
null);
manager
.sendMessage(
new Message(workflowId, "Transform", MessageType.REPORT, reportMap),
rabbitReportQueue,
true,
false);
manager.close();
}
}
});
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
final Dataset<MetadataRecord> mdstoreInput = spark.read().format("parquet").load(inputPath).as(encoder);
final LongAccumulator totalItems = spark.sparkContext().longAccumulator("TotalItems");
final LongAccumulator errorItems = spark.sparkContext().longAccumulator("errorItems");
final LongAccumulator transformedItems = spark.sparkContext().longAccumulator("transformedItems");
final Map<String, Vocabulary> vocabularies = new HashMap<>();
vocabularies.put("dnet:languages", VocabularyHelper.getVocabularyFromAPI("dnet:languages"));
final TransformFunction transformFunction = new TransformFunction(
totalItems,
errorItems,
transformedItems,
trasformationRule,
dateOfCollection,
vocabularies);
mdstoreInput.map(transformFunction, encoder).write().format("parquet").save(outputPath);
if (rabbitHost != null) {
System.out.println("SEND FINAL REPORT");
final Map<String, String> reportMap = new HashMap<>();
reportMap.put("inputItem", "" + totalItems.value());
reportMap.put("invalidRecords", "" + errorItems.value());
reportMap.put("mdStoreSize", "" + transformedItems.value());
System.out.println(new Message(workflowId, "Transform", MessageType.REPORT, reportMap));
if (!test) {
final MessageManager manager = new MessageManager(rabbitHost, rabbitUser, rabbitPassword, false, false,
null);
manager
.sendMessage(
new Message(workflowId, "Transform", MessageType.REPORT, reportMap),
rabbitReportQueue,
true,
false);
manager.close();
}
}
} }
private static String extractXSLTFromTR(final String tr) throws DocumentException { private static String extractXSLTFromTR(final String tr) throws DocumentException {

View File

@ -1,16 +1,86 @@
[ [
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, {
{"paramName":"e", "paramLongName":"encoding", "paramDescription": "the encoding of the input record should be JSON or XML", "paramRequired": true}, "paramName": "issm",
{"paramName":"d", "paramLongName":"dateOfCollection", "paramDescription": "the date when the record has been stored", "paramRequired": true}, "paramLongName": "isSparkSessionManaged",
{"paramName":"p", "paramLongName":"provenance", "paramDescription": "the infos about the provenance of the collected records", "paramRequired": true}, "paramDescription": "when true will stop SparkSession after job execution",
{"paramName":"x", "paramLongName":"xpath", "paramDescription": "the xpath to identify the record ifentifier", "paramRequired": true}, "paramRequired": false
{"paramName":"i", "paramLongName":"input", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}, },
{"paramName":"o", "paramLongName":"output", "paramDescription": "the path of the result DataFrame on HDFS", "paramRequired": true}, {
{"paramName":"ru", "paramLongName":"rabbitUser", "paramDescription": "the user to connect with RabbitMq for messaging", "paramRequired": true}, "paramName": "e",
{"paramName":"rp", "paramLongName":"rabbitPassword", "paramDescription": "the password to connect with RabbitMq for messaging", "paramRequired": true}, "paramLongName": "encoding",
{"paramName":"rh", "paramLongName":"rabbitHost", "paramDescription": "the host of the RabbitMq server", "paramRequired": true}, "paramDescription": "the encoding of the input record should be JSON or XML",
{"paramName":"ro", "paramLongName":"rabbitOngoingQueue", "paramDescription": "the name of the ongoing queue", "paramRequired": true}, "paramRequired": true
{"paramName":"rr", "paramLongName":"rabbitReportQueue", "paramDescription": "the name of the report queue", "paramRequired": true}, },
{"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": true}, {
{"paramName":"t", "paramLongName":"isTest", "paramDescription": "the name of the report queue", "paramRequired": false} "paramName": "d",
"paramLongName": "dateOfCollection",
"paramDescription": "the date when the record has been stored",
"paramRequired": true
},
{
"paramName": "p",
"paramLongName": "provenance",
"paramDescription": "the infos about the provenance of the collected records",
"paramRequired": true
},
{
"paramName": "x",
"paramLongName": "xpath",
"paramDescription": "the xpath to identify the record identifier",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "input",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "output",
"paramDescription": "the path of the result DataFrame on HDFS",
"paramRequired": true
},
{
"paramName": "ru",
"paramLongName": "rabbitUser",
"paramDescription": "the user to connect with RabbitMq for messaging",
"paramRequired": true
},
{
"paramName": "rp",
"paramLongName": "rabbitPassword",
"paramDescription": "the password to connect with RabbitMq for messaging",
"paramRequired": true
},
{
"paramName": "rh",
"paramLongName": "rabbitHost",
"paramDescription": "the host of the RabbitMq server",
"paramRequired": true
},
{
"paramName": "ro",
"paramLongName": "rabbitOngoingQueue",
"paramDescription": "the name of the ongoing queue",
"paramRequired": true
},
{
"paramName": "rr",
"paramLongName": "rabbitReportQueue",
"paramDescription": "the name of the report queue",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workflowId",
"paramDescription": "the identifier of the dnet Workflow",
"paramRequired": true
},
{
"paramName": "t",
"paramLongName": "isTest",
"paramDescription": "the name of the report queue",
"paramRequired": false
}
] ]

View File

@ -1,16 +1,74 @@
[ [
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, {
{"paramName":"d", "paramLongName":"dateOfCollection", "paramDescription": "the date when the record has been stored", "paramRequired": true}, "paramName": "issm",
{"paramName":"i", "paramLongName":"input", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}, "paramLongName": "isSparkSessionManaged",
{"paramName":"o", "paramLongName":"output", "paramDescription": "the path of the result DataFrame on HDFS", "paramRequired": true}, "paramDescription": "when true will stop SparkSession after job execution",
{"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": true}, "paramRequired": false
{"paramName":"tr", "paramLongName":"transformationRule","paramDescription": "the transformation Rule to apply to the input MDStore", "paramRequired": true}, },
{"paramName":"ru", "paramLongName":"rabbitUser", "paramDescription": "the user to connect with RabbitMq for messaging", "paramRequired": true}, {
{"paramName":"rp", "paramLongName":"rabbitPassword", "paramDescription": "the password to connect with RabbitMq for messaging", "paramRequired": true}, "paramName": "d",
{"paramName":"rh", "paramLongName":"rabbitHost", "paramDescription": "the host of the RabbitMq server", "paramRequired": true}, "paramLongName": "dateOfCollection",
{"paramName":"ro", "paramLongName":"rabbitOngoingQueue", "paramDescription": "the name of the ongoing queue", "paramRequired": true}, "paramDescription": "the date when the record has been stored",
{"paramName":"rr", "paramLongName":"rabbitReportQueue", "paramDescription": "the name of the report queue", "paramRequired": true}, "paramRequired": true
{"paramName":"t", "paramLongName":"isTest", "paramDescription": "the name of the report queue", "paramRequired": false} },
{
"paramName": "i",
"paramLongName": "input",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "output",
"paramDescription": "the path of the result DataFrame on HDFS",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workflowId",
"paramDescription": "the identifier of the dnet Workflow",
"paramRequired": true
},
{
"paramName": "tr",
"paramLongName": "transformationRule",
"paramDescription": "the transformation Rule to apply to the input MDStore",
"paramRequired": true
},
{
"paramName": "ru",
"paramLongName": "rabbitUser",
"paramDescription": "the user to connect with RabbitMq for messaging",
"paramRequired": true
},
{
"paramName": "rp",
"paramLongName": "rabbitPassword",
"paramDescription": "the password to connect with RabbitMq for messaging",
"paramRequired": true
},
{
"paramName": "rh",
"paramLongName": "rabbitHost",
"paramDescription": "the host of the RabbitMq server",
"paramRequired": true
},
{
"paramName": "ro",
"paramLongName": "rabbitOngoingQueue",
"paramDescription": "the name of the ongoing queue",
"paramRequired": true
},
{
"paramName": "rr",
"paramLongName": "rabbitReportQueue",
"paramDescription": "the name of the report queue",
"paramRequired": true
},
{
"paramName": "t",
"paramLongName": "isTest",
"paramDescription": "the name of the report queue",
"paramRequired": false
}
] ]

View File

@ -9,65 +9,60 @@ import java.nio.file.Path;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.AfterEach; import org.apache.spark.SparkConf;
import org.junit.jupiter.api.BeforeEach; import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.*;
import org.junit.jupiter.api.io.TempDir;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord; import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.model.mdstore.Provenance; import eu.dnetlib.dhp.model.mdstore.Provenance;
import eu.dnetlib.dhp.schema.common.ModelSupport;
public class CollectionJobTest { public class CollectionJobTest {
private Path testDir; private static SparkSession spark;
@BeforeEach @BeforeAll
public void setup() throws IOException { public static void beforeAll() {
testDir = Files.createTempDirectory("dhp-collection"); SparkConf conf = new SparkConf();
conf.setAppName(CollectionJobTest.class.getSimpleName());
conf.setMaster("local");
spark = SparkSession.builder().config(conf).getOrCreate();
} }
@AfterEach @AfterAll
public void teadDown() throws IOException { public static void afterAll() {
FileUtils.deleteDirectory(testDir.toFile()); spark.stop();
} }
@Test @Test
public void tesCollection() throws Exception { public void tesCollection(@TempDir Path testDir) throws Exception {
final Provenance provenance = new Provenance("pippo", "puppa", "ns_prefix"); final Provenance provenance = new Provenance("pippo", "puppa", "ns_prefix");
Assertions.assertNotNull(new ObjectMapper().writeValueAsString(provenance));
GenerateNativeStoreSparkJob GenerateNativeStoreSparkJob
.main( .main(
new String[] { new String[] {
"-mt", "issm", "true",
"local", "-w", "wid",
"-w", "-e", "XML",
"wid", "-d", "" + System.currentTimeMillis(),
"-e", "-p", new ObjectMapper().writeValueAsString(provenance),
"XML", "-x", "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']",
"-d", "-i", this.getClass().getResource("/eu/dnetlib/dhp/collection/native.seq").toString(),
"" + System.currentTimeMillis(), "-o", testDir.toString() + "/store",
"-p", "-t", "true",
new ObjectMapper().writeValueAsString(provenance), "-ru", "",
"-x", "-rp", "",
"./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']", "-rh", "",
"-i", "-ro", "",
this.getClass().getResource("/eu/dnetlib/dhp/collection/native.seq").toString(), "-rr", ""
"-o",
testDir.toString() + "/store",
"-t",
"true",
"-ru",
"",
"-rp",
"",
"-rh",
"",
"-ro",
"",
"-rr",
""
}); });
System.out.println(new ObjectMapper().writeValueAsString(provenance));
// TODO introduce useful assertions
} }
@Test @Test
@ -85,9 +80,8 @@ public class CollectionJobTest {
null, null,
null); null);
assert record != null; assertNotNull(record.getId());
System.out.println(record.getId()); assertNotNull(record.getOriginalId());
System.out.println(record.getOriginalId());
} }
@Test @Test
@ -112,10 +106,12 @@ public class CollectionJobTest {
System.currentTimeMillis(), System.currentTimeMillis(),
null, null,
null); null);
assert record != null;
record.setBody("ciao"); record.setBody("ciao");
assert record1 != null;
record1.setBody("mondo"); record1.setBody("mondo");
assertNotNull(record);
assertNotNull(record1);
assertEquals(record, record1); assertEquals(record, record1);
} }
} }

View File

@ -12,10 +12,14 @@ import java.util.Map;
import javax.xml.transform.stream.StreamSource; import javax.xml.transform.stream.StreamSource;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator; import org.apache.spark.util.LongAccumulator;
import org.dom4j.Document; import org.dom4j.Document;
import org.dom4j.Node; import org.dom4j.Node;
import org.dom4j.io.SAXReader; import org.dom4j.io.SAXReader;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
@ -23,6 +27,7 @@ import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import eu.dnetlib.dhp.collection.CollectionJobTest;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord; import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.transformation.functions.Cleaner; import eu.dnetlib.dhp.transformation.functions.Cleaner;
import eu.dnetlib.dhp.transformation.vocabulary.Vocabulary; import eu.dnetlib.dhp.transformation.vocabulary.Vocabulary;
@ -33,6 +38,21 @@ import net.sf.saxon.s9api.*;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
public class TransformationJobTest { public class TransformationJobTest {
private static SparkSession spark;
@BeforeAll
public static void beforeAll() {
SparkConf conf = new SparkConf();
conf.setAppName(CollectionJobTest.class.getSimpleName());
conf.setMaster("local");
spark = SparkSession.builder().config(conf).getOrCreate();
}
@AfterAll
public static void afterAll() {
spark.stop();
}
@Mock @Mock
private LongAccumulator accumulator; private LongAccumulator accumulator;
@ -78,31 +98,21 @@ public class TransformationJobTest {
TransformSparkJobNode TransformSparkJobNode
.main( .main(
new String[] { new String[] {
"-mt", "-issm", "true",
"local", "-i", mdstore_input,
"-i", "-o", mdstore_output,
mdstore_input, "-d", "1",
"-o", "-w", "1",
mdstore_output, "-tr", xslt,
"-d", "-t", "true",
"1", "-ru", "",
"-w", "-rp", "",
"1", "-rh", "",
"-tr", "-ro", "",
xslt, "-rr", ""
"-t",
"true",
"-ru",
"",
"-rp",
"",
"-rh",
"",
"-ro",
"",
"-rr",
""
}); });
// TODO introduce useful assertions
} }
@Test @Test