From da65728afedf087b6948c8797cfeb03393214043 Mon Sep 17 00:00:00 2001
From: "michele.artini"
Date: Thu, 15 Feb 2024 14:04:17 +0100
Subject: [PATCH] produce a parquet file

---
 .../plugin/base/BaseAnalyzerJob.java          | 182 +++++++++---------
 .../plugin/base/BaseRecordInfo.java           |  42 ++++
 .../plugin/base/action_set_parameters.json    |  12 ++
 .../plugin/base/oozie_app/workflow.xml        |  21 +-
 4 files changed, 156 insertions(+), 101 deletions(-)
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseRecordInfo.java

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseAnalyzerJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseAnalyzerJob.java
index cd1216994..1ca9e8966 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseAnalyzerJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseAnalyzerJob.java
@@ -4,25 +4,25 @@ package eu.dnetlib.dhp.collection.plugin.base;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.DeflateCodec;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 import org.dom4j.Attribute;
 import org.dom4j.Document;
@@ -33,9 +33,6 @@ import org.dom4j.Node;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
 
@@ -43,8 +40,6 @@ public class BaseAnalyzerJob {
 
 	private static final Logger log = LoggerFactory.getLogger(BaseAnalyzerJob.class);
 
-	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
 	public static void main(final String[] args) throws Exception {
 
 		final String jsonConfiguration = IOUtils
@@ -65,122 +60,127 @@ public class BaseAnalyzerJob {
 		final String inputPath = parser.get("inputPath");
 		log.info("inputPath: {}", inputPath);
 
+		final String dataPath = parser.get("dataPath");
+		log.info("dataPath {}: ", dataPath);
+
 		final String outputPath = parser.get("outputPath");
 		log.info("outputPath {}: ", outputPath);
 
+		final boolean reimport = Boolean.parseBoolean(parser.get("reimport"));
+		log.info("reimport {}: ", reimport);
+
 		final SparkConf conf = new SparkConf();
 
-		runWithSparkSession(conf, isSparkSessionManaged, spark -> processBaseRecords(spark, inputPath, outputPath));
+		runWithSparkSession(conf, isSparkSessionManaged, spark -> processBaseRecords(spark, inputPath, dataPath, outputPath, reimport));
 	}
 
 	private static void processBaseRecords(final SparkSession spark,
 		final String inputPath,
-		final String outputPath) throws IOException {
+		final String dataPath,
+		final String outputPath,
+		final boolean reimport) throws IOException {
 
 		try (final FileSystem fs = FileSystem.get(new Configuration());
 			final AggregatorReport report = new AggregatorReport()) {
-			final Map<String, AtomicLong> fields = new HashMap<>();
-			final Map<String, AtomicLong> types = new HashMap<>();
-			final Map<String, AtomicLong> collections = new HashMap<>();
-			final Map<String, AtomicLong> totals = new HashMap<>();
 
-			analyze(fs, inputPath, fields, types, collections, totals, report);
+			if (reimport) {
+				fs.delete(new Path(dataPath), true);
+				loadRecords(fs, inputPath, dataPath, report);
+			}
 
-			saveReport(fs, outputPath + "/fields", fields);
-			saveReport(fs, outputPath + "/types", types);
-			saveReport(fs, outputPath + "/collections", collections);
-			saveReport(fs, outputPath + "/totals", totals);
+			fs.delete(new Path(outputPath), true);
+			extractInfo(spark, dataPath, outputPath);
 		} catch (final Throwable e) {
 			throw new RuntimeException(e);
 		}
 	}
 
-	private static void analyze(final FileSystem fs,
+	private static void loadRecords(final FileSystem fs,
 		final String inputPath,
-		final Map<String, AtomicLong> fields,
-		final Map<String, AtomicLong> types,
-		final Map<String, AtomicLong> collections,
-		final Map<String, AtomicLong> totals,
-		final AggregatorReport report) throws JsonProcessingException, IOException, DocumentException {
+		final String outputPath,
+		final AggregatorReport report)
+		throws Exception {
 
 		final AtomicLong recordsCounter = new AtomicLong(0);
-		totals.put("Records", recordsCounter);
+		final LongWritable key = new LongWritable();
+		final Text value = new Text();
 
-		final BaseCollectorIterator iteraror = new BaseCollectorIterator(fs, new Path(inputPath), report);
-
-		while (iteraror.hasNext()) {
-			final Document record = DocumentHelper.parseText(iteraror.next());
-
-			final long i = recordsCounter.incrementAndGet();
-			if ((i % 10000) == 0) {
-				log.info("# Read records: " + i);
-				log.info("# fields: " + fields.size());
-				log.info("# types: " + types.size());
-				log.info("# collections: " + collections.size());
-				log.info("# totals: " + totals.size());
-			}
-
-			final List<String> recTypes = new ArrayList<>();
-
-			for (final Object o : record.selectNodes("//*|//@*")) {
-
-				incrementMapCounter(fields, ((Node) o).getPath());
-
-				final String nodeName = ((Node) o).getName();
-
-				if (o instanceof Element) {
-					final Element n = (Element) o;
-					if ("collection".equals(nodeName)) {
-						final String collName = n.getText().trim();
-						if (StringUtils.isNotBlank(collName)) {
-							final Map<String, String> map = new HashMap<>();
-							for (final Object ao : n.attributes()) {
-								map.put(((Attribute) ao).getName(), ((Attribute) ao).getValue());
-							}
-							incrementMapCounter(collections, collName + ": " + OBJECT_MAPPER.writeValueAsString(map));
-						}
-					} else if ("type".equals(nodeName)) {
-						recTypes.add("TYPE: " + n.getText().trim());
-					} else if ("typenorm".equals(nodeName)) {
-						recTypes.add("TYPE_NORM: " + n.getText().trim());
-					}
-				}
-			}
-
-			incrementMapCounter(types, recTypes.stream().sorted().distinct().collect(Collectors.joining(", ")));
-		}
-	}
-
-	private static void incrementMapCounter(final Map<String, AtomicLong> map, final String key) {
-		if (StringUtils.isNotBlank(key)) {
-			if (map.containsKey(key)) {
-				map.get(key).incrementAndGet();
-			} else {
-				map.put(key, new AtomicLong(1));
-			}
-		}
-	}
-
-	private static void saveReport(final FileSystem fs, final String outputPath, final Map<String, AtomicLong> fields)
-		throws JsonProcessingException, IOException {
 		try (final SequenceFile.Writer writer = SequenceFile
 			.createWriter(fs.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
-				.keyClass(IntWritable.class), SequenceFile.Writer
+				.keyClass(LongWritable.class), SequenceFile.Writer
 				.valueClass(Text.class), SequenceFile.Writer
 				.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
 
-			final Text key = new Text();
-			final Text value = new Text();
+			final BaseCollectorIterator iteraror = new BaseCollectorIterator(fs, new Path(inputPath), report);
 
-			for (final Entry<String, AtomicLong> e : fields.entrySet()) {
-				key.set(e.getKey());
-				value.set(e.getKey() + ": " + e.getValue());
+			while (iteraror.hasNext()) {
+				final String record = iteraror.next();
+
+				final long i = recordsCounter.incrementAndGet();
+				if ((i % 10000) == 0) {
+					log.info("# Loaded records: " + i);
+				}
+
+				key.set(i);
+				value.set(record);
 
 				try {
 					writer.append(key, value);
 				} catch (final Throwable e1) {
 					throw new RuntimeException(e1);
 				}
 			}
+
+			log.info("# COMPLETED - Loaded records: " + recordsCounter.get());
+		}
+	}
+
+	private static void extractInfo(final SparkSession spark,
+		final String inputPath,
+		final String targetPath) throws Exception {
+
+		final JavaRDD<BaseRecordInfo> rdd = JavaSparkContext.fromSparkContext(spark.sparkContext())
+			.sequenceFile(inputPath, LongWritable.class, Text.class)
+			.map(s -> s._2)
+			.map(BaseAnalyzerJob::extractInfo);
+
+		spark.createDataset(rdd.rdd(), Encoders.bean(BaseRecordInfo.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.format("parquet")
+			.save(targetPath);
+	}
+
+	private static BaseRecordInfo extractInfo(final Text s) {
+		try {
+			final Document record = DocumentHelper.parseText(s.toString());
+
+			final BaseRecordInfo info = new BaseRecordInfo();
+
+			for (final Object o : record.selectNodes("//*|//@*")) {
+				info.getPaths().add(((Node) o).getPath());
+
+				final String nodeName = ((Node) o).getName();
+				if (o instanceof Element) {
+					final Element n = (Element) o;
+					if ("collection".equals(nodeName)) {
+						final String collName = n.getText().trim();
+						if (StringUtils.isNotBlank(collName)) {
+							final Map<String, String> attrs = new HashMap<>();
+							for (final Object ao : n.attributes()) {
+								attrs.put(((Attribute) ao).getName(), ((Attribute) ao).getValue());
+							}
+							info.getCollections().put(collName, attrs);
+						}
+					} else if ("type".equals(nodeName)) {
+						info.getTypes().add("TYPE: " + n.getText().trim());
+					} else if ("typenorm".equals(nodeName)) {
+						info.getTypes().add("TYPE_NORM: " + n.getText().trim());
+					}
+				}
+			}
+			return info;
+		} catch (final DocumentException e) {
+			throw new RuntimeException(e);
 		}
 	}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseRecordInfo.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseRecordInfo.java
new file mode 100644
index 000000000..727958cf0
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseRecordInfo.java
@@ -0,0 +1,42 @@
+package eu.dnetlib.dhp.collection.plugin.base;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class BaseRecordInfo implements Serializable {
+
+	private static final long serialVersionUID = -8848232018350074593L;
+
+	private Set<String> paths = new LinkedHashSet<>();
+	private Set<String> types = new LinkedHashSet<>();
+
+	public Set<String> getPaths() {
+		return this.paths;
+	}
+
+	public void setPaths(final Set<String> paths) {
+		this.paths = paths;
+	}
+
+	public Set<String> getTypes() {
+		return this.types;
+	}
+
+	public void setTypes(final Set<String> types) {
+		this.types = types;
+	}
+
+	public Map<String, Map<String, String>> getCollections() {
+		return this.collections;
+	}
+
+	public void setCollections(final Map<String, Map<String, String>> collections) {
+		this.collections = collections;
+	}
+
+	private Map<String, Map<String, String>> collections = new HashMap<>();
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/action_set_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/action_set_parameters.json
index db08f16bd..bc3148f3d 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/action_set_parameters.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/action_set_parameters.json
@@ -5,10 +5,22 @@
 		"paramDescription": "the path of the BASE dump",
 		"paramRequired": true
 	},
+	{
+		"paramName": "d",
+		"paramLongName": "dataPath",
+		"paramDescription": "the path of the loaded records",
+		"paramRequired": true
+	},
 	{
 		"paramName": "o",
 		"paramLongName": "outputPath",
 		"paramDescription": "the path of the generated report",
 		"paramRequired": true
+	},
+	{
+		"paramName": "r",
+		"paramLongName": "reimport",
+		"paramDescription": "complete re-import",
+		"paramRequired": true
 	}
 ]
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/oozie_app/workflow.xml
index cb86bde35..a59675e4b 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/oozie_app/workflow.xml
@@ -4,26 +4,25 @@
 		<property>
 			<name>baseInputPath</name>
 			<description>the path of the BASE dump</description>
 		</property>
+		<property>
+			<name>baseDataPath</name>
+			<description>the path where to store BASE records</description>
+		</property>
 		<property>
 			<name>baseReportsPath</name>
 			<description>path where to store the reports</description>
 		</property>
+		<property>
+			<name>baseReimportFlag</name>
+			<description>flag to re-import the records from dump</description>
+		</property>
 	</parameters>
 
-
+
 
 	<kill name="Kill">
 		<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
 	</kill>
 
-
-
-
-
-
-
-
-
-
@@ -43,7 +42,9 @@
 				--conf spark.sql.shuffle.partitions=3840
 			</spark-opts>
 			<arg>--inputPath</arg><arg>${baseInputPath}</arg>
+			<arg>--dataPath</arg><arg>${baseDataPath}</arg>
 			<arg>--outputPath</arg><arg>${baseReportsPath}</arg>
+			<arg>--reimport</arg><arg>${baseReimportFlag}</arg>
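
Not part of the patch: a minimal sketch of how the parquet output written by extractInfo(...) might be inspected once the job has run. It assumes a local Spark session and uses "/tmp/base_report" as a hypothetical stand-in for the directory passed as --outputPath; the "paths" and "types" columns follow the BaseRecordInfo bean added above.

	// Sketch only: reads the BaseRecordInfo parquet output and recomputes per-path and
	// per-type counts, a rough equivalent of the reports the removed saveReport(...) wrote.
	// "/tmp/base_report" is a hypothetical location, not defined by the patch.
	import static org.apache.spark.sql.functions.col;
	import static org.apache.spark.sql.functions.explode;

	import org.apache.spark.sql.Dataset;
	import org.apache.spark.sql.Row;
	import org.apache.spark.sql.SparkSession;

	public class BaseReportExplorer {

		public static void main(final String[] args) {
			final SparkSession spark = SparkSession
				.builder()
				.appName("BaseReportExplorer")
				.master("local[*]")
				.getOrCreate();

			// Each row corresponds to one BaseRecordInfo bean: paths, types, collections.
			final Dataset<Row> report = spark.read().parquet("/tmp/base_report");

			// Count how many records contain each XPath.
			report
				.select(explode(col("paths")).as("path"))
				.groupBy("path")
				.count()
				.orderBy(col("count").desc())
				.show(50, false);

			// Count the TYPE / TYPE_NORM values collected per record.
			report
				.select(explode(col("types")).as("type"))
				.groupBy("type")
				.count()
				.show(50, false);

			spark.stop();
		}
	}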