diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseAnalyzerJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseAnalyzerJob.java
index ee7283a99..582676560 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseAnalyzerJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseAnalyzerJob.java
@@ -5,8 +5,10 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.io.IOUtils;
@@ -43,17 +45,18 @@ public class BaseAnalyzerJob {
 
 	public static void main(final String[] args) throws Exception {
 		final String jsonConfiguration = IOUtils
-			.toString(BaseAnalyzerJob.class
-				.getResourceAsStream("/eu/dnetlib/dhp/collection/plugin/base/action_set_parameters.json"));
+			.toString(
+				BaseAnalyzerJob.class
+					.getResourceAsStream("/eu/dnetlib/dhp/collection/plugin/base/action_set_parameters.json"));
 
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
 		parser.parseArgument(args);
 
 		final Boolean isSparkSessionManaged = Optional
-			.ofNullable(parser.get("isSparkSessionManaged"))
-			.map(Boolean::valueOf)
-			.orElse(Boolean.TRUE);
+			.ofNullable(parser.get("isSparkSessionManaged"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.TRUE);
 
 		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
 
@@ -71,24 +74,24 @@ public class BaseAnalyzerJob {
 
 		final SparkConf conf = new SparkConf();
 
-		runWithSparkSession(conf, isSparkSessionManaged, spark -> processBaseRecords(spark, inputPath, dataPath, outputPath, reimport));
+		runWithSparkSession(
+			conf, isSparkSessionManaged, spark -> processBaseRecords(spark, inputPath, dataPath, outputPath, reimport));
 	}
 
 	private static void processBaseRecords(final SparkSession spark,
-		final String inputPath,
-		final String dataPath,
-		final String outputPath,
-		final boolean reimport) throws IOException {
+		final String inputPath,
+		final String dataPath,
+		final String outputPath,
+		final boolean reimport) throws IOException {
 
 		try (final FileSystem fs = FileSystem.get(new Configuration());
-			final AggregatorReport report = new AggregatorReport()) {
+			final AggregatorReport report = new AggregatorReport()) {
 			if (reimport) {
-				fs.delete(new Path(dataPath), true);
 				loadRecords(fs, inputPath, dataPath, report);
 			}
 
-			fs.delete(new Path(outputPath), true);
+			// fs.delete(new Path(outputPath), true);
 			extractInfo(spark, dataPath, outputPath);
 		} catch (final Throwable e) {
 			throw new RuntimeException(e);
@@ -96,10 +99,10 @@ public class BaseAnalyzerJob {
 	}
 
 	private static void loadRecords(final FileSystem fs,
-		final String inputPath,
-		final String outputPath,
-		final AggregatorReport report)
-		throws Exception {
+		final String inputPath,
+		final String outputPath,
+		final AggregatorReport report)
+		throws Exception {
 
 		final AtomicLong recordsCounter = new AtomicLong(0);
 
@@ -107,9 +110,12 @@ public class BaseAnalyzerJob {
 		final Text value = new Text();
 
 		try (final SequenceFile.Writer writer = SequenceFile
-			.createWriter(fs.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
-				.keyClass(LongWritable.class), SequenceFile.Writer
-					.valueClass(Text.class), SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
+			.createWriter(
+				fs.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
+					.keyClass(LongWritable.class),
+				SequenceFile.Writer
+					.valueClass(Text.class),
+				SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
 
 			final BaseCollectorIterator iteraror = new BaseCollectorIterator(fs, new Path(inputPath), report);
 
@@ -135,31 +141,35 @@ public class BaseAnalyzerJob {
 	}
 
 	private static void extractInfo(final SparkSession spark,
-		final String inputPath,
-		final String targetPath) throws Exception {
+		final String inputPath,
+		final String targetPath) throws Exception {
 
-		final JavaRDD<BaseRecordInfo> rdd = JavaSparkContext.fromSparkContext(spark.sparkContext())
-			.sequenceFile(inputPath, LongWritable.class, Text.class)
-			.map(s -> s._2)
-			.map(BaseAnalyzerJob::extractInfo);
+		final JavaRDD<BaseRecordInfo> rdd = JavaSparkContext
+			.fromSparkContext(spark.sparkContext())
+			.sequenceFile(inputPath, LongWritable.class, Text.class)
+			.map(s -> s._2.toString())
+			.map(BaseAnalyzerJob::extractInfo);
 
-		spark.createDataset(rdd.rdd(), Encoders.bean(BaseRecordInfo.class))
-			.write()
-			.mode(SaveMode.Overwrite)
-			.format("parquet")
-			.save(targetPath);
+		spark
+			.createDataset(rdd.rdd(), Encoders.bean(BaseRecordInfo.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.format("parquet")
+			.save(targetPath);
 	}
 
-	private static BaseRecordInfo extractInfo(final Text s) {
+	protected static BaseRecordInfo extractInfo(final String s) {
 		try {
-			final Document record = DocumentHelper.parseText(s.toString());
+			final Document record = DocumentHelper.parseText(s);
 
 			final BaseRecordInfo info = new BaseRecordInfo();
-			info.setId(record.valueOf("//*[local-name() = 'header']/*[local-name() = 'identifier']").trim());
+
+			final Set<String> paths = new LinkedHashSet<>();
+			final Set<String> types = new LinkedHashSet<>();
+			final Map<String, Map<String, String>> colls = new HashMap<>();
 
 			for (final Object o : record.selectNodes("//*|//@*")) {
-				info.getPaths().add(((Node) o).getPath());
+				paths.add(((Node) o).getPath());
 
 				if (o instanceof Element) {
 					final Element n = (Element) o;
@@ -173,15 +183,21 @@ public class BaseAnalyzerJob {
 						for (final Object ao : n.attributes()) {
 							attrs.put(((Attribute) ao).getName(), ((Attribute) ao).getValue());
 						}
-						info.getCollections().put(collName, attrs);
+						colls.put(collName, attrs);
 					}
 				} else if ("type".equals(nodeName)) {
-					info.getTypes().add("TYPE: " + n.getText().trim());
+					types.add("TYPE: " + n.getText().trim());
 				} else if ("typenorm".equals(nodeName)) {
-					info.getTypes().add("TYPE_NORM: " + n.getText().trim());
+					types.add("TYPE_NORM: " + n.getText().trim());
 				}
 			}
+
+			info.setId(record.valueOf("//*[local-name() = 'header']/*[local-name() = 'identifier']").trim());
+			info.getTypes().addAll(types);
+			info.getPaths().addAll(paths);
+			info.setCollections(colls);
+
 			return info;
 		} catch (final DocumentException e) {
 			throw new RuntimeException(e);
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseRecordInfo.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseRecordInfo.java
index 920e61230..d7d635a0d 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseRecordInfo.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseRecordInfo.java
@@ -1,10 +1,11 @@
+
 package eu.dnetlib.dhp.collection.plugin.base;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 public class BaseRecordInfo implements Serializable {
@@ -12,8 +13,8 @@ public class BaseRecordInfo implements Serializable {
 
 	private String id;
 	private Map<String, Map<String, String>> collections = new HashMap<>();
-	private Set<String> paths = new LinkedHashSet<>();
-	private Set<String> types = new LinkedHashSet<>();
+	private List<String> paths = new ArrayList<>();
+	private List<String> types = new ArrayList<>();
 
 	public String getId() {
 		return this.id;
@@ -23,19 +24,19 @@ public class BaseRecordInfo implements Serializable {
 		this.id = id;
 	}
 
-	public Set<String> getPaths() {
+	public List<String> getPaths() {
 		return this.paths;
 	}
 
-	public void setPaths(final Set<String> paths) {
+	public void setPaths(final List<String> paths) {
 		this.paths = paths;
 	}
 
-	public Set<String> getTypes() {
+	public List<String> getTypes() {
 		return this.types;
 	}
 
-	public void setTypes(final Set<String> types) {
+	public void setTypes(final List<String> types) {
 		this.types = types;
 	}
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIteratorTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIteratorTest.java
index d8893e8db..57f01445e 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIteratorTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIteratorTest.java
@@ -3,14 +3,22 @@ package eu.dnetlib.dhp.collection.plugin.base;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
 import org.dom4j.Attribute;
 import org.dom4j.Document;
 import org.dom4j.DocumentHelper;
@@ -99,4 +107,28 @@ public class BaseCollectorIteratorTest {
 		assertEquals(30000, count);
 	}
 
+	@Test
+	public void testParquet() throws Exception {
+
+		final String xml = IOUtils.toString(getClass().getResourceAsStream("record.xml"));
+
+		final SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
+
+		final List<BaseRecordInfo> ls = new ArrayList<>();
+
+		for (int i = 0; i < 10; i++) {
+			ls.add(BaseAnalyzerJob.extractInfo(xml));
+		}
+
+		final JavaRDD<BaseRecordInfo> rdd = JavaSparkContext
+			.fromSparkContext(spark.sparkContext())
+			.parallelize(ls);
+
+		final Dataset<BaseRecordInfo> df = spark
+			.createDataset(rdd.rdd(), Encoders.bean(BaseRecordInfo.class));
+
+		df.printSchema();
+
+		df.show(false);
+	}
 }
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/plugin/base/record.xml b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/plugin/base/record.xml
new file mode 100644
index 000000000..8f0f505fd
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/plugin/base/record.xml
@@ -0,0 +1,31 @@
+
+
+ftterritoryanzac:oai:www.territorystories.nt.gov.au:10070/74188
+2015-09-07T22:26:28Z
+
+
+
+ftterritoryanzac:oai:www.territorystories.nt.gov.au:10070/74188
+cau
+au
+ftterritoryanzac
+Territory Stories (Northern Territory Government, Australia)
+W. Wardle
+Wardle, W.
+"NX 112520 Bdr. W. Wardle Darwin Coast Art. 1943-45" ; NX 112520. Bombardier W. Wardle. Darwin Coast Artillery. 1943-1945.
+2007-12-18T01:19:36Z
+2007
+Image
+Reference
+51
+Cropped 408w X 338h Size 26.62k
+http://hdl.handle.net/10070/74188
+http://hdl.handle.net/10070/74188
+Darwin Commemorative Wall Quilt
+http://www.ww2roll.gov.au/script/veteran.asp?ServiceID=A&VeteranID=220307
+http://hdl.handle.net/10070/74188
+1
+unknown
+
+
+
\ No newline at end of file
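
Side note, not part of the patch: since extractInfo(SparkSession, ...) now writes BaseRecordInfo beans as parquet through Encoders.bean(), the job output can be read back with the same encoder for a quick local sanity check. A minimal sketch, assuming a hypothetical output path (in practice, pass the job's outputPath argument); the class name and path below are illustrative only:

	package eu.dnetlib.dhp.collection.plugin.base;

	import org.apache.spark.sql.Dataset;
	import org.apache.spark.sql.Encoders;
	import org.apache.spark.sql.SparkSession;

	public class InspectBaseRecordInfo {

		public static void main(final String[] args) {
			final SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

			// Read the parquet written by BaseAnalyzerJob and map the rows back to the bean.
			// "/tmp/base_records_info" is a placeholder path, not one used by the job.
			final Dataset<BaseRecordInfo> infos = spark
				.read()
				.parquet("/tmp/base_records_info")
				.as(Encoders.bean(BaseRecordInfo.class));

			infos.printSchema();
			infos.show(false);

			spark.stop();
		}
	}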