118 lines
4.5 KiB
Java
118 lines
4.5 KiB
Java
package com.sandro.app.fs;
|
|
import com.fasterxml.jackson.core.type.TypeReference;
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
import com.sandro.app.SparkUtility;
|
|
import org.apache.commons.io.IOUtils;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.fs.*;
|
|
import org.apache.spark.SparkConf;
|
|
import org.apache.spark.sql.SparkSession;
|
|
import scala.collection.mutable.Map;
|
|
|
|
import java.io.IOException;
|
|
import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.Objects;
|
|
|
|
public class FsChecks {
|
|
|
|
|
|
public static Configuration getHadoopConfiguration(String nameNode) {
|
|
// ====== Init HDFS File System Object
|
|
Configuration conf = new Configuration();
|
|
// Set FileSystem URI
|
|
conf.set("fs.defaultFS", nameNode);
|
|
// Because of Maven
|
|
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
|
|
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
|
|
|
|
System.setProperty("hadoop.home.dir", "/");
|
|
return conf;
|
|
}
|
|
|
|
private static MDStoreInfo extractPath(final String path, final String basePath) {
|
|
|
|
int res = path.indexOf(basePath);
|
|
if (res >0){
|
|
String[] split = path.substring(res).split("/");
|
|
if (split.length > 2) {
|
|
final String ts = split[split.length -1];
|
|
final String mdStore = split[split.length -2];
|
|
return new MDStoreInfo(mdStore, null, Long.parseLong(ts));
|
|
}
|
|
}
|
|
return null;
|
|
|
|
|
|
}
|
|
|
|
public static void main(String[] args) throws IOException {
|
|
|
|
Map<String, String> parsedArgs = SparkUtility.parseArguments(args);
|
|
|
|
final String namenode = parsedArgs.get("namenode").getOrElse(null);
|
|
final String master = parsedArgs.getOrElse("master", null);
|
|
|
|
final SparkConf conf = new SparkConf();
|
|
final SparkSession spark =SparkSession
|
|
.builder()
|
|
.config(conf)
|
|
.master(master)
|
|
.appName(FsChecks.class.getSimpleName())
|
|
.getOrCreate();
|
|
|
|
|
|
spark.sparkContext().setLogLevel("WARN");
|
|
|
|
final FileSystem fileSystem = FileSystem.get(getHadoopConfiguration(namenode));
|
|
|
|
|
|
final String stores =IOUtils.toString(Objects.requireNonNull(FsChecks.class.getResourceAsStream("/mdstore_info.json")));
|
|
|
|
|
|
final ObjectMapper mapper = new ObjectMapper();
|
|
final List<MDStoreInfo> storesINfo =mapper.readValue(stores, new TypeReference<List<MDStoreInfo>>(){});
|
|
|
|
|
|
final String basePath ="/user/sandro.labruzzo/stores/";
|
|
Path p = new Path(basePath);
|
|
final java.util.Map<String, MDStoreInfo> hdfs_store= new HashMap<>();
|
|
final RemoteIterator<LocatedFileStatus> ls = fileSystem.listFiles(p, true);
|
|
while (ls.hasNext()){
|
|
String current =ls.next().getPath().toString();
|
|
final MDStoreInfo info = extractPath(current, basePath);
|
|
if (info!= null) {
|
|
hdfs_store.put(info.getMdstore(), info);
|
|
}
|
|
}
|
|
|
|
storesINfo.stream().filter(s ->s.getLatestTimestamp() != null).forEach( s ->{
|
|
if (!hdfs_store.containsKey(s.getMdstore())) {
|
|
System.out.printf("Adding mdstore %s\n",s.getMdstore());
|
|
try {
|
|
fileSystem.mkdirs(new Path(basePath+s.getMdstore()));
|
|
fileSystem.create(new Path(basePath+s.getMdstore()+"/"+s.getLatestTimestamp()), true);
|
|
System.out.printf("Added path %s/%s/%d\n",basePath, s.getMdstore(),s.getLatestTimestamp());
|
|
} catch (IOException e) {
|
|
throw new RuntimeException(e);
|
|
}
|
|
}
|
|
else {
|
|
final MDStoreInfo current = hdfs_store.get(s.getMdstore());
|
|
if (s.getLatestTimestamp() > current.getLatestTimestamp()) {
|
|
System.out.println("Updating MDStore "+s.getMdstore());
|
|
final String rmPath = String.format("%s%s/%d", basePath, current.getMdstore(), current.getLatestTimestamp());
|
|
try {
|
|
System.out.println("deleting "+rmPath);
|
|
fileSystem.create(new Path(basePath+s.getMdstore()+"/"+s.getLatestTimestamp()), true);
|
|
|
|
fileSystem.delete(new Path(rmPath), true);
|
|
} catch (IOException e) {
|
|
throw new RuntimeException("Unable to remove path "+rmPath, e);
|
|
}
|
|
}
|
|
}
|
|
});
|
|
}
|
|
}
|