fixed a problem with empty mdstore list

This commit is contained in:
Michele Artini 2021-06-14 11:57:00 +02:00
parent cf360d7c97
commit 83132ee99a
1 changed files with 16 additions and 11 deletions

View File

@ -14,6 +14,8 @@ import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@ -52,9 +54,8 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString( .toString(MigrateHdfsMdstoresApplication.class
MigrateHdfsMdstoresApplication.class .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json")));
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json")));
parser.parseArgument(args); parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional final Boolean isSparkSessionManaged = Optional
@ -94,14 +95,18 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication
.filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration())) .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration()))
.toArray(size -> new String[size]); .toArray(size -> new String[size]);
spark if (validPaths.length > 0) {
.read() spark
.parquet(validPaths) .read()
.map((MapFunction<Row, String>) r -> enrichRecord(r), Encoders.STRING()) .parquet(validPaths)
.toJavaRDD() .map((MapFunction<Row, String>) r -> enrichRecord(r), Encoders.STRING())
.mapToPair(xml -> new Tuple2<>(new Text(UUID.randomUUID() + ":" + type), new Text(xml))) .toJavaRDD()
// .coalesce(1) .mapToPair(xml -> new Tuple2<>(new Text(UUID.randomUUID() + ":" + type), new Text(xml)))
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); // .coalesce(1)
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
} else {
FileSystem.get(sc.hadoopConfiguration()).createNewFile(new Path(outputPath));
}
} }
private static String enrichRecord(final Row r) { private static String enrichRecord(final Row r) {