produce a parquet file

This commit is contained in:
Michele Artini 2024-02-15 14:04:17 +01:00
parent e254720377
commit da65728afe
4 changed files with 156 additions and 101 deletions

View File

@ -4,25 +4,25 @@ package eu.dnetlib.dhp.collection.plugin.base;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DeflateCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.dom4j.Attribute;
import org.dom4j.Document;
@ -33,9 +33,6 @@ import org.dom4j.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
@ -43,8 +40,6 @@ public class BaseAnalyzerJob {
private static final Logger log = LoggerFactory.getLogger(BaseAnalyzerJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws Exception {
final String jsonConfiguration = IOUtils
@ -65,122 +60,127 @@ public class BaseAnalyzerJob {
final String inputPath = parser.get("inputPath");
log.info("inputPath: {}", inputPath);
final String dataPath = parser.get("dataPath");
log.info("dataPath {}: ", dataPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath {}: ", outputPath);
final boolean reimport = Boolean.parseBoolean(parser.get("reimport"));
log.info("reimport {}: ", reimport);
final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> processBaseRecords(spark, inputPath, outputPath));
runWithSparkSession(conf, isSparkSessionManaged, spark -> processBaseRecords(spark, inputPath, dataPath, outputPath, reimport));
}
private static void processBaseRecords(final SparkSession spark,
final String inputPath,
final String outputPath) throws IOException {
final String dataPath,
final String outputPath,
final boolean reimport) throws IOException {
try (final FileSystem fs = FileSystem.get(new Configuration());
final AggregatorReport report = new AggregatorReport()) {
final Map<String, AtomicLong> fields = new HashMap<>();
final Map<String, AtomicLong> types = new HashMap<>();
final Map<String, AtomicLong> collections = new HashMap<>();
final Map<String, AtomicLong> totals = new HashMap<>();
analyze(fs, inputPath, fields, types, collections, totals, report);
if (reimport) {
fs.delete(new Path(dataPath), true);
loadRecords(fs, inputPath, dataPath, report);
}
saveReport(fs, outputPath + "/fields", fields);
saveReport(fs, outputPath + "/types", types);
saveReport(fs, outputPath + "/collections", collections);
saveReport(fs, outputPath + "/totals", totals);
fs.delete(new Path(outputPath), true);
extractInfo(spark, dataPath, outputPath);
} catch (final Throwable e) {
throw new RuntimeException(e);
}
}
private static void analyze(final FileSystem fs,
private static void loadRecords(final FileSystem fs,
final String inputPath,
final Map<String, AtomicLong> fields,
final Map<String, AtomicLong> types,
final Map<String, AtomicLong> collections,
final Map<String, AtomicLong> totals,
final AggregatorReport report) throws JsonProcessingException, IOException, DocumentException {
final String outputPath,
final AggregatorReport report)
throws Exception {
final AtomicLong recordsCounter = new AtomicLong(0);
totals.put("Records", recordsCounter);
final LongWritable key = new LongWritable();
final Text value = new Text();
final BaseCollectorIterator iteraror = new BaseCollectorIterator(fs, new Path(inputPath), report);
while (iteraror.hasNext()) {
final Document record = DocumentHelper.parseText(iteraror.next());
final long i = recordsCounter.incrementAndGet();
if ((i % 10000) == 0) {
log.info("# Read records: " + i);
log.info("# fields: " + fields.size());
log.info("# types: " + types.size());
log.info("# collections: " + collections.size());
log.info("# totals: " + totals.size());
}
final List<String> recTypes = new ArrayList<>();
for (final Object o : record.selectNodes("//*|//@*")) {
incrementMapCounter(fields, ((Node) o).getPath());
final String nodeName = ((Node) o).getName();
if (o instanceof Element) {
final Element n = (Element) o;
if ("collection".equals(nodeName)) {
final String collName = n.getText().trim();
if (StringUtils.isNotBlank(collName)) {
final Map<String, String> map = new HashMap<>();
for (final Object ao : n.attributes()) {
map.put(((Attribute) ao).getName(), ((Attribute) ao).getValue());
}
incrementMapCounter(collections, collName + ": " + OBJECT_MAPPER.writeValueAsString(map));
}
} else if ("type".equals(nodeName)) {
recTypes.add("TYPE: " + n.getText().trim());
} else if ("typenorm".equals(nodeName)) {
recTypes.add("TYPE_NORM: " + n.getText().trim());
}
}
}
incrementMapCounter(types, recTypes.stream().sorted().distinct().collect(Collectors.joining(", ")));
}
}
private static void incrementMapCounter(final Map<String, AtomicLong> map, final String key) {
if (StringUtils.isNotBlank(key)) {
if (map.containsKey(key)) {
map.get(key).incrementAndGet();
} else {
map.put(key, new AtomicLong(1));
}
}
}
private static void saveReport(final FileSystem fs, final String outputPath, final Map<String, AtomicLong> fields)
throws JsonProcessingException, IOException {
try (final SequenceFile.Writer writer = SequenceFile
.createWriter(fs.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
.keyClass(IntWritable.class), SequenceFile.Writer
.keyClass(LongWritable.class), SequenceFile.Writer
.valueClass(Text.class), SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
final Text key = new Text();
final Text value = new Text();
final BaseCollectorIterator iteraror = new BaseCollectorIterator(fs, new Path(inputPath), report);
for (final Entry<String, AtomicLong> e : fields.entrySet()) {
key.set(e.getKey());
value.set(e.getKey() + ": " + e.getValue());
while (iteraror.hasNext()) {
final String record = iteraror.next();
final long i = recordsCounter.incrementAndGet();
if ((i % 10000) == 0) {
log.info("# Loaded records: " + i);
}
key.set(i);
value.set(record);
try {
writer.append(key, value);
} catch (final Throwable e1) {
throw new RuntimeException(e1);
}
}
log.info("# COMPLETED - Loaded records: " + recordsCounter.get());
}
}
/**
 * Reads the records stored as a sequence file (LongWritable key, Text value) under
 * {@code inputPath}, converts the value of every entry into a {@link BaseRecordInfo}
 * and saves the resulting dataset in parquet format under {@code targetPath},
 * overwriting any existing content.
 *
 * @param spark      the Spark session used to read the records and write the dataset
 * @param inputPath  the path of the sequence file containing the records
 * @param targetPath the path where the parquet output is saved
 * @throws Exception declared by the signature for errors raised while processing
 */
private static void extractInfo(final SparkSession spark,
	final String inputPath,
	final String targetPath) throws Exception {

	final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

	// only the value (the record text) of each entry is used, the key is ignored
	final JavaRDD<BaseRecordInfo> infos = sc
		.sequenceFile(inputPath, LongWritable.class, Text.class)
		.map(entry -> extractInfo(entry._2));

	spark
		.createDataset(infos.rdd(), Encoders.bean(BaseRecordInfo.class))
		.write()
		.mode(SaveMode.Overwrite)
		.format("parquet")
		.save(targetPath);
}
/**
 * Parses the XML text of a single record and collects, into a new {@link BaseRecordInfo}:
 * the path of every element and attribute node, the non-blank "collection" elements
 * (keyed by their trimmed text, with their attributes as value) and the trimmed text of
 * the "type" and "typenorm" elements (prefixed by "TYPE: " and "TYPE_NORM: ").
 *
 * @param s the XML text of the record
 * @return the information extracted from the record
 * @throws RuntimeException wrapping the DocumentException raised when the text cannot be parsed
 */
private static BaseRecordInfo extractInfo(final Text s) {
	final Document doc;
	try {
		doc = DocumentHelper.parseText(s.toString());
	} catch (final DocumentException e) {
		throw new RuntimeException(e);
	}

	final BaseRecordInfo result = new BaseRecordInfo();

	// the xpath selects all the elements and all the attributes of the document
	for (final Object item : doc.selectNodes("//*|//@*")) {
		final Node node = (Node) item;
		result.getPaths().add(node.getPath());

		// attributes contribute only with their path
		if (!(node instanceof Element)) {
			continue;
		}

		final Element elem = (Element) node;
		final String name = elem.getName();

		if ("collection".equals(name)) {
			final String collName = elem.getText().trim();
			if (StringUtils.isBlank(collName)) {
				continue;
			}
			final Map<String, String> attrs = new HashMap<>();
			for (final Object a : elem.attributes()) {
				final Attribute attr = (Attribute) a;
				attrs.put(attr.getName(), attr.getValue());
			}
			result.getCollections().put(collName, attrs);
		} else if ("type".equals(name)) {
			result.getTypes().add("TYPE: " + elem.getText().trim());
		} else if ("typenorm".equals(name)) {
			result.getTypes().add("TYPE_NORM: " + elem.getText().trim());
		}
	}

	return result;
}

View File

@ -0,0 +1,42 @@
package eu.dnetlib.dhp.collection.plugin.base;
import java.io.Serializable;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
/**
 * Serializable bean holding the information extracted from a single record:
 * a set of paths, a set of types and a map of collections (collection name mapped to
 * its attributes). All the containers are initialized empty.
 */
public class BaseRecordInfo implements Serializable {

	private static final long serialVersionUID = -8848232018350074593L;

	/** Paths collected for the record, kept in insertion order. */
	private Set<String> paths = new LinkedHashSet<>();

	/** Types collected for the record, kept in insertion order. */
	private Set<String> types = new LinkedHashSet<>();

	/** Collections of the record: name of the collection mapped to its attributes. */
	private Map<String, Map<String, String>> collections = new HashMap<>();

	/** @return the set of paths (the internal instance, not a copy) */
	public Set<String> getPaths() {
		return paths;
	}

	/** @param paths the set of paths to use from now on */
	public void setPaths(final Set<String> paths) {
		this.paths = paths;
	}

	/** @return the set of types (the internal instance, not a copy) */
	public Set<String> getTypes() {
		return types;
	}

	/** @param types the set of types to use from now on */
	public void setTypes(final Set<String> types) {
		this.types = types;
	}

	/** @return the map of collections (the internal instance, not a copy) */
	public Map<String, Map<String, String>> getCollections() {
		return collections;
	}

	/** @param collections the map of collections to use from now on */
	public void setCollections(final Map<String, Map<String, String>> collections) {
		this.collections = collections;
	}

}

View File

@ -5,10 +5,22 @@
"paramDescription": "the path of the BASE dump",
"paramRequired": true
},
{
"paramName": "d",
"paramLongName": "dataPath",
"paramDescription": "the path of the sequence file where the BASE records are loaded",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "outputPath",
"paramDescription": "the path of the generated report",
"paramRequired": true
},
{
"paramName": "r",
"paramLongName": "reimport",
"paramDescription": "if true, the content of dataPath is deleted and all the records are re-imported from the BASE dump",
"paramRequired": true
}
]

View File

@ -4,26 +4,25 @@
<name>baseInputPath</name>
<description>the path of the BASE dump</description>
</property>
<property>
<name>baseDataPath</name>
<description>the path where to store BASE records</description>
</property>
<property>
<name>baseReportsPath</name>
<description>path where to store the reports</description>
</property>
<property>
<name>baseReimportFlag</name>
<description>flag to re-import the records from dump</description>
</property>
</parameters>
<start to="deleteoutputpath"/>
<start to="analyzeBaseRecords"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="deleteoutputpath">
<fs>
<delete path='${baseReportsPath}'/>
<mkdir path='${baseReportsPath}'/>
</fs>
<ok to="analyzeBaseRecords"/>
<error to="Kill"/>
</action>
<action name="analyzeBaseRecords">
<spark xmlns="uri:oozie:spark-action:0.2">
@ -43,7 +42,9 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputPath</arg><arg>${baseInputPath}</arg>
<arg>--dataPath</arg><arg>${baseDataPath}</arg>
<arg>--outputPath</arg><arg>${baseReportsPath}</arg>
<arg>--reimport</arg><arg>${baseReimportFlag}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>