code formatting

Claudio Atzori 2021-02-02 12:34:14 +01:00
parent 75807ea5ae
commit bb89b99b24
6 changed files with 164 additions and 167 deletions

View File

@@ -1,15 +1,14 @@
package eu.dnetlib.dhp.aggregation.common;

public class AggregationConstants {

    public static final String SEQUENCE_FILE_NAME = "/sequence_file";
    public static final String MDSTORE_DATA_PATH = "/store";
    public static final String MDSTORE_SIZE_PATH = "/size";

    public static final String CONTENT_TOTALITEMS = "TotalItems";
    public static final String CONTENT_INVALIDRECORDS = "InvalidRecords";
    public static final String CONTENT_TRANSFORMEDRECORDS = "transformedItems";

}

View File

@@ -5,7 +5,6 @@ import java.io.BufferedOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -15,6 +14,8 @@ import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;

View File

@@ -1,11 +1,16 @@
package eu.dnetlib.dhp.collection;

-import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
-import eu.dnetlib.dhp.model.mdstore.Provenance;
+import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*;
+import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.Optional;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.IntWritable;
@@ -22,18 +27,15 @@ import org.dom4j.Node;
import org.dom4j.io.SAXReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+import com.fasterxml.jackson.databind.ObjectMapper;

+import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
+import eu.dnetlib.dhp.model.mdstore.Provenance;

import scala.Tuple2;

-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Objects;
-import java.util.Optional;

-import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*;
-import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

public class GenerateNativeStoreSparkJob {

    private static final Logger log = LoggerFactory.getLogger(GenerateNativeStoreSparkJob.class);

View File

@@ -1,19 +1,17 @@
package eu.dnetlib.dhp.transformation;

-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
-import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*;
+import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.saveDataset;
+import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.writeTotalSizeOnHDFS;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.io.IOException;
import java.util.Map;
import java.util.Optional;

-import eu.dnetlib.dhp.aggregation.common.AggregationConstants;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
-import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
@@ -25,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
-import eu.dnetlib.dhp.aggregation.common.AggregationUtility;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
@@ -67,7 +64,6 @@ public class TransformSparkJobNode {
        final String dateOfTransformation = parser.get("dateOfTransformation");
        log.info(String.format("dateOfTransformation: %s", dateOfTransformation));

        final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
        final VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService);
@@ -94,15 +90,15 @@ public class TransformSparkJobNode {
        final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
        saveDataset(
-            spark.read()
-                .format("parquet")
-                .load(inputPath)
-                .as(encoder)
-                .map(
-                    TransformationFactory.getTransformationPlugin(args, ct, isLookUpService),
-                    encoder),
-            outputPath + MDSTORE_DATA_PATH);
+            spark
+                .read()
+                .format("parquet")
+                .load(inputPath)
+                .as(encoder)
+                .map(
+                    TransformationFactory.getTransformationPlugin(args, ct, isLookUpService),
+                    encoder),
+            outputPath + MDSTORE_DATA_PATH);

