From 963a2500be2f6895d2d903f84dd5fbc335a94c85 Mon Sep 17 00:00:00 2001
From: "michele.artini"
Date: Wed, 14 Feb 2024 10:37:39 +0100
Subject: [PATCH] new reports in hadoop job

---
 .../plugin/base/BaseAnalyzerJob.java          | 99 ++++++++++---------
 .../plugin/base/oozie_app/workflow.xml        |  8 +-
 .../base/BaseCollectorIteratorTest.java       | 53 +++++++---
 3 files changed, 100 insertions(+), 60 deletions(-)

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseAnalyzerJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseAnalyzerJob.java
index 7a9c96a5e..2ce928215 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseAnalyzerJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseAnalyzerJob.java
@@ -3,11 +3,14 @@ package eu.dnetlib.dhp.collection.plugin.base;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -23,6 +26,7 @@ import org.apache.spark.sql.SparkSession;
 import org.dom4j.Attribute;
 import org.dom4j.Document;
 import org.dom4j.Element;
+import org.dom4j.Node;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,83 +69,90 @@ public class BaseAnalyzerJob {
 
 		final SparkConf conf = new SparkConf();
 
-		runWithSparkSession(conf, isSparkSessionManaged, spark -> {
-			removeOutputDir(spark, outputPath);
-			processBaseRecords(spark, inputPath, outputPath);
-		});
-	}
-
-	private static void removeOutputDir(final SparkSession spark, final String path) {
-		HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+		runWithSparkSession(conf, isSparkSessionManaged, spark -> processBaseRecords(spark, inputPath, outputPath));
 	}
 
 	private static void processBaseRecords(final SparkSession spark,
 		final String inputPath,
 		final String outputPath) throws IOException {
 
+		HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
+
 		try (final FileSystem fs = FileSystem.get(new Configuration()); final AggregatorReport report = new AggregatorReport()) {
-			final Map<String, String> collections = new HashMap<>();
-			collect(fs, inputPath, outputPath + "/temp", collections, report);
-			finalReport(fs, outputPath + "/final", collections);
+			final AtomicInteger recordsCounter = new AtomicInteger(0);
+			final Map<String, AtomicInteger> fields = new HashMap<>();
+			final Map<String, AtomicInteger> types = new HashMap<>();
+			final Map<String, AtomicInteger> collections = new HashMap<>();
+
+			analyze(fs, inputPath, recordsCounter, fields, types, collections, report);
+			saveReport(fs, outputPath + "/total", Map.of("#records", recordsCounter));
+			saveReport(fs, outputPath + "/fields", fields);
+			saveReport(fs, outputPath + "/types", types);
+			saveReport(fs, outputPath + "/collections", collections);
 		} catch (final Throwable e) {
 			throw new RuntimeException(e);
 		}
 	}
 
-	private static void collect(final FileSystem fs,
+	private static void analyze(final FileSystem fs,
 		final String inputPath,
-		final String outputPath,
-		final Map<String, String> collections,
+		final AtomicInteger recordsCounter,
+		final Map<String, AtomicInteger> fields,
+		final Map<String, AtomicInteger> types,
+		final Map<String, AtomicInteger> collections,
 		final AggregatorReport report) throws JsonProcessingException, IOException {
 
-		try (final SequenceFile.Writer writer = SequenceFile
-			.createWriter(fs.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
-				.keyClass(IntWritable.class), SequenceFile.Writer
-				.valueClass(Text.class), SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
+		final BaseCollectorIterator iteraror = new BaseCollectorIterator(fs, new Path(inputPath), report);
 
-			final AtomicInteger recordsCounter = new AtomicInteger(0);
-			final AtomicInteger collectionsCounter = new AtomicInteger(0);
+		while (iteraror.hasNext()) {
+			final Document record = iteraror.next();
 
-			final BaseCollectorIterator iteraror = new BaseCollectorIterator(fs, new Path(inputPath), report);
+			final int i = recordsCounter.incrementAndGet();
+			if ((i % 10000) == 0) {
+				log.info("# Read records: " + i);
+			}
 
-			while (iteraror.hasNext()) {
-				final Document record = iteraror.next();
+			final List<String> recTypes = new ArrayList<>();
 
-				final int i = recordsCounter.incrementAndGet();
-				if ((i % 1000) == 0) {
-					log.info("# Read records: " + i);
-				}
+			for (final Object o : record.selectNodes("//*[local-name()='metadata']//*")) {
 
-				for (final Object o : record.selectNodes("//*[local-name() = 'collection']")) {
+				incrementMapCounter(fields, ((Node) o).getPath());
 
+				final String nodeName = ((Node) o).getName();
+
+				if ("collection".equals(nodeName)) {
 					final Element n = (Element) o;
 
 					final String collName = n.getText().trim();
-					if (StringUtils.isNotBlank(collName) && !collections.containsKey(collName)) {
+
+					if (StringUtils.isNotBlank(collName)) {
 						final Map<String, String> map = new HashMap<>();
-
 						for (final Object ao : n.attributes()) {
 							map.put(((Attribute) ao).getName(), ((Attribute) ao).getValue());
 						}
-						final String attrs = OBJECT_MAPPER.writeValueAsString(map);
-
-						collections.put(collName, attrs);
-
-						final IntWritable key = new IntWritable(collectionsCounter.incrementAndGet());
-						final Text value = new Text(collName + ": " + attrs);
-
-						try {
-							writer.append(key, value);
-						} catch (final Throwable e1) {
-							throw new RuntimeException(e1);
-						}
+						incrementMapCounter(collections, collName + ": " + OBJECT_MAPPER.writeValueAsString(map));
 					}
+				} else if ("type".equals(nodeName)) {
+					recTypes.add("TYPE: " + nodeName);
+				} else if ("typenorm".equals(nodeName)) {
+					recTypes.add("TYPE_NORM: " + nodeName);
 				}
 			}
+
+			incrementMapCounter(types, recTypes.stream().sorted().distinct().collect(Collectors.joining(", ")));
+		}
+	}
+
+	private static void incrementMapCounter(final Map<String, AtomicInteger> map, final String key) {
+		if (StringUtils.isNotBlank(key)) {
+			if (map.containsKey(key)) {
+				map.get(key).incrementAndGet();
+			} else {
+				map.put(key, new AtomicInteger(1));
+			}
 		}
 	}
 
-	private static void finalReport(final FileSystem fs, final String outputPath, final Map<String, String> collections)
+	private static void saveReport(final FileSystem fs, final String outputPath, final Map<String, AtomicInteger> fields)
 		throws JsonProcessingException, IOException {
 		try (final SequenceFile.Writer writer = SequenceFile
 			.createWriter(fs.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
@@ -151,7 +162,7 @@ public class BaseAnalyzerJob {
 			final Text key = new Text();
 			final Text value = new Text();
 
-			for (final Entry<String, String> e : collections.entrySet()) {
+			for (final Entry<String, AtomicInteger> e : fields.entrySet()) {
 				key.set(e.getKey());
 				value.set(e.getKey() + ": " + e.getValue());
 				try {
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/oozie_app/workflow.xml
index 184faabbd..cb86bde35 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/oozie_app/workflow.xml
@@ -5,7 +5,7 @@
 			<description>the path of the BASE dump</description>
 		</property>
 		<property>
-			<name>reportsPath</name>
+			<name>baseReportsPath</name>
 			<description>path where to store the reports</description>
 		</property>
@@ -18,8 +18,8 @@
-
-
+
+
@@ -43,7 +43,7 @@
 				--conf spark.sql.shuffle.partitions=3840
 			</spark-opts>
 			<arg>--inputPath</arg><arg>${baseInputPath}</arg>
-			<arg>--outputPath</arg><arg>${reportsPath}</arg>
+			<arg>--outputPath</arg><arg>${baseReportsPath}</arg>
 		</spark>
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIteratorTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIteratorTest.java
index 195cebcf4..3e0961957 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIteratorTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIteratorTest.java
@@ -3,13 +3,17 @@ package eu.dnetlib.dhp.collection.plugin.base;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang3.StringUtils;
 import org.dom4j.Attribute;
 import org.dom4j.Document;
 import org.dom4j.Element;
+import org.dom4j.Node;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.junit.jupiter.MockitoExtension;
@@ -29,6 +33,8 @@ public class BaseCollectorIteratorTest {
 		final BaseCollectorIterator iterator = new BaseCollectorIterator("base-sample.tar", new AggregatorReport());
 
 		final Map<String, Map<String, String>> collections = new HashMap<>();
+		final Map<String, AtomicInteger> fields = new HashMap<>();
+		final Set<String> types = new HashSet<>();
 
 		while (iterator.hasNext()) {
 			final Document record = iterator.next();
@@ -41,21 +47,33 @@ public class BaseCollectorIteratorTest {
 
 			// System.out.println(record.asXML());
 
-			for (final Object o : record.selectNodes("//*[local-name() = 'collection']")) {
-
-				final Element n = (Element) o;
-				final String collName = n.getText().trim();
-				if (StringUtils.isNotBlank(collName) && !collections.containsKey(collName)) {
-					final Map<String, String> collAttrs = new HashMap<>();
-
-					for (final Object ao : n.attributes()) {
-						collAttrs.put(((Attribute) ao).getName(), ((Attribute) ao).getValue());
-					}
-
-					collections.put(collName, collAttrs);
+			for (final Object o : record.selectNodes("//*[local-name()='metadata']//*")) {
+				final String path = ((Node) o).getPath();
+				if (fields.containsKey(path)) {
+					fields.get(path).incrementAndGet();
+				} else {
+					fields.put(path, new AtomicInteger(1));
 				}
+
+				if ("collection".equals(((Node) o).getName())) {
+					final Element n = (Element) o;
+					final String collName = n.getText().trim();
+					if (StringUtils.isNotBlank(collName) && !collections.containsKey(collName)) {
+						final Map<String, String> collAttrs = new HashMap<>();
+						for (final Object ao : n.attributes()) {
+							collAttrs.put(((Attribute) ao).getName(), ((Attribute) ao).getValue());
+						}
+						collections.put(collName, collAttrs);
+					}
+				}
+
+				if ("type".equals(((Node) o).getName())) {
+					types.add(((Element) o).getText().trim());
+				}
+
+			}
+
 		}
 
 		final ObjectMapper mapper = new ObjectMapper();
@@ -64,6 +82,17 @@ public class BaseCollectorIteratorTest {
 		}
 
+		for (final Entry<String, AtomicInteger> e : fields.entrySet()) {
+			System.out.println(e.getKey() + ": " + e.getValue().get());
+
+		}
+
+		System.out.println("TYPES: ");
+		for (final String s : types) {
+			System.out.println(s);
+
+		}
+
 		assertEquals(30000, count);
 	}
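
For reference, the reports written by saveReport() above are SequenceFiles whose keys and values are both Hadoop Text, with each value already rendered as "<key>: <counter>". Below is a minimal sketch, not part of this patch, of how such a report could be read back for inspection; the ReadBaseReport class name and the path argument are illustrative assumptions only.

// Illustrative sketch (not part of the patch): dump one of the SequenceFile
// reports produced by BaseAnalyzerJob.saveReport(), e.g. ${baseReportsPath}/fields.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class ReadBaseReport {

	public static void main(final String[] args) throws Exception {
		final Configuration conf = new Configuration();

		// report path passed as the first argument, e.g. ${baseReportsPath}/fields
		final Path reportPath = new Path(args[0]);

		try (final SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(reportPath))) {
			final Text key = new Text();
			final Text value = new Text();

			// each value is already formatted as "<key>: <counter>" by saveReport()
			while (reader.next(key, value)) {
				System.out.println(value);
			}
		}
	}
}

Under the same assumptions, the sketch could be pointed at any of the four report paths the job writes (.../total, .../fields, .../types, .../collections); block compression with DeflateCodec is handled transparently by SequenceFile.Reader.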