forked from D-Net/dnet-hadoop
Merge branch 'stable_ids' of https://code-repo.d4science.org/D-Net/dnet-hadoop into stable_ids
This commit is contained in:
commit
4da141bd7c
|
@ -52,9 +52,8 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication
|
||||||
public static void main(final String[] args) throws Exception {
|
public static void main(final String[] args) throws Exception {
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
IOUtils
|
IOUtils
|
||||||
.toString(
|
.toString(MigrateHdfsMdstoresApplication.class
|
||||||
MigrateHdfsMdstoresApplication.class
|
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json")));
|
||||||
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json")));
|
|
||||||
parser.parseArgument(args);
|
parser.parseArgument(args);
|
||||||
|
|
||||||
final Boolean isSparkSessionManaged = Optional
|
final Boolean isSparkSessionManaged = Optional
|
||||||
|
@ -94,14 +93,21 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication
|
||||||
.filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration()))
|
.filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration()))
|
||||||
.toArray(size -> new String[size]);
|
.toArray(size -> new String[size]);
|
||||||
|
|
||||||
spark
|
if (validPaths.length > 0) {
|
||||||
.read()
|
spark
|
||||||
.parquet(validPaths)
|
.read()
|
||||||
.map((MapFunction<Row, String>) r -> enrichRecord(r), Encoders.STRING())
|
.parquet(validPaths)
|
||||||
.toJavaRDD()
|
.map((MapFunction<Row, String>) r -> enrichRecord(r), Encoders.STRING())
|
||||||
.mapToPair(xml -> new Tuple2<>(new Text(UUID.randomUUID() + ":" + type), new Text(xml)))
|
.toJavaRDD()
|
||||||
// .coalesce(1)
|
.mapToPair(xml -> new Tuple2<>(new Text(UUID.randomUUID() + ":" + type), new Text(xml)))
|
||||||
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
|
// .coalesce(1)
|
||||||
|
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
|
||||||
|
} else {
|
||||||
|
spark.emptyDataFrame()
|
||||||
|
.toJavaRDD()
|
||||||
|
.mapToPair(xml -> new Tuple2<>(new Text(), new Text()))
|
||||||
|
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String enrichRecord(final Row r) {
|
private static String enrichRecord(final Row r) {
|
||||||
|
|
Loading…
Reference in New Issue