fixed a problem with Dataset model

Michele Artini 2024-02-16 11:36:46 +01:00
parent 8ffdd9747d
commit d2b7541583
4 changed files with 126 additions and 46 deletions
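
The "Dataset model" problem appears to be the mapping of BaseRecordInfo through Spark's Encoders.bean(): the bean encoder derives the Dataset schema from JavaBean getters and setters, and the Set-typed paths/types properties did not map cleanly, so the model now exposes List properties while deduplication is still done in a LinkedHashSet before the bean is populated (see the BaseAnalyzerJob and BaseRecordInfo diffs below). A minimal, self-contained sketch of the resulting pattern; the class and field names here are illustrative, not part of the repository:

// Illustrative sketch (hypothetical names): a Serializable JavaBean whose collection
// property is declared as java.util.List, so that Encoders.bean() can derive a
// Dataset schema for it (the values become an array<string> column).
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class BeanEncoderSketch {

    public static class Info implements Serializable {

        private List<String> values = new ArrayList<>();

        public List<String> getValues() {
            return this.values;
        }

        public void setValues(final List<String> values) {
            this.values = values;
        }
    }

    public static void main(final String[] args) {
        final SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

        final Info info = new Info();
        info.getValues().add("a");
        info.getValues().add("b");

        // Encoders.bean() inspects the getters/setters; duplicates, if any, are removed
        // before the List is populated (e.g. via a LinkedHashSet), as the job now does.
        final Dataset<Info> ds = spark.createDataset(Arrays.asList(info), Encoders.bean(Info.class));
        ds.printSchema();
        ds.show(false);

        spark.stop();
    }
}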

BaseAnalyzerJob.java

@@ -5,8 +5,10 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.io.IOUtils;
@@ -43,17 +45,18 @@ public class BaseAnalyzerJob {
     public static void main(final String[] args) throws Exception {
         final String jsonConfiguration = IOUtils
-            .toString(BaseAnalyzerJob.class
-                .getResourceAsStream("/eu/dnetlib/dhp/collection/plugin/base/action_set_parameters.json"));
+            .toString(
+                BaseAnalyzerJob.class
+                    .getResourceAsStream("/eu/dnetlib/dhp/collection/plugin/base/action_set_parameters.json"));
 
         final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
         parser.parseArgument(args);
 
         final Boolean isSparkSessionManaged = Optional
             .ofNullable(parser.get("isSparkSessionManaged"))
             .map(Boolean::valueOf)
             .orElse(Boolean.TRUE);
         log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
@@ -71,24 +74,24 @@ public class BaseAnalyzerJob {
         final SparkConf conf = new SparkConf();
 
-        runWithSparkSession(conf, isSparkSessionManaged, spark -> processBaseRecords(spark, inputPath, dataPath, outputPath, reimport));
+        runWithSparkSession(
+            conf, isSparkSessionManaged, spark -> processBaseRecords(spark, inputPath, dataPath, outputPath, reimport));
     }
 
     private static void processBaseRecords(final SparkSession spark,
         final String inputPath,
         final String dataPath,
         final String outputPath,
         final boolean reimport) throws IOException {
 
         try (final FileSystem fs = FileSystem.get(new Configuration());
             final AggregatorReport report = new AggregatorReport()) {
 
             if (reimport) {
+                fs.delete(new Path(dataPath), true);
                 loadRecords(fs, inputPath, dataPath, report);
             }
 
-            fs.delete(new Path(outputPath), true);
+            // fs.delete(new Path(outputPath), true);
             extractInfo(spark, dataPath, outputPath);
         } catch (final Throwable e) {
             throw new RuntimeException(e);
@@ -96,10 +99,10 @@ public class BaseAnalyzerJob {
         }
     }
 
     private static void loadRecords(final FileSystem fs,
         final String inputPath,
         final String outputPath,
         final AggregatorReport report)
         throws Exception {
 
         final AtomicLong recordsCounter = new AtomicLong(0);
@@ -107,9 +110,12 @@ public class BaseAnalyzerJob {
         final Text value = new Text();
 
         try (final SequenceFile.Writer writer = SequenceFile
-            .createWriter(fs.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
-                .keyClass(LongWritable.class), SequenceFile.Writer
-                .valueClass(Text.class), SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
+            .createWriter(
+                fs.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
+                    .keyClass(LongWritable.class),
+                SequenceFile.Writer
+                    .valueClass(Text.class),
+                SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
 
             final BaseCollectorIterator iteraror = new BaseCollectorIterator(fs, new Path(inputPath), report);
@@ -135,31 +141,35 @@ public class BaseAnalyzerJob {
     }
 
     private static void extractInfo(final SparkSession spark,
         final String inputPath,
         final String targetPath) throws Exception {
 
-        final JavaRDD<BaseRecordInfo> rdd = JavaSparkContext.fromSparkContext(spark.sparkContext())
-            .sequenceFile(inputPath, LongWritable.class, Text.class)
-            .map(s -> s._2)
-            .map(BaseAnalyzerJob::extractInfo);
+        final JavaRDD<BaseRecordInfo> rdd = JavaSparkContext
+            .fromSparkContext(spark.sparkContext())
+            .sequenceFile(inputPath, LongWritable.class, Text.class)
+            .map(s -> s._2.toString())
+            .map(BaseAnalyzerJob::extractInfo);
 
-        spark.createDataset(rdd.rdd(), Encoders.bean(BaseRecordInfo.class))
-            .write()
-            .mode(SaveMode.Overwrite)
-            .format("parquet")
-            .save(targetPath);
+        spark
+            .createDataset(rdd.rdd(), Encoders.bean(BaseRecordInfo.class))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .format("parquet")
+            .save(targetPath);
     }
 
-    private static BaseRecordInfo extractInfo(final Text s) {
+    protected static BaseRecordInfo extractInfo(final String s) {
         try {
-            final Document record = DocumentHelper.parseText(s.toString());
+            final Document record = DocumentHelper.parseText(s);
 
             final BaseRecordInfo info = new BaseRecordInfo();
-            info.setId(record.valueOf("//*[local-name() = 'header']/*[local-name() = 'identifier']").trim());
+            final Set<String> paths = new LinkedHashSet<>();
+            final Set<String> types = new LinkedHashSet<>();
+            final Map<String, Map<String, String>> colls = new HashMap<>();
 
             for (final Object o : record.selectNodes("//*|//@*")) {
-                info.getPaths().add(((Node) o).getPath());
+                paths.add(((Node) o).getPath());
 
                 if (o instanceof Element) {
                     final Element n = (Element) o;
@@ -173,15 +183,21 @@ public class BaseAnalyzerJob {
                             for (final Object ao : n.attributes()) {
                                 attrs.put(((Attribute) ao).getName(), ((Attribute) ao).getValue());
                             }
-                            info.getCollections().put(collName, attrs);
+                            colls.put(collName, attrs);
                         }
                     } else if ("type".equals(nodeName)) {
-                        info.getTypes().add("TYPE: " + n.getText().trim());
+                        types.add("TYPE: " + n.getText().trim());
                    } else if ("typenorm".equals(nodeName)) {
-                        info.getTypes().add("TYPE_NORM: " + n.getText().trim());
+                        types.add("TYPE_NORM: " + n.getText().trim());
                     }
                 }
             }
+
+            info.setId(record.valueOf("//*[local-name() = 'header']/*[local-name() = 'identifier']").trim());
+            info.getTypes().addAll(types);
+            info.getPaths().addAll(paths);
+            info.setCollections(colls);
+
             return info;
         } catch (final DocumentException e) {
             throw new RuntimeException(e);

BaseRecordInfo.java

@@ -1,10 +1,11 @@
 package eu.dnetlib.dhp.collection.plugin.base;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 public class BaseRecordInfo implements Serializable {
@@ -12,8 +13,8 @@ public class BaseRecordInfo implements Serializable {
 
     private String id;
     private Map<String, Map<String, String>> collections = new HashMap<>();
-    private Set<String> paths = new LinkedHashSet<>();
-    private Set<String> types = new LinkedHashSet<>();
+    private List<String> paths = new ArrayList<>();
+    private List<String> types = new ArrayList<>();
 
     public String getId() {
         return this.id;
@@ -23,19 +24,19 @@ public class BaseRecordInfo implements Serializable {
         this.id = id;
     }
 
-    public Set<String> getPaths() {
+    public List<String> getPaths() {
         return this.paths;
     }
 
-    public void setPaths(final Set<String> paths) {
+    public void setPaths(final List<String> paths) {
         this.paths = paths;
     }
 
-    public Set<String> getTypes() {
+    public List<String> getTypes() {
         return this.types;
    }
 
-    public void setTypes(final Set<String> types) {
+    public void setTypes(final List<String> types) {
         this.types = types;
     }

BaseCollectorIteratorTest.java

@@ -3,14 +3,22 @@ package eu.dnetlib.dhp.collection.plugin.base;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
 import org.dom4j.Attribute;
 import org.dom4j.Document;
 import org.dom4j.DocumentHelper;
@@ -99,4 +107,28 @@ public class BaseCollectorIteratorTest {
         assertEquals(30000, count);
     }
 
+    @Test
+    public void testParquet() throws Exception {
+
+        final String xml = IOUtils.toString(getClass().getResourceAsStream("record.xml"));
+
+        final SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
+
+        final List<BaseRecordInfo> ls = new ArrayList<>();
+
+        for (int i = 0; i < 10; i++) {
+            ls.add(BaseAnalyzerJob.extractInfo(xml));
+        }
+
+        final JavaRDD<BaseRecordInfo> rdd = JavaSparkContext
+            .fromSparkContext(spark.sparkContext())
+            .parallelize(ls);
+
+        final Dataset<BaseRecordInfo> df = spark
+            .createDataset(rdd.rdd(), Encoders.bean(BaseRecordInfo.class));
+
+        df.printSchema();
+
+        df.show(false);
+    }
 }

record.xml

@@ -0,0 +1,31 @@
+<record>
+    <header xmlns="http://www.openarchives.org/OAI/2.0/">
+        <identifier>ftterritoryanzac:oai:www.territorystories.nt.gov.au:10070/74188</identifier>
+        <datestamp>2015-09-07T22:26:28Z</datestamp>
+    </header>
+    <metadata xmlns="http://www.openarchives.org/OAI/2.0/" xmlns:base_dc="http://oai.base-search.net/base_dc/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dc="http://purl.org/dc/elements/1.1/">
+        <base_dc:dc xsi:schemaLocation="http://oai.base-search.net/base_dc/ http://oai.base-search.net/base_dc/base_dc.xsd">
+            <base_dc:global_id>ftterritoryanzac:oai:www.territorystories.nt.gov.au:10070/74188</base_dc:global_id>
+            <base_dc:continent>cau</base_dc:continent>
+            <base_dc:country>au</base_dc:country>
+            <base_dc:collection opendoar_id="1234">ftterritoryanzac</base_dc:collection>
+            <base_dc:collname>Territory Stories (Northern Territory Government, Australia)</base_dc:collname>
+            <dc:title>W. Wardle</dc:title>
+            <dc:creator>Wardle, W.</dc:creator>
+            <dc:description>"NX 112520 Bdr. W. Wardle Darwin Coast Art. 1943-45" ; NX 112520. Bombardier W. Wardle. Darwin Coast Artillery. 1943-1945.</dc:description>
+            <dc:date>2007-12-18T01:19:36Z</dc:date>
+            <base_dc:year>2007</base_dc:year>
+            <dc:type>Image</dc:type>
+            <dc:type>Reference</dc:type>
+            <base_dc:typenorm>51</base_dc:typenorm>
+            <dc:format>Cropped 408w X 338h Size 26.62k</dc:format>
+            <dc:identifier>http://hdl.handle.net/10070/74188</dc:identifier>
+            <base_dc:link>http://hdl.handle.net/10070/74188</base_dc:link>
+            <dc:relation>Darwin Commemorative Wall Quilt</dc:relation>
+            <dc:relation>http://www.ww2roll.gov.au/script/veteran.asp?ServiceID=A&amp;VeteranID=220307</dc:relation>
+            <dc:relation>http://hdl.handle.net/10070/74188</dc:relation>
+            <base_dc:oa>1</base_dc:oa>
+            <base_dc:lang>unknown</base_dc:lang>
+        </base_dc:dc>
+    </metadata>
+</record>
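
For reference, a small dom4j sketch of how the XPath expressions used by extractInfo() resolve against a record like the one above; the class name is illustrative and the sample record is assumed to be available as a classpath resource:

// Illustrative sketch (hypothetical class name): parse the sample record with dom4j and
// apply the same namespace-agnostic XPath expressions used by BaseAnalyzerJob.extractInfo().
import org.apache.commons.io.IOUtils;
import org.dom4j.Document;
import org.dom4j.DocumentHelper;
import org.dom4j.Node;

public class RecordXPathSketch {

    public static void main(final String[] args) throws Exception {
        // assumes record.xml is available as a classpath resource next to this class
        final String xml = IOUtils
            .toString(RecordXPathSketch.class.getResourceAsStream("record.xml"), "UTF-8");

        final Document record = DocumentHelper.parseText(xml);

        // header identifier, e.g. ftterritoryanzac:oai:www.territorystories.nt.gov.au:10070/74188
        System.out.println(record.valueOf("//*[local-name() = 'header']/*[local-name() = 'identifier']").trim());

        // every element and attribute path, as collected into the bean's paths list
        for (final Object o : record.selectNodes("//*|//@*")) {
            System.out.println(((Node) o).getPath());
        }
    }
}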