package eu.dnetlib.dhp.collection;

import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.Aggregator;
import org.apache.spark.util.LongAccumulator;
import org.dom4j.Document;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.worker.CollectorWorkerApplication;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.model.mdstore.Provenance;

import scala.Tuple2;
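/**
 * Spark job that materializes a native mdstore: it reads the sequence file produced by the
 * CollectorWorkerApplication, parses each XML record, extracts its original identifier through the
 * configured XPath and saves the resulting MetadataRecords as a Dataset under the current mdstore
 * version path. Expected arguments (declared in collection_input_parameters.json): provenance,
 * dateOfCollection, mdStoreVersion, readMdStoreVersion (optional, enables incremental mode),
 * xpath, encoding, isSparkSessionManaged.
 */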
public class GenerateNativeStoreSparkJob {

    private static final Logger log = LoggerFactory.getLogger(GenerateNativeStoreSparkJob.class);

    private static final ObjectMapper MAPPER = new ObjectMapper();

    private static final String DATASET_NAME = "/store";

    public static void main(String[] args) throws Exception {
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
            IOUtils
                .toString(
                    GenerateNativeStoreSparkJob.class
                        .getResourceAsStream(
                            "/eu/dnetlib/dhp/collection/collection_input_parameters.json")));
        parser.parseArgument(args);

        final String provenanceArgument = parser.get("provenance");
        log.info("Provenance is {}", provenanceArgument);
        final Provenance provenance = MAPPER.readValue(provenanceArgument, Provenance.class);

        final String dateOfCollectionArgs = parser.get("dateOfCollection");
        log.info("dateOfCollection is {}", dateOfCollectionArgs);
        final Long dateOfCollection = Long.valueOf(dateOfCollectionArgs);

        String mdStoreVersion = parser.get("mdStoreVersion");
        log.info("mdStoreVersion is {}", mdStoreVersion);

        final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class);
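        // readMdStoreVersion is optional: when present the job runs in incremental mode,
        // merging the new records with the given existing mdstore version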
        String readMdStoreVersionParam = parser.get("readMdStoreVersion");
        log.info("readMdStoreVersion is {}", readMdStoreVersionParam);

        final MDStoreVersion readMdStoreVersion = StringUtils.isBlank(readMdStoreVersionParam) ? null
            : MAPPER.readValue(readMdStoreVersionParam, MDStoreVersion.class);

        final String xpath = parser.get("xpath");
        log.info("xpath is {}", xpath);

        final String encoding = parser.get("encoding");
        log.info("encoding is {}", encoding);

        Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
        SparkConf conf = new SparkConf();
        /*
         * conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
         * conf.registerKryoClasses(new Class[] { MetadataRecord.class, Provenance.class });
         */
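        // run the mdstore creation within a Spark session, created and disposed by the wrapper
        // unless the session is externally managed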
        runWithSparkSession(
            conf,
            isSparkSessionManaged,
            spark -> createNativeMDStore(
                spark, provenance, dateOfCollection, xpath, encoding, currentVersion, readMdStoreVersion));
    }
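    /**
     * Parses the collected records into MetadataRecords and stores them under the current mdstore
     * version path; when readVersion is not null the job runs incrementally, merging the new
     * records with that existing version.
     */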
    private static void createNativeMDStore(SparkSession spark,
        Provenance provenance,
        Long dateOfCollection,
        String xpath,
        String encoding,
        MDStoreVersion currentVersion,
        MDStoreVersion readVersion) throws IOException {
        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems");
        final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords");
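        // read the sequence file written by the collector worker and parse each XML payload;
        // records that fail to parse or carry no identifier become null and are filtered out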
        final String seqFilePath = currentVersion.getHdfsPath() + CollectorWorkerApplication.SEQUENCE_FILE_NAME;
        final JavaRDD<MetadataRecord> nativeStore = sc
            .sequenceFile(seqFilePath, IntWritable.class, Text.class)
            .map(
                item -> parseRecord(
                    item._2().toString(),
                    xpath,
                    encoding,
                    provenance,
                    dateOfCollection,
                    totalItems,
                    invalidRecords))
            .filter(Objects::nonNull)
            .distinct();
        final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
        final Dataset<MetadataRecord> mdstore = spark.createDataset(nativeStore.rdd(), encoder);

        final String targetPath = currentVersion.getHdfsPath() + DATASET_NAME;
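        // with a previous version to read from, merge it with the new records, deduplicating by id
        // and keeping the latest collected copy of each record (see MDStoreAggregator)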
        if (readVersion != null) { // INCREMENTAL MODE
            log.info("updating {} incrementally with {}", targetPath, readVersion.getHdfsPath());
            Dataset<MetadataRecord> currentMdStoreVersion = spark
                .read()
                .load(readVersion.getHdfsPath() + DATASET_NAME)
                .as(encoder);
            TypedColumn<MetadataRecord, MetadataRecord> aggregator = new MDStoreAggregator().toColumn();

            final Dataset<MetadataRecord> map = currentMdStoreVersion
                .union(mdstore)
                .groupByKey(
                    (MapFunction<MetadataRecord, String>) MetadataRecord::getId,
                    Encoders.STRING())
                .agg(aggregator)
                .map((MapFunction<Tuple2<String, MetadataRecord>, MetadataRecord>) Tuple2::_2, encoder);
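            // debug output: log a sample of up to 100 record ids from the merged store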
            map.select("id").takeAsList(100).forEach(s -> log.info(s.toString()));

            saveDataset(map, targetPath);

        } else {
            saveDataset(mdstore, targetPath);
        }
        final Long total = spark.read().load(targetPath).count();
        log.info("collected {} records for datasource '{}'", total, provenance.getDatasourceName());

        writeTotalSizeOnHDFS(spark, total, currentVersion.getHdfsPath() + "/size");
    }
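    /**
     * Aggregator reducing all the MetadataRecords that share the same id to the most recently
     * collected one, i.e. the record with the highest dateOfCollection.
     */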
    public static class MDStoreAggregator extends Aggregator<MetadataRecord, MetadataRecord, MetadataRecord> {

        @Override
        public MetadataRecord zero() {
            return null;
        }

        @Override
        public MetadataRecord reduce(MetadataRecord b, MetadataRecord a) {
            return getLatestRecord(b, a);
        }

        @Override
        public MetadataRecord merge(MetadataRecord b, MetadataRecord a) {
            return getLatestRecord(b, a);
        }
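        // null-safe comparison: a missing side loses, otherwise the record with the greater
        // dateOfCollection wins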
        private MetadataRecord getLatestRecord(MetadataRecord b, MetadataRecord a) {
            if (b == null)
                return a;
            if (a == null)
                return b;
            return (a.getDateOfCollection() > b.getDateOfCollection()) ? a : b;
        }

        @Override
        public MetadataRecord finish(MetadataRecord r) {
            return r;
        }

        @Override
        public Encoder<MetadataRecord> bufferEncoder() {
            return Encoders.bean(MetadataRecord.class);
        }

        @Override
        public Encoder<MetadataRecord> outputEncoder() {
            return Encoders.bean(MetadataRecord.class);
        }
    }
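    /**
     * Parses a single XML record: the original identifier is extracted with the given XPath and
     * the whole document is wrapped in a MetadataRecord. Returns null, counting the record as
     * invalid, when the payload cannot be parsed or the identifier is blank.
     */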
    public static MetadataRecord parseRecord(
        final String input,
        final String xpath,
        final String encoding,
        final Provenance provenance,
        final Long dateOfCollection,
        final LongAccumulator totalItems,
        final LongAccumulator invalidRecords) {
        if (totalItems != null)
            totalItems.add(1);
        try {
            SAXReader reader = new SAXReader();
            Document document = reader.read(new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)));
            Node node = document.selectSingleNode(xpath);
            final String originalIdentifier = node.getText();
            if (StringUtils.isBlank(originalIdentifier)) {
                if (invalidRecords != null)
                    invalidRecords.add(1);
                return null;
            }
            return new MetadataRecord(originalIdentifier, encoding, provenance, document.asXML(), dateOfCollection);
        } catch (Throwable e) {
            if (invalidRecords != null)
                invalidRecords.add(1);
            return null;
        }
    }
}