// Source listing metadata: 235 lines, 9.6 KiB, Java
package eu.dnetlib.data.mdstore.manager.controller;
|
|
|
|
import java.util.LinkedHashMap;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Set;
|
|
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.web.bind.annotation.DeleteMapping;
|
|
import org.springframework.web.bind.annotation.GetMapping;
|
|
import org.springframework.web.bind.annotation.PathVariable;
|
|
import org.springframework.web.bind.annotation.RequestMapping;
|
|
import org.springframework.web.bind.annotation.RequestParam;
|
|
import org.springframework.web.bind.annotation.RestController;
|
|
|
|
import com.google.common.collect.Sets;
|
|
|
|
import eu.dnetlib.common.controller.AbstractDnetController;
|
|
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
|
|
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreWithInfo;
|
|
import eu.dnetlib.data.mdstore.manager.exceptions.MDStoreManagerException;
|
|
import eu.dnetlib.data.mdstore.manager.utils.DatabaseUtils;
|
|
import eu.dnetlib.data.mdstore.manager.utils.HdfsClient;
|
|
import io.swagger.annotations.Api;
|
|
import io.swagger.annotations.ApiOperation;
|
|
import io.swagger.annotations.ApiParam;
|
|
|
|
@RestController
|
|
@RequestMapping("/mdstores")
|
|
@Api(tags = {
|
|
"Metadata Stores"
|
|
})
|
|
public class MDStoreController extends AbstractDnetController {
|
|
|
|
@Autowired
|
|
private DatabaseUtils databaseUtils;
|
|
|
|
@Autowired
|
|
private HdfsClient hdfsClient;
|
|
|
|
private static final Logger log = LoggerFactory.getLogger(DatabaseUtils.class);
|
|
|
|
@ApiOperation("Return all the mdstores")
|
|
@GetMapping("/")
|
|
public Iterable<MDStoreWithInfo> find() {
|
|
return databaseUtils.listMdStores();
|
|
}
|
|
|
|
@ApiOperation("Return all the mdstore identifiers")
|
|
@GetMapping("/ids")
|
|
public List<String> findIdentifiers() {
|
|
return databaseUtils.listMdStoreIDs();
|
|
}
|
|
|
|
@ApiOperation("Return a mdstores by id")
|
|
@GetMapping("/mdstore/{mdId}")
|
|
public MDStoreWithInfo getMdStore(@ApiParam("the mdstore identifier") @PathVariable final String mdId) throws MDStoreManagerException {
|
|
return databaseUtils.findMdStore(mdId);
|
|
}
|
|
|
|
@ApiOperation("Increase the read count of the current mdstore")
|
|
@GetMapping("/mdstore/{mdId}/startReading")
|
|
public MDStoreVersion startReading(@ApiParam("the mdstore identifier") @PathVariable final String mdId) throws MDStoreManagerException {
|
|
return databaseUtils.startReading(mdId);
|
|
}
|
|
|
|
@ApiOperation("Create a new mdstore")
|
|
@GetMapping("/new/{format}/{layout}/{interpretation}")
|
|
public MDStoreWithInfo createMDStore(
|
|
@ApiParam("mdstore format") @PathVariable final String format,
|
|
@ApiParam("mdstore layout") @PathVariable final String layout,
|
|
@ApiParam("mdstore interpretation") @PathVariable final String interpretation,
|
|
@ApiParam("datasource name") @RequestParam(required = false) final String dsName,
|
|
@ApiParam("datasource id") @RequestParam(required = false) final String dsId,
|
|
@ApiParam("api id") @RequestParam(required = false) final String apiId) throws MDStoreManagerException {
|
|
final String id = databaseUtils.createMDStore(format, layout, interpretation, dsName, dsId, apiId);
|
|
return databaseUtils.findMdStore(id);
|
|
}
|
|
|
|
@ApiOperation("Delete a mdstore by id")
|
|
@DeleteMapping("/mdstore/{mdId}")
|
|
public StatusResponse delete(@ApiParam("the id of the mdstore that will be deleted") @PathVariable final String mdId) throws MDStoreManagerException {
|
|
final String hdfsPath = databaseUtils.deleteMdStore(mdId);
|
|
hdfsClient.deletePath(hdfsPath);
|
|
return StatusResponse.DELETED;
|
|
}
|
|
|
|
@ApiOperation("Return all the versions of a mdstore")
|
|
@GetMapping("/mdstore/{mdId}/versions")
|
|
public Iterable<MDStoreVersion> listVersions(@PathVariable final String mdId) throws MDStoreManagerException {
|
|
return databaseUtils.listVersions(mdId);
|
|
}
|
|
|
|
@ApiOperation("Create a new preliminary version of a mdstore")
|
|
@GetMapping("/mdstore/{mdId}/newVersion")
|
|
public MDStoreVersion prepareNewVersion(@ApiParam("the id of the mdstore for which will be created a new version") @PathVariable final String mdId) {
|
|
return databaseUtils.prepareMdStoreVersion(mdId);
|
|
}
|
|
|
|
@ApiOperation("Promote a preliminary version to current")
|
|
@GetMapping("/version/{versionId}/commit/{size}")
|
|
public MDStoreVersion commitVersion(@ApiParam("the id of the version that will be promoted to the current version") @PathVariable final String versionId,
|
|
@ApiParam("the size of the new current mdstore") @PathVariable final long size) throws MDStoreManagerException {
|
|
try {
|
|
return databaseUtils.commitMdStoreVersion(versionId, size);
|
|
} finally {
|
|
deleteExpiredVersions();
|
|
}
|
|
}
|
|
|
|
@ApiOperation("Abort a preliminary version")
|
|
@GetMapping("/version/{versionId}/abort")
|
|
public StatusResponse commitVersion(@ApiParam("the id of the version to abort") @PathVariable final String versionId) throws MDStoreManagerException {
|
|
final String hdfsPath = databaseUtils.deleteMdStoreVersion(versionId, true);
|
|
hdfsClient.deletePath(hdfsPath);
|
|
return StatusResponse.ABORTED;
|
|
}
|
|
|
|
@ApiOperation("Return an existing mdstore version")
|
|
@GetMapping("/version/{versionId}")
|
|
public MDStoreVersion getVersion(@ApiParam("the id of the version that has to be deleted") @PathVariable final String versionId)
|
|
throws MDStoreManagerException {
|
|
return databaseUtils.findVersion(versionId);
|
|
}
|
|
|
|
@ApiOperation("Delete a mdstore version")
|
|
@DeleteMapping("/version/{versionId}")
|
|
public StatusResponse deleteVersion(@ApiParam("the id of the version that has to be deleted") @PathVariable final String versionId,
|
|
@ApiParam("if true, the controls on writing and readcount values will be skipped") @RequestParam(required = false, defaultValue = "false") final boolean force)
|
|
throws MDStoreManagerException {
|
|
final String hdfsPath = databaseUtils.deleteMdStoreVersion(versionId, force);
|
|
hdfsClient.deletePath(hdfsPath);
|
|
return StatusResponse.DELETED;
|
|
}
|
|
|
|
@ApiOperation("Decrease the read count of a mdstore version")
|
|
@GetMapping("/version/{versionId}/endReading")
|
|
public MDStoreVersion endReading(@ApiParam("the id of the version that has been completely read") @PathVariable final String versionId)
|
|
throws MDStoreManagerException {
|
|
return databaseUtils.endReading(versionId);
|
|
}
|
|
|
|
@ApiOperation("Reset the read count of a mdstore version")
|
|
@GetMapping("/version/{versionId}/resetReading")
|
|
public MDStoreVersion resetReading(@ApiParam("the id of the version") @PathVariable final String versionId)
|
|
throws MDStoreManagerException {
|
|
return databaseUtils.resetReading(versionId);
|
|
}
|
|
|
|
@ApiOperation("Delete expired versions")
|
|
@DeleteMapping("/versions/expired")
|
|
public StatusResponse deleteExpiredVersions() {
|
|
new Thread(this::performDeleteOfExpiredVersions).start();
|
|
return StatusResponse.DELETING;
|
|
}
|
|
|
|
private synchronized void performDeleteOfExpiredVersions() {
|
|
log.info("Deleting expired version...");
|
|
for (final String versionId : databaseUtils.listExpiredVersions()) {
|
|
try {
|
|
final String hdfsPath = databaseUtils.deleteMdStoreVersion(versionId, true);
|
|
hdfsClient.deletePath(hdfsPath);
|
|
} catch (final MDStoreManagerException e) {
|
|
log.warn("Error deleteting version " + versionId, e);
|
|
}
|
|
}
|
|
log.info("Done.");
|
|
}
|
|
|
|
@ApiOperation("Fix the inconsistencies on HDFS")
|
|
@GetMapping("/hdfs/inconsistencies")
|
|
public Set<String> fixHdfsInconsistencies(
|
|
@ApiParam("force the deletion of hdfs paths") @RequestParam(required = false, defaultValue = "false") final boolean delete)
|
|
throws MDStoreManagerException {
|
|
|
|
final Set<String> hdfsDirs = hdfsClient.listHadoopDirs();
|
|
final Set<String> validDirs = databaseUtils.listValidHdfsPaths();
|
|
|
|
final Set<String> toDelete = Sets.difference(hdfsDirs, validDirs);
|
|
log.info("Found " + toDelete.size() + " hdfs paths to remove");
|
|
|
|
if (delete) {
|
|
for (final String p : toDelete) {
|
|
hdfsClient.deletePath(p);
|
|
}
|
|
}
|
|
return toDelete;
|
|
}
|
|
|
|
@ApiOperation("Show informations")
|
|
@GetMapping("/info")
|
|
public Map<String, Object> info() {
|
|
final Map<String, Object> info = new LinkedHashMap<>();
|
|
info.put("number_of_mdstores", databaseUtils.countMdStores());
|
|
info.put("hadoop_user", hdfsClient.getHadoopUser());
|
|
info.put("hadoop_cluster", hdfsClient.getHadoopCluster());
|
|
info.put("hdfs_base_path", databaseUtils.getHdfsBasePath());
|
|
info.put("expired_versions", databaseUtils.listExpiredVersions());
|
|
return info;
|
|
}
|
|
|
|
@ApiOperation("list the file inside the path of a mdstore version")
|
|
@GetMapping("/version/{versionId}/parquet/files")
|
|
public Set<String> listVersionFiles(@PathVariable final String versionId) throws MDStoreManagerException {
|
|
final String path = databaseUtils.findVersion(versionId).getHdfsPath();
|
|
return hdfsClient.listContent(path + "/store", HdfsClient::isParquetFile);
|
|
}
|
|
|
|
@ApiOperation("read the parquet file of a mdstore version")
|
|
@GetMapping("/version/{versionId}/parquet/content/{limit}")
|
|
public List<Map<String, String>> listVersionParquet(@PathVariable final String versionId, @PathVariable final long limit) throws MDStoreManagerException {
|
|
final String path = databaseUtils.findVersion(versionId).getHdfsPath();
|
|
return hdfsClient.readParquetFiles(path + "/store", limit);
|
|
}
|
|
|
|
@ApiOperation("read the parquet file of a mdstore (current version)")
|
|
@GetMapping("/mdstore/{mdId}/parquet/content/{limit}")
|
|
public List<Map<String, String>> listMdstoreParquet(@PathVariable final String mdId, @PathVariable final long limit) throws MDStoreManagerException {
|
|
final String versionId = databaseUtils.findMdStore(mdId).getCurrentVersion();
|
|
final String path = databaseUtils.findVersion(versionId).getHdfsPath();
|
|
return hdfsClient.readParquetFiles(path + "/store", limit);
|
|
}
|
|
|
|
protected void setDatabaseUtils(final DatabaseUtils databaseUtils) {
|
|
this.databaseUtils = databaseUtils;
|
|
}
|
|
|
|
protected void setHdfsClient(final HdfsClient hdfsClient) {
|
|
this.hdfsClient = hdfsClient;
|
|
}
|
|
|
|
}
|