|
|
|
@ -65,11 +65,19 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication
|
|
|
|
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
|
|
|
|
|
|
|
|
|
final String mdstoreManagerUrl = parser.get("mdstoreManagerUrl");
|
|
|
|
|
log.info("mdstoreManagerUrl: {}", mdstoreManagerUrl);
|
|
|
|
|
|
|
|
|
|
final String mdFormat = parser.get("mdFormat");
|
|
|
|
|
log.info("mdFormat: {}", mdFormat);
|
|
|
|
|
|
|
|
|
|
final String mdLayout = parser.get("mdLayout");
|
|
|
|
|
log.info("mdLayout: {}", mdLayout);
|
|
|
|
|
|
|
|
|
|
final String mdInterpretation = parser.get("mdInterpretation");
|
|
|
|
|
log.info("mdInterpretation: {}", mdInterpretation);
|
|
|
|
|
|
|
|
|
|
final String hdfsPath = parser.get("hdfsPath");
|
|
|
|
|
log.info("hdfsPath: {}", hdfsPath);
|
|
|
|
|
|
|
|
|
|
final Set<String> paths = mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation);
|
|
|
|
|
|
|
|
|
@ -95,6 +103,8 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication
|
|
|
|
|
.filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration()))
|
|
|
|
|
.toArray(size -> new String[size]);
|
|
|
|
|
|
|
|
|
|
log.info("Processing existing paths {}", Arrays.asList(validPaths));
|
|
|
|
|
|
|
|
|
|
if (validPaths.length > 0) {
|
|
|
|
|
spark
|
|
|
|
|
.read()
|
|
|
|
|