factored out constants

This commit is contained in:
Claudio Atzori 2021-02-02 12:28:21 +01:00
parent 4ed1e306b6
commit 75807ea5ae
6 changed files with 63 additions and 48 deletions

View File

@ -0,0 +1,15 @@
package eu.dnetlib.dhp.aggregation.common;
public class AggregationConstants {
public static final String SEQUENCE_FILE_NAME = "/sequence_file";
public static final String MDSTORE_DATA_PATH = "/store";
public static final String MDSTORE_SIZE_PATH = "/size";
public static final String CONTENT_TOTALITEMS = "TotalItems";
public static final String CONTENT_INVALIDRECORDS = "InvalidRecords";
public static final String CONTENT_TRANSFORMEDRECORDS = "transformedItems";
}

View File

@ -5,6 +5,7 @@ import java.io.BufferedOutputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -21,6 +22,8 @@ public class AggregationUtility {
private static final Logger log = LoggerFactory.getLogger(AggregationUtility.class); private static final Logger log = LoggerFactory.getLogger(AggregationUtility.class);
public static final ObjectMapper MAPPER = new ObjectMapper();
public static void writeTotalSizeOnHDFS(final SparkSession spark, final Long total, final String path) public static void writeTotalSizeOnHDFS(final SparkSession spark, final Long total, final String path)
throws IOException { throws IOException {

View File

@ -1,15 +1,11 @@
package eu.dnetlib.dhp.collection; package eu.dnetlib.dhp.collection;
import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*; import com.fasterxml.jackson.databind.ObjectMapper;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import java.io.*; import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import java.nio.charset.StandardCharsets; import eu.dnetlib.dhp.model.mdstore.Provenance;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.IntWritable;
@ -26,26 +22,22 @@ import org.dom4j.Node;
import org.dom4j.io.SAXReader; import org.dom4j.io.SAXReader;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.worker.CollectorWorkerApplication;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.model.mdstore.Provenance;
import net.sf.saxon.expr.Component;
import scala.Tuple2; import scala.Tuple2;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*;
import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
public class GenerateNativeStoreSparkJob { public class GenerateNativeStoreSparkJob {
private static final Logger log = LoggerFactory.getLogger(GenerateNativeStoreSparkJob.class); private static final Logger log = LoggerFactory.getLogger(GenerateNativeStoreSparkJob.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final String DATASET_NAME = "/store";
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -88,11 +80,6 @@ public class GenerateNativeStoreSparkJob {
log.info("isSparkSessionManaged: {}", isSparkSessionManaged); log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
/*
* conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf .registerKryoClasses( new
* Class[] { MetadataRecord.class, Provenance.class });
*/
runWithSparkSession( runWithSparkSession(
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
@ -109,10 +96,10 @@ public class GenerateNativeStoreSparkJob {
MDStoreVersion readVersion) throws IOException { MDStoreVersion readVersion) throws IOException {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems"); final LongAccumulator totalItems = sc.sc().longAccumulator(CONTENT_TOTALITEMS);
final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords"); final LongAccumulator invalidRecords = sc.sc().longAccumulator(CONTENT_INVALIDRECORDS);
final String seqFilePath = currentVersion.getHdfsPath() + CollectorWorkerApplication.SEQUENCE_FILE_NAME; final String seqFilePath = currentVersion.getHdfsPath() + SEQUENCE_FILE_NAME;
final JavaRDD<MetadataRecord> nativeStore = sc final JavaRDD<MetadataRecord> nativeStore = sc
.sequenceFile(seqFilePath, IntWritable.class, Text.class) .sequenceFile(seqFilePath, IntWritable.class, Text.class)
.map( .map(
@ -130,13 +117,13 @@ public class GenerateNativeStoreSparkJob {
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class); final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
final Dataset<MetadataRecord> mdstore = spark.createDataset(nativeStore.rdd(), encoder); final Dataset<MetadataRecord> mdstore = spark.createDataset(nativeStore.rdd(), encoder);
final String targetPath = currentVersion.getHdfsPath() + DATASET_NAME; final String targetPath = currentVersion.getHdfsPath() + MDSTORE_DATA_PATH;
if (readVersion != null) { // INCREMENTAL MODE if (readVersion != null) { // INCREMENTAL MODE
log.info("updating {} incrementally with {}", targetPath, readVersion.getHdfsPath()); log.info("updating {} incrementally with {}", targetPath, readVersion.getHdfsPath());
Dataset<MetadataRecord> currentMdStoreVersion = spark Dataset<MetadataRecord> currentMdStoreVersion = spark
.read() .read()
.load(readVersion.getHdfsPath() + DATASET_NAME) .load(readVersion.getHdfsPath() + MDSTORE_DATA_PATH)
.as(encoder); .as(encoder);
TypedColumn<MetadataRecord, MetadataRecord> aggregator = new MDStoreAggregator().toColumn(); TypedColumn<MetadataRecord, MetadataRecord> aggregator = new MDStoreAggregator().toColumn();
@ -159,7 +146,7 @@ public class GenerateNativeStoreSparkJob {
final Long total = spark.read().load(targetPath).count(); final Long total = spark.read().load(targetPath).count();
log.info("collected {} records for datasource '{}'", total, provenance.getDatasourceName()); log.info("collected {} records for datasource '{}'", total, provenance.getDatasourceName());
writeTotalSizeOnHDFS(spark, total, currentVersion.getHdfsPath() + "/size"); writeTotalSizeOnHDFS(spark, total, currentVersion.getHdfsPath() + MDSTORE_SIZE_PATH);
} }
public static class MDStoreAggregator extends Aggregator<MetadataRecord, MetadataRecord, MetadataRecord> { public static class MDStoreAggregator extends Aggregator<MetadataRecord, MetadataRecord, MetadataRecord> {

View File

@ -1,6 +1,8 @@
package eu.dnetlib.dhp.collection.worker; package eu.dnetlib.dhp.collection.worker;
import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -25,8 +27,6 @@ public class CollectorWorkerApplication {
private static final CollectorPluginFactory collectorPluginFactory = new CollectorPluginFactory(); private static final CollectorPluginFactory collectorPluginFactory = new CollectorPluginFactory();
public static String SEQUENCE_FILE_NAME = "/sequence_file";
/** /**
* @param args * @param args
*/ */

View File

@ -2,14 +2,17 @@
package eu.dnetlib.dhp.transformation; package eu.dnetlib.dhp.transformation;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import eu.dnetlib.dhp.aggregation.common.AggregationConstants;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
@ -76,29 +79,36 @@ public class TransformSparkJobNode {
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> transformRecords( spark -> transformRecords(
parser.getObjectMap(), isLookupService, spark, nativeMdStoreVersion.getHdfsPath() + "/store", parser.getObjectMap(), isLookupService, spark, nativeMdStoreVersion.getHdfsPath() + MDSTORE_DATA_PATH,
cleanedMdStoreVersion.getHdfsPath() + "/store")); cleanedMdStoreVersion.getHdfsPath() + MDSTORE_DATA_PATH));
} }
public static void transformRecords(final Map<String, String> args, final ISLookUpService isLookUpService, public static void transformRecords(final Map<String, String> args, final ISLookUpService isLookUpService,
final SparkSession spark, final String inputPath, final String outputPath) final SparkSession spark, final String inputPath, final String outputPath)
throws DnetTransformationException, IOException { throws DnetTransformationException, IOException {
final LongAccumulator totalItems = spark.sparkContext().longAccumulator("TotalItems"); final LongAccumulator totalItems = spark.sparkContext().longAccumulator(CONTENT_TOTALITEMS);
final LongAccumulator errorItems = spark.sparkContext().longAccumulator("errorItems"); final LongAccumulator errorItems = spark.sparkContext().longAccumulator(CONTENT_INVALIDRECORDS);
final LongAccumulator transformedItems = spark.sparkContext().longAccumulator("transformedItems"); final LongAccumulator transformedItems = spark.sparkContext().longAccumulator(CONTENT_TRANSFORMEDRECORDS);
final AggregationCounter ct = new AggregationCounter(totalItems, errorItems, transformedItems); final AggregationCounter ct = new AggregationCounter(totalItems, errorItems, transformedItems);
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class); final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
final Dataset<MetadataRecord> mdstoreInput = spark.read().format("parquet").load(inputPath).as(encoder);
final MapFunction<MetadataRecord, MetadataRecord> XSLTTransformationFunction = TransformationFactory saveDataset(
.getTransformationPlugin(args, ct, isLookUpService); spark.read()
mdstoreInput.map(XSLTTransformationFunction, encoder).write().save(outputPath + "/store"); .format("parquet")
.load(inputPath)
.as(encoder)
.map(
TransformationFactory.getTransformationPlugin(args, ct, isLookUpService),
encoder),
outputPath + MDSTORE_DATA_PATH);
log.info("Transformed item " + ct.getProcessedItems().count()); log.info("Transformed item " + ct.getProcessedItems().count());
log.info("Total item " + ct.getTotalItems().count()); log.info("Total item " + ct.getTotalItems().count());
log.info("Transformation Error item " + ct.getErrorItems().count()); log.info("Transformation Error item " + ct.getErrorItems().count());
AggregationUtility.writeTotalSizeOnHDFS(spark, ct.getProcessedItems().count(), outputPath + "/size"); writeTotalSizeOnHDFS(spark, ct.getProcessedItems().count(), outputPath + MDSTORE_SIZE_PATH);
} }
} }

View File

@ -145,7 +145,7 @@ public class AggregationJobTest {
} }
//@Test @Test
@Order(3) @Order(3)
public void testTransformSparkJob() throws Exception { public void testTransformSparkJob() throws Exception {