        log.info("Transformed item " + ct.getProcessedItems().count());
        log.info("Total item " + ct.getTotalItems().count());

View File

@@ -12,11 +12,6 @@ import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

-import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob;
-import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
-import eu.dnetlib.dhp.transformation.TransformSparkJobNode;
-import eu.dnetlib.dhp.utils.ISLookupClientFactory;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
@@ -35,163 +30,170 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
+import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob;
+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
+import eu.dnetlib.dhp.transformation.TransformSparkJobNode;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class AggregationJobTest {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private static SparkSession spark;
    private static Path workingDir;
    private static Encoder<MetadataRecord> encoder;

    private static final String encoding = "XML";
    private static final String dateOfCollection = System.currentTimeMillis() + "";
    private static final String xpath = "//*[local-name()='header']/*[local-name()='identifier']";
    private static String provenance;

    private static final Logger log = LoggerFactory.getLogger(AggregationJobTest.class);

    @BeforeAll
    public static void beforeAll() throws IOException {
-        provenance = IOUtils.toString(AggregationJobTest.class.getResourceAsStream("/eu/dnetlib/dhp/collection/provenance.json"));
+        provenance = IOUtils
+            .toString(AggregationJobTest.class.getResourceAsStream("/eu/dnetlib/dhp/collection/provenance.json"));
        workingDir = Files.createTempDirectory(AggregationJobTest.class.getSimpleName());
        log.info("using work dir {}", workingDir);
        SparkConf conf = new SparkConf();

        conf.setAppName(AggregationJobTest.class.getSimpleName());
        conf.setMaster("local[*]");
        conf.set("spark.driver.host", "localhost");
        conf.set("hive.metastore.local", "true");
        conf.set("spark.ui.enabled", "false");
        conf.set("spark.sql.warehouse.dir", workingDir.toString());
        conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());

        encoder = Encoders.bean(MetadataRecord.class);
        spark = SparkSession
            .builder()
            .appName(AggregationJobTest.class.getSimpleName())
            .config(conf)
            .getOrCreate();
    }
    @AfterAll
    public static void afterAll() throws IOException {
        FileUtils.deleteDirectory(workingDir.toFile());
        spark.stop();
    }
    @Test
    @Order(1)
    public void testGenerateNativeStoreSparkJobRefresh() throws Exception {

        MDStoreVersion mdStoreV1 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_1.json");
        FileUtils.forceMkdir(new File(mdStoreV1.getHdfsPath()));

        IOUtils
            .copy(
                getClass().getResourceAsStream("/eu/dnetlib/dhp/collection/sequence_file"),
                new FileOutputStream(mdStoreV1.getHdfsPath() + "/sequence_file"));

        GenerateNativeStoreSparkJob
            .main(
                new String[] {
                    "-isSparkSessionManaged", Boolean.FALSE.toString(),
                    "-encoding", encoding,
                    "-dateOfCollection", dateOfCollection,
                    "-provenance", provenance,
                    "-xpath", xpath,
                    "-mdStoreVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV1),
                    "-readMdStoreVersion", "",
                    "-workflowId", "abc"
                });

        verify(mdStoreV1);
    }
    @Test
    @Order(2)
    public void testGenerateNativeStoreSparkJobIncremental() throws Exception {

        MDStoreVersion mdStoreV2 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_2.json");
        FileUtils.forceMkdir(new File(mdStoreV2.getHdfsPath()));

        IOUtils
            .copy(
                getClass().getResourceAsStream("/eu/dnetlib/dhp/collection/sequence_file"),
                new FileOutputStream(mdStoreV2.getHdfsPath() + "/sequence_file"));

        MDStoreVersion mdStoreV1 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_1.json");

        GenerateNativeStoreSparkJob
            .main(
                new String[] {
                    "-isSparkSessionManaged", Boolean.FALSE.toString(),
                    "-encoding", encoding,
                    "-dateOfCollection", dateOfCollection,
                    "-provenance", provenance,
                    "-xpath", xpath,
                    "-mdStoreVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV2),
                    "-readMdStoreVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV1),
                    "-workflowId", "abc"
                });

        verify(mdStoreV2);
    }
    @Test
    @Order(3)
    public void testTransformSparkJob() throws Exception {

        MDStoreVersion mdStoreV2 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_2.json");
        MDStoreVersion mdStoreCleanedVersion = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreCleanedVersion.json");

        TransformSparkJobNode.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-dateOfTransformation", dateOfCollection,
            "-mdstoreInputVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV2),
            "-mdstoreOutputVersion", OBJECT_MAPPER.writeValueAsString(mdStoreCleanedVersion),
            "-transformationPlugin", "XSLT_TRANSFORM",
            "-isLookupUrl", "https://dev-openaire.d4science.org/is/services/isLookUp",
            "-transformationRuleId",
            "183dde52-a69b-4db9-a07e-1ef2be105294_VHJhbnNmb3JtYXRpb25SdWxlRFNSZXNvdXJjZXMvVHJhbnNmb3JtYXRpb25SdWxlRFNSZXNvdXJjZVR5cGU="
        });

    }

    protected void verify(MDStoreVersion mdStoreVersion) throws IOException {
        Assertions.assertTrue(new File(mdStoreVersion.getHdfsPath()).exists());

        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        long seqFileSize = sc
            .sequenceFile(mdStoreVersion.getHdfsPath() + "/sequence_file", IntWritable.class, Text.class)
            .count();

        final Dataset<MetadataRecord> mdstore = spark.read().load(mdStoreVersion.getHdfsPath() + "/store").as(encoder);
        long mdStoreSize = mdstore.count();

        long declaredSize = Long.parseLong(IOUtils.toString(new FileReader(mdStoreVersion.getHdfsPath() + "/size")));

        Assertions.assertEquals(seqFileSize, declaredSize, "the size must be equal");
        Assertions.assertEquals(seqFileSize, mdStoreSize, "the size must be equal");

        long uniqueIds = mdstore
            .map((MapFunction<MetadataRecord, String>) MetadataRecord::getId, Encoders.STRING())
            .distinct()
            .count();

        Assertions.assertEquals(seqFileSize, uniqueIds, "the size must be equal");
    }

    private MDStoreVersion prepareVersion(String filename) throws IOException {
        MDStoreVersion mdstore = OBJECT_MAPPER
            .readValue(IOUtils.toString(getClass().getResource(filename)), MDStoreVersion.class);
        mdstore.setHdfsPath(String.format(mdstore.getHdfsPath(), workingDir.toString()));
        return mdstore;
    }
}

View File

@@ -75,9 +75,6 @@ public class TransformationJobTest {
        spark.stop();
    }

    @Test
    @DisplayName("Test Transform Single XML using XSLTTransformator")
    public void testTransformSaxonHE() throws Exception {