method to fix inconsistencies
This commit is contained in:
parent
3099a06e47
commit
a1006af44a
|
@ -3,6 +3,7 @@ package eu.dnetlib.data.mdstore.manager.controller;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -14,6 +15,8 @@ import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
import org.springframework.web.bind.annotation.RequestParam;
|
import org.springframework.web.bind.annotation.RequestParam;
|
||||||
import org.springframework.web.bind.annotation.RestController;
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
|
import com.google.common.collect.Sets;
|
||||||
|
|
||||||
import eu.dnetlib.common.controller.AbstractDnetController;
|
import eu.dnetlib.common.controller.AbstractDnetController;
|
||||||
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
|
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
|
||||||
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreWithInfo;
|
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreWithInfo;
|
||||||
|
@ -100,7 +103,11 @@ public class MDStoreController extends AbstractDnetController {
|
||||||
@GetMapping("/version/{versionId}/commit/{size}")
|
@GetMapping("/version/{versionId}/commit/{size}")
|
||||||
public MDStoreVersion commitVersion(@ApiParam("the id of the version that will be promoted to the current version") @PathVariable final String versionId,
|
public MDStoreVersion commitVersion(@ApiParam("the id of the version that will be promoted to the current version") @PathVariable final String versionId,
|
||||||
@ApiParam("the size of the new current mdstore") @PathVariable final long size) throws MDStoreManagerException {
|
@ApiParam("the size of the new current mdstore") @PathVariable final long size) throws MDStoreManagerException {
|
||||||
return databaseUtils.commitMdStoreVersion(versionId, size);
|
try {
|
||||||
|
return databaseUtils.commitMdStoreVersion(versionId, size);
|
||||||
|
} finally {
|
||||||
|
deleteExpiredVersions();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ApiOperation("Abort a preliminary version")
|
@ApiOperation("Abort a preliminary version")
|
||||||
|
@ -159,6 +166,25 @@ public class MDStoreController extends AbstractDnetController {
|
||||||
return StatusResponse.DELETING;
|
return StatusResponse.DELETING;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ApiOperation("Fix the inconsistencies on HDFS")
|
||||||
|
@GetMapping("/hdfs/inconsistencies")
|
||||||
|
public Set<String> fixHdfsInconsistencies(
|
||||||
|
@ApiParam("force the deletion of hdfs paths") @RequestParam(required = false, defaultValue = "false") final boolean delete)
|
||||||
|
throws MDStoreManagerException {
|
||||||
|
|
||||||
|
final Set<String> hdfsDirs = hdfsClient.listHadoopDirs();
|
||||||
|
final Set<String> validDirs = databaseUtils.listValidHdfsPaths();
|
||||||
|
|
||||||
|
final Set<String> toDelete = Sets.difference(hdfsDirs, validDirs);
|
||||||
|
|
||||||
|
if (delete) {
|
||||||
|
for (final String p : toDelete) {
|
||||||
|
hdfsClient.deletePath(p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return toDelete;
|
||||||
|
}
|
||||||
|
|
||||||
@ApiOperation("Show informations")
|
@ApiOperation("Show informations")
|
||||||
@GetMapping("/info")
|
@GetMapping("/info")
|
||||||
public Map<String, Object> info() {
|
public Map<String, Object> info() {
|
||||||
|
|
|
@ -1,8 +1,10 @@
|
||||||
package eu.dnetlib.data.mdstore.manager.utils;
|
package eu.dnetlib.data.mdstore.manager.utils;
|
||||||
|
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import javax.transaction.Transactional;
|
import javax.transaction.Transactional;
|
||||||
|
@ -158,6 +160,7 @@ public class DatabaseUtils {
|
||||||
v.setSize(size);
|
v.setSize(size);
|
||||||
v.setLastUpdate(new Date());
|
v.setLastUpdate(new Date());
|
||||||
mdstoreVersionRepository.save(v);
|
mdstoreVersionRepository.save(v);
|
||||||
|
|
||||||
return v;
|
return v;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -181,6 +184,11 @@ public class DatabaseUtils {
|
||||||
return v.getHdfsPath();
|
return v.getHdfsPath();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Set<String> listValidHdfsPaths() {
|
||||||
|
return new HashSet<>(jdbcTemplate
|
||||||
|
.queryForList(" select hdfs_path from mdstores union all select hdfs_path from mdstore_versions", String.class));
|
||||||
|
}
|
||||||
|
|
||||||
public String getHdfsBasePath() {
|
public String getHdfsBasePath() {
|
||||||
return hdfsBasePath;
|
return hdfsBasePath;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,10 +1,13 @@
|
||||||
package eu.dnetlib.data.mdstore.manager.utils;
|
package eu.dnetlib.data.mdstore.manager.utils;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.LinkedHashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
@ -21,9 +24,48 @@ public class HdfsClient {
|
||||||
@Value("${dhp.mdstore-manager.hadoop.user}")
|
@Value("${dhp.mdstore-manager.hadoop.user}")
|
||||||
private String hadoopUser;
|
private String hadoopUser;
|
||||||
|
|
||||||
|
@Value("${dhp.mdstore-manager.hdfs.base-path}")
|
||||||
|
private String hdfsBasePath;
|
||||||
|
|
||||||
private static final Log log = LogFactory.getLog(HdfsClient.class);
|
private static final Log log = LogFactory.getLog(HdfsClient.class);
|
||||||
|
|
||||||
public void deletePath(final String path) throws MDStoreManagerException {
|
public void deletePath(final String path) throws MDStoreManagerException {
|
||||||
|
|
||||||
|
try (final FileSystem fs = FileSystem.get(conf())) {
|
||||||
|
fs.delete(new Path(path), true);
|
||||||
|
log.info("HDFS Path deleted: " + path);
|
||||||
|
} catch (IllegalArgumentException | IOException e) {
|
||||||
|
log.error("Eror deleting path: " + path, e);
|
||||||
|
throw new MDStoreManagerException("Eror deleting path: " + path, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Set<String> listHadoopDirs() {
|
||||||
|
final Set<String> res = new LinkedHashSet<>();
|
||||||
|
|
||||||
|
try (final FileSystem fs = FileSystem.get(conf())) {
|
||||||
|
for (final FileStatus mdDir : fs.listStatus(new Path(hdfsBasePath))) {
|
||||||
|
if (isValidDir(mdDir)) {
|
||||||
|
res.add(String.format("%s/%s", hdfsBasePath, mdDir.getPath().getName()));
|
||||||
|
for (final FileStatus verDir : fs.listStatus(mdDir.getPath())) {
|
||||||
|
if (isValidDir(verDir)) {
|
||||||
|
res.add(String.format("%s/%s/%s", hdfsBasePath, mdDir.getPath().getName(), verDir.getPath().getName()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (final Exception e) {
|
||||||
|
log.error("Error Listing path: " + hdfsBasePath, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isValidDir(final FileStatus fileStatus) {
|
||||||
|
return fileStatus.isDirectory() && fileStatus.getPath().getName().startsWith("md-");
|
||||||
|
}
|
||||||
|
|
||||||
|
private Configuration conf() throws MDStoreManagerException {
|
||||||
final Configuration conf = new Configuration();
|
final Configuration conf = new Configuration();
|
||||||
|
|
||||||
System.setProperty("HADOOP_USER_NAME", hadoopUser);
|
System.setProperty("HADOOP_USER_NAME", hadoopUser);
|
||||||
|
@ -38,14 +80,7 @@ public class HdfsClient {
|
||||||
log.error("Invalid Haddop Cluster: " + hadoopCluster);
|
log.error("Invalid Haddop Cluster: " + hadoopCluster);
|
||||||
throw new MDStoreManagerException("Invalid Haddop Cluster: " + hadoopCluster);
|
throw new MDStoreManagerException("Invalid Haddop Cluster: " + hadoopCluster);
|
||||||
}
|
}
|
||||||
|
return conf;
|
||||||
try (final FileSystem fs = FileSystem.get(conf)) {
|
|
||||||
fs.delete(new Path(path), true);
|
|
||||||
log.info("HDFS Path deleted: " + path);
|
|
||||||
} catch (IllegalArgumentException | IOException e) {
|
|
||||||
log.error("Eror deleting path: " + path, e);
|
|
||||||
throw new MDStoreManagerException("Eror deleting path: " + path, e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getHadoopCluster() {
|
public String getHadoopCluster() {
|
||||||
|
|
|
@ -23,5 +23,5 @@ logging.level.io.swagger.models.parameters.AbstractSerializableParameter = error
|
||||||
|
|
||||||
# Hadoop
|
# Hadoop
|
||||||
dhp.mdstore-manager.hadoop.cluster = GARR
|
dhp.mdstore-manager.hadoop.cluster = GARR
|
||||||
dhp.mdstore-manager.hdfs.base-path = /tmp/mdstoremanager_dev
|
dhp.mdstore-manager.hdfs.base-path = /data/dnet.dev/mdstore
|
||||||
dhp.mdstore-manager.hadoop.user = dnet.dev
|
dhp.mdstore-manager.hadoop.user = dnet.dev
|
||||||
|
|
Loading…
Reference in New Issue