removed invalid deletion

Michele Artini 2024-02-14 11:59:30 +01:00
parent ddd6a7ceb3
commit 2e11197142
1 changed file with 18 additions and 25 deletions

@@ -36,7 +36,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.actionmanager.ror.GenerateRorActionSetJob;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.common.HdfsSupport;
 import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
 
 public class BaseAnalyzerJob {
@@ -48,18 +47,17 @@ public class BaseAnalyzerJob {
     public static void main(final String[] args) throws Exception {
         final String jsonConfiguration = IOUtils
-            .toString(
-                GenerateRorActionSetJob.class
-                    .getResourceAsStream("/eu/dnetlib/dhp/collection/plugin/base/action_set_parameters.json"));
+            .toString(GenerateRorActionSetJob.class
+                .getResourceAsStream("/eu/dnetlib/dhp/collection/plugin/base/action_set_parameters.json"));
 
         final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
         parser.parseArgument(args);
 
         final Boolean isSparkSessionManaged = Optional
-            .ofNullable(parser.get("isSparkSessionManaged"))
-            .map(Boolean::valueOf)
-            .orElse(Boolean.TRUE);
+                .ofNullable(parser.get("isSparkSessionManaged"))
+                .map(Boolean::valueOf)
+                .orElse(Boolean.TRUE);
 
         log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
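
The isSparkSessionManaged handling in this hunk is the standard Optional idiom: a missing argument defaults to TRUE. A minimal self-contained sketch of the same idiom, with a plain argument standing in for the ArgumentApplicationParser lookup:

import java.util.Optional;

public class FlagSketch {
    public static void main(final String[] args) {
        // Stands in for parser.get("isSparkSessionManaged"); null when the flag is absent.
        final String raw = args.length > 0 ? args[0] : null;
        final Boolean isSparkSessionManaged = Optional
            .ofNullable(raw)
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE); // absent flag means the session is managed
        System.out.println("isSparkSessionManaged: " + isSparkSessionManaged);
    }
}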
@@ -75,13 +73,11 @@ public class BaseAnalyzerJob {
     }
 
     private static void processBaseRecords(final SparkSession spark,
-        final String inputPath,
-        final String outputPath) throws IOException {
-
-        HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
+            final String inputPath,
+            final String outputPath) throws IOException {
 
         try (final FileSystem fs = FileSystem.get(new Configuration());
-            final AggregatorReport report = new AggregatorReport()) {
+                final AggregatorReport report = new AggregatorReport()) {
             final Map<String, AtomicLong> fields = new HashMap<>();
             final Map<String, AtomicLong> types = new HashMap<>();
             final Map<String, AtomicLong> collections = new HashMap<>();
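
The dropped HdfsSupport.remove call appears to be what the commit message refers to: it wiped outputPath before each run. A rough sketch of what such a removal does at the Hadoop FileSystem level (path hypothetical, not the dhp-common helper itself):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RemovePathSketch {
    public static void main(final String[] args) throws Exception {
        final Configuration conf = new Configuration();
        final Path outputPath = new Path("/tmp/base-report"); // hypothetical path
        try (final FileSystem fs = FileSystem.get(conf)) {
            // Recursive delete, analogous to what an HDFS "remove" helper typically does;
            // the commit drops this step, so an existing output path is no longer wiped.
            fs.delete(outputPath, true);
        }
    }
}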
@@ -99,12 +95,12 @@ public class BaseAnalyzerJob {
     }
 
     private static void analyze(final FileSystem fs,
-        final String inputPath,
-        final Map<String, AtomicLong> fields,
-        final Map<String, AtomicLong> types,
-        final Map<String, AtomicLong> collections,
-        final Map<String, AtomicLong> totals,
-        final AggregatorReport report) throws JsonProcessingException, IOException {
+            final String inputPath,
+            final Map<String, AtomicLong> fields,
+            final Map<String, AtomicLong> types,
+            final Map<String, AtomicLong> collections,
+            final Map<String, AtomicLong> totals,
+            final AggregatorReport report) throws JsonProcessingException, IOException {
 
         final AtomicLong recordsCounter = new AtomicLong(0);
@@ -161,14 +157,11 @@ public class BaseAnalyzerJob {
     }
 
     private static void saveReport(final FileSystem fs, final String outputPath, final Map<String, AtomicLong> fields)
-        throws JsonProcessingException, IOException {
+            throws JsonProcessingException, IOException {
         try (final SequenceFile.Writer writer = SequenceFile
-            .createWriter(
-                fs.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
-                    .keyClass(IntWritable.class),
-                SequenceFile.Writer
-                    .valueClass(Text.class),
-                SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
+            .createWriter(fs.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
+                .keyClass(IntWritable.class), SequenceFile.Writer
+                .valueClass(Text.class), SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
 
             final Text key = new Text();
             final Text value = new Text();
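
The reshaped createWriter call above is a single varargs invocation taking Writer.Option values. A self-contained sketch of the same SequenceFile API, writing one block-compressed entry (output path and record content hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DeflateCodec;

public class SequenceFileSketch {
    public static void main(final String[] args) throws Exception {
        final Configuration conf = new Configuration();
        try (final SequenceFile.Writer writer = SequenceFile
            .createWriter(conf,
                SequenceFile.Writer.file(new Path("/tmp/report.seq")), // hypothetical output
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(Text.class),
                SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
            // One report entry: field name -> counter value.
            writer.append(new Text("dc:title"), new Text("42"));
        }
    }
}

The sketch registers Text for both key and value. Note that the diff itself declares keyClass(IntWritable.class) while the visible context then constructs Text keys; Writer.append rejects keys whose class does not match the declared key class.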