83 lines
2.7 KiB
Java
83 lines
2.7 KiB
Java
package eu.dnetlib.data.mdstore.hadoop;
|
|
|
|
import java.util.HashSet;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Set;
|
|
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
import org.springframework.stereotype.Service;
|
|
|
|
import com.google.common.collect.Sets;
|
|
|
|
import eu.dnetlib.data.mdstore.MDStoreService;
|
|
import eu.dnetlib.data.mdstore.model.MDStore;
|
|
import eu.dnetlib.data.mdstore.model.MDStoreVersion;
|
|
import eu.dnetlib.errors.MDStoreManagerException;
|
|
|
|
@Service
|
|
public class HadoopMDStoreService extends MDStoreService {
|
|
|
|
private static final Logger log = LoggerFactory.getLogger(HadoopMDStoreService.class);
|
|
|
|
@Autowired
|
|
protected HdfsClient hdfsClient;
|
|
|
|
@Value("${dhp.mdstore-manager.hdfs.base-path}")
|
|
private String hdfsBasePath;
|
|
|
|
@Override
|
|
protected void deleteFromBackend(final String path) throws MDStoreManagerException {
|
|
hdfsClient.deletePath(path);
|
|
}
|
|
|
|
@Override
|
|
protected MDStore newMDStore(final String format,
|
|
final String layout,
|
|
final String interpretation,
|
|
final String dsName,
|
|
final String dsId,
|
|
final String apiId) {
|
|
return MDStore.newInstance(format, layout, interpretation, dsName, dsId, apiId, hdfsBasePath);
|
|
}
|
|
|
|
@Override
|
|
protected MDStoreVersion newMDStoreVersion(final String mdId, final boolean writing) {
|
|
return MDStoreVersion.newInstance(mdId, writing, hdfsBasePath);
|
|
}
|
|
|
|
public Set<String> fixHdfsInconsistencies(final boolean delete) throws MDStoreManagerException {
|
|
final Set<String> hdfsDirs = hdfsClient.listHadoopDirs();
|
|
final Set<String> validDirs = new HashSet<>(jdbcTemplate
|
|
.queryForList("select hdfs_path from mdstores union all select hdfs_path from mdstore_versions", String.class));
|
|
|
|
final Set<String> toDelete = Sets.difference(hdfsDirs, validDirs);
|
|
log.info("Found " + toDelete.size() + " hdfs paths to remove");
|
|
|
|
if (delete) {
|
|
for (final String p : toDelete) {
|
|
deleteFromBackend(p);
|
|
}
|
|
}
|
|
return toDelete;
|
|
}
|
|
|
|
public List<Map<String, String>> listMdstoreParquet(final String mdId, final long limit) throws MDStoreManagerException {
|
|
return listVersionParquet(findMdStore(mdId).getCurrentVersion(), limit);
|
|
}
|
|
|
|
public List<Map<String, String>> listVersionParquet(final String versionId, final long limit) throws MDStoreManagerException {
|
|
final String path = findVersion(versionId).getHdfsPath();
|
|
return hdfsClient.readParquetFiles(path + "/store", limit);
|
|
}
|
|
|
|
public Set<String> listVersionFiles(final String versionId) throws MDStoreManagerException {
|
|
final String path = findVersion(versionId).getHdfsPath();
|
|
return hdfsClient.listContent(path + "/store", HdfsClient::isParquetFile);
|
|
}
|
|
|
|
}
|