103 lines
3.3 KiB
Java
103 lines
3.3 KiB
Java
package eu.dnetlib.data.mdstore.backends;
|
|
|
|
import java.util.HashSet;
|
|
import java.util.List;
|
|
import java.util.Set;
|
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
import org.springframework.jdbc.core.JdbcTemplate;
|
|
import org.springframework.stereotype.Service;
|
|
|
|
import com.google.common.collect.Sets;
|
|
|
|
import eu.dnetlib.data.mdstore.hadoop.HdfsClient;
|
|
import eu.dnetlib.data.mdstore.model.MDStore;
|
|
import eu.dnetlib.data.mdstore.model.MDStoreVersion;
|
|
import eu.dnetlib.data.mdstore.model.MetadataRecord;
|
|
import eu.dnetlib.errors.MDStoreManagerException;
|
|
|
|
@Service
|
|
public class HdfsBackend implements MDStoreBackend {
|
|
|
|
private static final Logger log = LoggerFactory.getLogger(HdfsBackend.class);
|
|
|
|
private static final String HDFS_PATH = "hdfs_path";
|
|
|
|
@Autowired
|
|
protected HdfsClient hdfsClient;
|
|
|
|
@Autowired
|
|
private JdbcTemplate jdbcTemplate;
|
|
|
|
@Value("${dhp.mdstore-manager.hdfs.base-path}")
|
|
private String hdfsBasePath;
|
|
|
|
@Override
|
|
public void completeNewMDStore(final MDStore mdstore) {
|
|
mdstore.getParams().put(HDFS_PATH, String.format("%s/%s", hdfsBasePath, mdstore.getId()));
|
|
}
|
|
|
|
@Override
|
|
public void completeNewMDStoreVersion(final MDStoreVersion version) {
|
|
version.getParams().put(HDFS_PATH, String.format("%s/%s/%s", hdfsBasePath, version.getMdstore(), version.getId()));
|
|
}
|
|
|
|
@Override
|
|
public void delete(final MDStore md) throws MDStoreManagerException {
|
|
delete(md.getParams().getOrDefault("hdfs_path", "").toString());
|
|
}
|
|
|
|
@Override
|
|
public void delete(final MDStoreVersion version) throws MDStoreManagerException {
|
|
delete(version.getParams().getOrDefault("hdfs_path", "").toString());
|
|
}
|
|
|
|
private void delete(final String path) throws MDStoreManagerException {
|
|
if (StringUtils.isNotBlank(path)) {
|
|
hdfsClient.deletePath(path);
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public Set<String> fixInconsistencies(final boolean delete) throws MDStoreManagerException {
|
|
final Set<String> hdfsDirs = hdfsClient.listHadoopDirs();
|
|
final Set<String> validDirs = new HashSet<>(jdbcTemplate
|
|
.queryForList("select hdfs_path from mdstores union all select hdfs_path from mdstore_versions", String.class));
|
|
|
|
final Set<String> toDelete = Sets.difference(hdfsDirs, validDirs);
|
|
log.info("Found " + toDelete.size() + " hdfs paths to remove");
|
|
|
|
if (delete) {
|
|
for (final String p : toDelete) {
|
|
delete(p);
|
|
}
|
|
}
|
|
return toDelete;
|
|
}
|
|
|
|
@Override
|
|
public Set<String> listInternalFiles(final MDStoreVersion version) throws MDStoreManagerException {
|
|
final String path = version.getParams().getOrDefault("hdfs_path", "").toString();
|
|
if (StringUtils.isNotBlank(path)) {
|
|
return hdfsClient.listContent(path + "/store", HdfsClient::isParquetFile);
|
|
} else {
|
|
throw new MDStoreManagerException("hdfs path is missing");
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public List<MetadataRecord> listEntries(final MDStoreVersion version, final long limit) throws MDStoreManagerException {
|
|
final String path = version.getParams().getOrDefault("hdfs_path", "").toString();
|
|
if (StringUtils.isNotBlank(path)) {
|
|
return hdfsClient.readParquetFiles(path + "/store", limit, MetadataRecord.class);
|
|
} else {
|
|
throw new MDStoreManagerException("hdfs path is missing");
|
|
}
|
|
}
|
|
|
|
}
|