forked from D-Net/dnet-hadoop

WIP: metadata collection in INCREMENTAL mode and related test

commit 8eaa1fd4b4 (parent bead34d11a)
@@ -26,13 +26,13 @@ public class MetadataRecord implements Serializable {
 	private String body;

 	/** the date when the record has been stored */
-	private long dateOfCollection;
+	private Long dateOfCollection;

 	/** the date when the record has been stored */
-	private long dateOfTransformation;
+	private Long dateOfTransformation;

 	public MetadataRecord() {
-		this.dateOfCollection = System.currentTimeMillis();
 	}

 	public MetadataRecord(
@@ -40,7 +40,7 @@ public class MetadataRecord implements Serializable {
 		String encoding,
 		Provenance provenance,
 		String body,
-		long dateOfCollection) {
+		Long dateOfCollection) {

 		this.originalId = originalId;
 		this.encoding = encoding;
@@ -90,19 +90,19 @@ public class MetadataRecord implements Serializable {
 		this.body = body;
 	}

-	public long getDateOfCollection() {
+	public Long getDateOfCollection() {
 		return dateOfCollection;
 	}

-	public void setDateOfCollection(long dateOfCollection) {
+	public void setDateOfCollection(Long dateOfCollection) {
 		this.dateOfCollection = dateOfCollection;
 	}

-	public long getDateOfTransformation() {
+	public Long getDateOfTransformation() {
 		return dateOfTransformation;
 	}

-	public void setDateOfTransformation(long dateOfTransformation) {
+	public void setDateOfTransformation(Long dateOfTransformation) {
 		this.dateOfTransformation = dateOfTransformation;
 	}

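Note on the change above: MetadataRecord is now written and read back through Spark bean encoders, and the no-arg constructor no longer stamps a collection date, so the two date fields must be boxed Longs to stay nullable in the bean schema. The snippet below is an illustrative sketch only (the class name is made up for this note); it just prints the schema Spark derives for the bean:

// Illustrative sketch, not part of this commit: inspect the bean schema derived for
// MetadataRecord once the date fields are boxed (and therefore nullable) Longs.
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

import eu.dnetlib.dhp.model.mdstore.MetadataRecord;

public class MetadataRecordSchemaSketch {
    public static void main(String[] args) {
        final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
        // dateOfCollection and dateOfTransformation now map to nullable long columns
        System.out.println(encoder.schema().treeString());
    }
}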
@@ -8,21 +8,38 @@ import java.nio.charset.StandardCharsets;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

+import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob;
+import eu.dnetlib.dhp.model.mdstore.MetadataRecord;

 public class AggregationUtility {

+    private static final Logger log = LoggerFactory.getLogger(AggregationUtility.class);

     public static void writeTotalSizeOnHDFS(final SparkSession spark, final Long total, final String path)
         throws IOException {

-        FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
-
-        FSDataOutputStream output = fs.create(new Path(path));
-
-        final BufferedOutputStream os = new BufferedOutputStream(output);
-
-        os.write(total.toString().getBytes(StandardCharsets.UTF_8));
-
-        os.close();
+        log.info("writing size ({}) info file {}", total, path);
+        try (FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
+            BufferedOutputStream os = new BufferedOutputStream(fs.create(new Path(path)))) {
+            os.write(total.toString().getBytes(StandardCharsets.UTF_8));
+            os.flush();
+        }
     }

+    public static <T> void saveDataset(final Dataset<T> mdstore, final String targetPath) {
+        log.info("saving dataset in: {}", targetPath);
+        mdstore
+            .write()
+            .mode(SaveMode.Overwrite)
+            .format("parquet")
+            .save(targetPath);
+    }

 }
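Note on AggregationUtility: writeTotalSizeOnHDFS now opens the FileSystem and the output stream in a try-with-resources block, so both are closed even if the write fails, and the new generic saveDataset centralizes the parquet Overwrite write used by the job. The size file is plain UTF-8 text; below is a minimal sketch of reading it back, as the test further down does (the path is illustrative):

// Illustrative sketch, not part of this commit: read back the "/size" counter written by
// AggregationUtility.writeTotalSizeOnHDFS. The path below is a placeholder.
import java.io.FileReader;
import java.io.IOException;

import org.apache.commons.io.IOUtils;

public class ReadMdStoreSizeSketch {
    public static void main(String[] args) throws IOException {
        final String sizePath = "/tmp/mdstore/v1/size"; // illustrative local path
        final long declaredSize = Long.parseLong(IOUtils.toString(new FileReader(sizePath)));
        System.out.println("declared mdstore size: " + declaredSize);
    }
}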
@@ -1,23 +1,20 @@

 package eu.dnetlib.dhp.collection;

+import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

 import java.io.*;
 import java.nio.charset.StandardCharsets;
-import java.util.Collections;
+import java.util.List;
 import java.util.Objects;
 import java.util.Optional;

 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.MapFunction;
@@ -30,31 +27,155 @@ import org.dom4j.io.SAXReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;

 import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
-import eu.dnetlib.dhp.aggregation.common.AggregationUtility;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.collection.worker.CollectorWorkerApplication;
 import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
 import eu.dnetlib.dhp.model.mdstore.Provenance;
+import net.sf.saxon.expr.Component;
 import scala.Tuple2;

 public class GenerateNativeStoreSparkJob {

     private static final Logger log = LoggerFactory.getLogger(GenerateNativeStoreSparkJob.class);

+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
     private static final String DATASET_NAME = "/store";

+    public static void main(String[] args) throws Exception {
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+            IOUtils
+                .toString(
+                    GenerateNativeStoreSparkJob.class
+                        .getResourceAsStream(
+                            "/eu/dnetlib/dhp/collection/collection_input_parameters.json")));
+        parser.parseArgument(args);
+
+        final String provenanceArgument = parser.get("provenance");
+        log.info("Provenance is {}", provenanceArgument);
+        final Provenance provenance = MAPPER.readValue(provenanceArgument, Provenance.class);
+
+        final String dateOfCollectionArgs = parser.get("dateOfCollection");
+        log.info("dateOfCollection is {}", dateOfCollectionArgs);
+        final Long dateOfCollection = new Long(dateOfCollectionArgs);
+
+        String mdStoreVersion = parser.get("mdStoreVersion");
+        log.info("mdStoreVersion is {}", mdStoreVersion);
+
+        final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class);
+
+        String readMdStoreVersionParam = parser.get("readMdStoreVersion");
+        log.info("readMdStoreVersion is {}", readMdStoreVersionParam);
+
+        final MDStoreVersion readMdStoreVersion = StringUtils.isBlank(readMdStoreVersionParam) ? null
+            : MAPPER.readValue(readMdStoreVersionParam, MDStoreVersion.class);
+
+        final String xpath = parser.get("xpath");
+        log.info("xpath is {}", xpath);
+
+        final String encoding = parser.get("encoding");
+        log.info("encoding is {}", encoding);
+
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        SparkConf conf = new SparkConf();
+        /*
+         * conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf .registerKryoClasses( new
+         * Class[] { MetadataRecord.class, Provenance.class });
+         */
+
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> createNativeMDStore(
+                spark, provenance, dateOfCollection, xpath, encoding, currentVersion, readMdStoreVersion));
+    }
+
+    private static void createNativeMDStore(SparkSession spark,
+        Provenance provenance,
+        Long dateOfCollection,
+        String xpath,
+        String encoding,
+        MDStoreVersion currentVersion,
+        MDStoreVersion readVersion) throws IOException {
+        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+        final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems");
+        final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords");
+
+        final String seqFilePath = currentVersion.getHdfsPath() + CollectorWorkerApplication.SEQUENCE_FILE_NAME;
+        final JavaRDD<MetadataRecord> nativeStore = sc
+            .sequenceFile(seqFilePath, IntWritable.class, Text.class)
+            .map(
+                item -> parseRecord(
+                    item._2().toString(),
+                    xpath,
+                    encoding,
+                    provenance,
+                    dateOfCollection,
+                    totalItems,
+                    invalidRecords))
+            .filter(Objects::nonNull)
+            .distinct();
+
+        final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
+        final Dataset<MetadataRecord> mdstore = spark.createDataset(nativeStore.rdd(), encoder);
+
+        final String targetPath = currentVersion.getHdfsPath() + DATASET_NAME;
+
+        if (readVersion != null) { // INCREMENTAL MODE
+            log.info("updating {} incrementally with {}", targetPath, readVersion.getHdfsPath());
+            Dataset<MetadataRecord> currentMdStoreVersion = spark
+                .read()
+                .load(readVersion.getHdfsPath() + DATASET_NAME)
+                .as(encoder);
+            TypedColumn<MetadataRecord, MetadataRecord> aggregator = new MDStoreAggregator().toColumn();
+
+            final Dataset<MetadataRecord> map = currentMdStoreVersion
+                .union(mdstore)
+                .groupByKey(
+                    (MapFunction<MetadataRecord, String>) MetadataRecord::getId,
+                    Encoders.STRING())
+                .agg(aggregator)
+                .map((MapFunction<Tuple2<String, MetadataRecord>, MetadataRecord>) Tuple2::_2, encoder);
+
+            map.select("id").takeAsList(100).forEach(s -> log.info(s.toString()));
+
+            saveDataset(map, targetPath);
+
+        } else {
+            saveDataset(mdstore, targetPath);
+        }
+
+        final Long total = spark.read().load(targetPath).count();
+        log.info("collected {} records for datasource '{}'", total, provenance.getDatasourceName());
+
+        writeTotalSizeOnHDFS(spark, total, currentVersion.getHdfsPath() + "/size");
+    }
+
     public static class MDStoreAggregator extends Aggregator<MetadataRecord, MetadataRecord, MetadataRecord> {

         @Override
         public MetadataRecord zero() {
-            return new MetadataRecord();
+            return null;
         }

         @Override
         public MetadataRecord reduce(MetadataRecord b, MetadataRecord a) {
+            return getLatestRecord(b, a);
+        }
+
+        @Override
+        public MetadataRecord merge(MetadataRecord b, MetadataRecord a) {
             return getLatestRecord(b, a);
         }

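Note on the INCREMENTAL branch above: the previous store is unioned with the freshly parsed records and reduced per record id through MDStoreAggregator, so a re-collected record replaces its older copy. reduce and merge delegate to getLatestRecord, which is defined further down in this class and not shown in this diff; the sketch below is only a guess at the rule it implements, kept null-safe because zero() now returns null:

// Hedged sketch, not the actual implementation: a null-safe "keep the newest record"
// reduction consistent with MDStoreAggregator.zero() returning null.
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;

final class LatestRecordSketch {

    static MetadataRecord getLatestRecord(final MetadataRecord a, final MetadataRecord b) {
        if (a == null)
            return b; // first reduction step: adopt the incoming record
        if (b == null)
            return a;
        final long dateA = a.getDateOfCollection() == null ? 0L : a.getDateOfCollection();
        final long dateB = b.getDateOfCollection() == null ? 0L : b.getDateOfCollection();
        return dateA >= dateB ? a : b;
    }
}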
@@ -68,136 +189,22 @@ public class GenerateNativeStoreSparkJob {
         }

         @Override
-        public MetadataRecord merge(MetadataRecord b, MetadataRecord a) {
-            return getLatestRecord(b, a);
-        }
-
-        @Override
-        public MetadataRecord finish(MetadataRecord j) {
-            return j;
+        public MetadataRecord finish(MetadataRecord r) {
+            return r;
         }

         @Override
         public Encoder<MetadataRecord> bufferEncoder() {
-            return Encoders.kryo(MetadataRecord.class);
+            return Encoders.bean(MetadataRecord.class);
         }

         @Override
         public Encoder<MetadataRecord> outputEncoder() {
-            return Encoders.kryo(MetadataRecord.class);
+            return Encoders.bean(MetadataRecord.class);
         }

     }

-    public static void main(String[] args) throws Exception {
-
-        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
-            IOUtils
-                .toString(
-                    GenerateNativeStoreSparkJob.class
-                        .getResourceAsStream(
-                            "/eu/dnetlib/dhp/collection/collection_input_parameters.json")));
-        parser.parseArgument(args);
-        final ObjectMapper jsonMapper = new ObjectMapper();
-        final String provenanceArgument = parser.get("provenance");
-        log.info("Provenance is {}", provenanceArgument);
-        final Provenance provenance = jsonMapper.readValue(provenanceArgument, Provenance.class);
-
-        final String dateOfCollectionArgs = parser.get("dateOfCollection");
-        log.info("dateOfCollection is {}", dateOfCollectionArgs);
-        final long dateOfCollection = new Long(dateOfCollectionArgs);
-
-        String mdStoreVersion = parser.get("mdStoreVersion");
-        log.info("mdStoreVersion is {}", mdStoreVersion);
-
-        final MDStoreVersion currentVersion = jsonMapper.readValue(mdStoreVersion, MDStoreVersion.class);
-
-        String readMdStoreVersionParam = parser.get("readMdStoreVersion");
-        log.info("readMdStoreVersion is {}", readMdStoreVersionParam);
-
-        final MDStoreVersion readMdStoreVersion = StringUtils.isBlank(readMdStoreVersionParam) ? null
-            : jsonMapper.readValue(readMdStoreVersionParam, MDStoreVersion.class);
-
-        Boolean isSparkSessionManaged = Optional
-            .ofNullable(parser.get("isSparkSessionManaged"))
-            .map(Boolean::valueOf)
-            .orElse(Boolean.TRUE);
-        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
-
-        SparkConf conf = new SparkConf();
-        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-        conf.registerKryoClasses(Collections.singleton(MetadataRecord.class).toArray(new Class[] {}));
-
-        runWithSparkSession(
-            conf,
-            isSparkSessionManaged,
-            spark -> {
-                final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
-
-                final JavaPairRDD<IntWritable, Text> inputRDD = sc
-                    .sequenceFile(
-                        currentVersion.getHdfsPath() + CollectorWorkerApplication.SEQUENTIAL_FILE_NAME,
-                        IntWritable.class, Text.class);
-
-                final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems");
-                final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords");
-
-                final JavaRDD<MetadataRecord> nativeStore = inputRDD
-                    .map(
-                        item -> parseRecord(
-                            item._2().toString(),
-                            parser.get("xpath"),
-                            parser.get("encoding"),
-                            provenance,
-                            dateOfCollection,
-                            totalItems,
-                            invalidRecords))
-                    .filter(Objects::nonNull)
-                    .distinct();
-
-                final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
-
-                final Dataset<MetadataRecord> mdstore = spark.createDataset(nativeStore.rdd(), encoder);
-
-                final String targetPath = currentVersion.getHdfsPath() + DATASET_NAME;
-                if (readMdStoreVersion != null) {
-                    // INCREMENTAL MODE
-
-                    Dataset<MetadataRecord> currentMdStoreVersion = spark
-                        .read()
-                        .load(readMdStoreVersion.getHdfsPath() + DATASET_NAME)
-                        .as(encoder);
-                    TypedColumn<MetadataRecord, MetadataRecord> aggregator = new MDStoreAggregator().toColumn();
-
-                    saveDataset(
-                        currentMdStoreVersion
-                            .union(mdstore)
-                            .groupByKey(
-                                (MapFunction<MetadataRecord, String>) MetadataRecord::getId,
-                                Encoders.STRING())
-                            .agg(aggregator)
-                            .map((MapFunction<Tuple2<String, MetadataRecord>, MetadataRecord>) Tuple2::_2, encoder),
-                        targetPath);
-
-                } else {
-                    saveDataset(mdstore, targetPath);
-                }
-
-                final Long total = spark.read().load(targetPath).count();
-
-                AggregationUtility.writeTotalSizeOnHDFS(spark, total, currentVersion.getHdfsPath() + "/size");
-            });
-    }
-
-    private static void saveDataset(final Dataset<MetadataRecord> currentMdStore, final String targetPath) {
-        currentMdStore
-            .write()
-            .mode(SaveMode.Overwrite)
-            .format("parquet")
-            .save(targetPath);
-
-    }
-
     public static MetadataRecord parseRecord(
         final String input,
         final String xpath,
@@ -219,7 +226,7 @@ public class GenerateNativeStoreSparkJob {
                 invalidRecords.add(1);
                 return null;
             }
-            return new MetadataRecord(originalIdentifier, encoding, provenance, input, dateOfCollection);
+            return new MetadataRecord(originalIdentifier, encoding, provenance, document.asXML(), dateOfCollection);
         } catch (Throwable e) {
             invalidRecords.add(1);
             return null;
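Note on the encoder switch above: with bufferEncoder and outputEncoder moved from Encoders.kryo to Encoders.bean, the aggregation buffer and the parquet store use the same bean schema, so a saved version can be loaded back as a typed Dataset, exactly as createNativeMDStore does with the previous version. A minimal, self-contained read-back sketch (the store path is illustrative):

// Illustrative sketch, not part of this commit: load a saved mdstore version as a typed
// Dataset<MetadataRecord>. The store path is a placeholder.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import eu.dnetlib.dhp.model.mdstore.MetadataRecord;

public class ReadMdStoreSketch {
    public static void main(String[] args) {
        final SparkSession spark = SparkSession
            .builder()
            .appName("ReadMdStoreSketch")
            .master("local[*]")
            .getOrCreate();

        final String storePath = "/tmp/mdstore/v1/store"; // illustrative path
        final Dataset<MetadataRecord> mdstore = spark
            .read()
            .load(storePath)
            .as(Encoders.bean(MetadataRecord.class));

        System.out.println("records in store: " + mdstore.count());
        spark.stop();
    }
}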
@@ -1,11 +1,6 @@

 package eu.dnetlib.dhp.collection.worker;

-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.OutputStream;
-import java.util.Properties;
-
 import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -16,7 +11,6 @@ import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
 import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
-import eu.dnetlib.dhp.common.rest.DNetRestClient;

 /**
  * DnetCollectortWorkerApplication is the main class responsible to start the Dnet Collection into HDFS. This module
@@ -31,7 +25,7 @@ public class CollectorWorkerApplication {

     private static final CollectorPluginFactory collectorPluginFactory = new CollectorPluginFactory();

-    public static String SEQUENTIAL_FILE_NAME = "/sequence_file";
+    public static String SEQUENCE_FILE_NAME = "/sequence_file";

     /**
      * @param args
@@ -61,7 +55,7 @@ public class CollectorWorkerApplication {

         final ApiDescriptor api = jsonMapper.readValue(apiDescriptor, ApiDescriptor.class);
         final CollectorWorker worker = new CollectorWorker(collectorPluginFactory, api, hdfsuri,
-            currentVersion.getHdfsPath() + SEQUENTIAL_FILE_NAME);
+            currentVersion.getHdfsPath() + SEQUENCE_FILE_NAME);
         worker.collect();

     }
@@ -0,0 +1,169 @@
+
+package eu.dnetlib.dhp.collection;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
+import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
+
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class GenerateNativeStoreSparkJobTest {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private static SparkSession spark;
+
+    private static Path workingDir;
+
+    private static Encoder<MetadataRecord> encoder;
+
+    private static final String encoding = "XML";
+    private static final String dateOfCollection = System.currentTimeMillis() + "";
+    private static final String xpath = "//*[local-name()='header']/*[local-name()='identifier']";
+    private static String provenance;
+
+    private static final Logger log = LoggerFactory.getLogger(GenerateNativeStoreSparkJobTest.class);
+
+    @BeforeAll
+    public static void beforeAll() throws IOException {
+        provenance = IOUtils.toString(GenerateNativeStoreSparkJobTest.class.getResourceAsStream("provenance.json"));
+        workingDir = Files.createTempDirectory(GenerateNativeStoreSparkJobTest.class.getSimpleName());
+        log.info("using work dir {}", workingDir);
+
+        SparkConf conf = new SparkConf();
+
+        conf.setAppName(GenerateNativeStoreSparkJobTest.class.getSimpleName());
+
+        conf.setMaster("local[*]");
+        conf.set("spark.driver.host", "localhost");
+        conf.set("hive.metastore.local", "true");
+        conf.set("spark.ui.enabled", "false");
+        conf.set("spark.sql.warehouse.dir", workingDir.toString());
+        conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
+
+        encoder = Encoders.bean(MetadataRecord.class);
+        spark = SparkSession
+            .builder()
+            .appName(GenerateNativeStoreSparkJobTest.class.getSimpleName())
+            .config(conf)
+            .getOrCreate();
+    }
+
+    @AfterAll
+    public static void afterAll() throws IOException {
+        FileUtils.deleteDirectory(workingDir.toFile());
+        spark.stop();
+    }
+
+    @Test
+    @Order(1)
+    public void testGenerateNativeStoreSparkJobRefresh() throws Exception {
+
+        MDStoreVersion mdStoreV1 = prepareVersion("mdStoreVersion_1.json");
+        FileUtils.forceMkdir(new File(mdStoreV1.getHdfsPath()));
+
+        IOUtils
+            .copy(
+                getClass().getResourceAsStream("sequence_file"),
+                new FileOutputStream(mdStoreV1.getHdfsPath() + "/sequence_file"));
+
+        GenerateNativeStoreSparkJob
+            .main(
+                new String[] {
+                    "-isSparkSessionManaged", Boolean.FALSE.toString(),
+                    "-encoding", encoding,
+                    "-dateOfCollection", dateOfCollection,
+                    "-provenance", provenance,
+                    "-xpath", xpath,
+                    "-mdStoreVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV1),
+                    "-readMdStoreVersion", "",
+                    "-workflowId", "abc"
+                });
+
+        verify(mdStoreV1);
+    }
+
+    @Test
+    @Order(2)
+    public void testGenerateNativeStoreSparkJobIncremental() throws Exception {
+
+        MDStoreVersion mdStoreV2 = prepareVersion("mdStoreVersion_2.json");
+        FileUtils.forceMkdir(new File(mdStoreV2.getHdfsPath()));
+
+        IOUtils
+            .copy(
+                getClass().getResourceAsStream("sequence_file"),
+                new FileOutputStream(mdStoreV2.getHdfsPath() + "/sequence_file"));
+
+        MDStoreVersion mdStoreV1 = prepareVersion("mdStoreVersion_1.json");
+
+        GenerateNativeStoreSparkJob
+            .main(
+                new String[] {
+                    "-isSparkSessionManaged", Boolean.FALSE.toString(),
+                    "-encoding", encoding,
+                    "-dateOfCollection", dateOfCollection,
+                    "-provenance", provenance,
+                    "-xpath", xpath,
+                    "-mdStoreVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV2),
+                    "-readMdStoreVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV1),
+                    "-workflowId", "abc"
+                });
+
+        verify(mdStoreV2);
+    }
+
+    protected void verify(MDStoreVersion mdStoreVersion) throws IOException {
+        Assertions.assertTrue(new File(mdStoreVersion.getHdfsPath()).exists());
+
+        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+        long seqFileSize = sc
+            .sequenceFile(mdStoreVersion.getHdfsPath() + "/sequence_file", IntWritable.class, Text.class)
+            .count();
+
+        final Dataset<MetadataRecord> mdstore = spark.read().load(mdStoreVersion.getHdfsPath() + "/store").as(encoder);
+        long mdStoreSize = mdstore.count();
+
+        long declaredSize = Long.parseLong(IOUtils.toString(new FileReader(mdStoreVersion.getHdfsPath() + "/size")));
+
+        Assertions.assertEquals(seqFileSize, declaredSize, "the size must be equal");
+        Assertions.assertEquals(seqFileSize, mdStoreSize, "the size must be equal");
+
+        long uniqueIds = mdstore
+            .map((MapFunction<MetadataRecord, String>) MetadataRecord::getId, Encoders.STRING())
+            .distinct()
+            .count();
+
+        Assertions.assertEquals(seqFileSize, uniqueIds, "the size must be equal");
+    }
+
+    private MDStoreVersion prepareVersion(String filename) throws IOException {
+        MDStoreVersion mdstore = OBJECT_MAPPER
+            .readValue(IOUtils.toString(getClass().getResource(filename)), MDStoreVersion.class);
+        mdstore.setHdfsPath(String.format(mdstore.getHdfsPath(), workingDir.toString()));
+        return mdstore;
+    }
+
+}
@@ -1,9 +0,0 @@
-{
-    "id": "md-7557225f-77cc-407d-bdf4-d2fe03131464-1611935085410",
-    "mdstore": "md-7557225f-77cc-407d-bdf4-d2fe03131464",
-    "writing": true,
-    "readCount": 0,
-    "lastUpdate": null,
-    "size": 0,
-    "hdfsPath": "/data/dnet.dev/mdstore/md-7557225f-77cc-407d-bdf4-d2fe03131464/md-7557225f-77cc-407d-bdf4-d2fe03131464-1611935085410"
-}
@@ -0,0 +1,9 @@
+{
+    "id":"md-84e86d00-5771-4ed9-b17f-177ef4b46e42-1612187678801",
+    "mdstore":"md-84e86d00-5771-4ed9-b17f-177ef4b46e42",
+    "writing":true,
+    "readCount":0,
+    "lastUpdate":null,
+    "size":0,
+    "hdfsPath":"%s/mdstore/md-84e86d00-5771-4ed9-b17f-177ef4b46e42/v1"
+}
@@ -0,0 +1,9 @@
+{
+    "id":"md-84e86d00-5771-4ed9-b17f-177ef4b46e42-1612187459108",
+    "mdstore":"md-84e86d00-5771-4ed9-b17f-177ef4b46e42",
+    "writing":false,
+    "readCount":1,
+    "lastUpdate":1612187563099,
+    "size":71,
+    "hdfsPath":"%s/mdstore/md-84e86d00-5771-4ed9-b17f-177ef4b46e42/v2"
+}
@@ -0,0 +1,5 @@
+{
+    "datasourceId":"74912366-d6df-49c1-a1fd-8a52fa98ce5f_UmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZXMvUmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZVR5cGU\u003d",
+    "datasourceName":"PSNC Institutional Repository",
+    "nsPrefix":"psnc______pl"
+}
Binary file not shown.