package eu.dnetlib.data.mdstore; import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import javax.transaction.Transactional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Service; import eu.dnetlib.data.mdstore.repository.MDStoreCurrentVersionRepository; import eu.dnetlib.data.mdstore.repository.MDStoreRepository; import eu.dnetlib.data.mdstore.repository.MDStoreVersionRepository; import eu.dnetlib.data.mdstore.repository.MDStoreWithInfoRepository; import eu.dnetlib.dhp.schema.mdstore.MDStore; import eu.dnetlib.dhp.schema.mdstore.MDStoreCurrentVersion; import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion; import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo; import eu.dnetlib.errors.MDStoreManagerException; @Service public class MDStoreService { @Autowired private MDStoreRepository mdstoreRepository; @Autowired private MDStoreVersionRepository mdstoreVersionRepository; @Autowired private MDStoreCurrentVersionRepository mdstoreCurrentVersionRepository; @Autowired private MDStoreWithInfoRepository mdstoreWithInfoRepository; @Autowired private JdbcTemplate jdbcTemplate; @Value("${dhp.mdstore-manager.hdfs.base-path}") private String hdfsBasePath; private static final Logger log = LoggerFactory.getLogger(MDStoreService.class); public Iterable listMdStores() { return mdstoreWithInfoRepository.findAll(); } public List listMdStoreIDs() { return mdstoreRepository.findAll().stream().map(MDStore::getId).collect(Collectors.toList()); } public long countMdStores() { return mdstoreRepository.count(); } public Iterable listVersions(final String mdId) { return mdstoreVersionRepository.findByMdstore(mdId); } public List listExpiredVersions() { return jdbcTemplate .queryForList("select v.id from mdstore_versions v left outer join mdstore_current_versions cv on (v.id = cv.current_version) where v.writing = false and v.readcount = 0 and cv.mdstore is null;", String.class); } public MDStoreWithInfo findMdStore(final String mdId) throws MDStoreManagerException { return mdstoreWithInfoRepository.findById(mdId).orElseThrow(() -> new MDStoreManagerException("Missing mdstore: " + mdId)); } public MDStoreVersion findVersion(final String versionId) throws MDStoreManagerException { return mdstoreVersionRepository.findById(versionId).orElseThrow(() -> new MDStoreManagerException("Missing mdstore version: " + versionId)); } @Transactional public String createMDStore(final String format, final String layout, final String interpretation, final String dsName, final String dsId, final String apiId) { final MDStore md = MDStore.newInstance(format, layout, interpretation, dsName, dsId, apiId, hdfsBasePath); mdstoreRepository.save(md); final MDStoreVersion v = MDStoreVersion.newInstance(md.getId(), false, hdfsBasePath); v.setLastUpdate(new Date()); mdstoreVersionRepository.save(v); mdstoreCurrentVersionRepository.save(MDStoreCurrentVersion.newInstance(v)); return md.getId(); } @Transactional public String deleteMdStore(final String mdId) throws MDStoreManagerException { final Optional md = mdstoreRepository.findById(mdId); if (!md.isPresent()) { log.error("MDStore not found: " + mdId); throw new MDStoreManagerException("MDStore not found: " + mdId); } if (mdstoreVersionRepository.countByMdstoreAndReadCountGreaterThan(mdId, 0) > 0) { log.error("Read transactions found on mdstore: " + mdId); throw new MDStoreManagerException("Read transactions found on mdstore: " + mdId); } if (mdstoreVersionRepository.countByMdstoreAndWriting(mdId, true) > 0) { log.error("Write transactions found on mdstore: " + mdId); throw new MDStoreManagerException("Write transactions found on mdstore: " + mdId); } mdstoreCurrentVersionRepository.deleteById(mdId); mdstoreVersionRepository.deleteByMdstore(mdId); mdstoreRepository.deleteById(mdId); return md.get().getHdfsPath(); } @Transactional public MDStoreVersion startReading(final String mdId) throws MDStoreManagerException { final MDStoreCurrentVersion cv = mdstoreCurrentVersionRepository.findById(mdId).orElseThrow(() -> new MDStoreManagerException("Missing mdstore: " + mdId)); final MDStoreVersion v = mdstoreVersionRepository.findById(cv.getCurrentVersion()) .orElseThrow(() -> new MDStoreManagerException("Missing version: " + cv.getCurrentVersion())); v.setReadCount(v.getReadCount() + 1); mdstoreVersionRepository.save(v); return v; } @Transactional public MDStoreVersion endReading(final String versionId) throws MDStoreManagerException { final MDStoreVersion v = mdstoreVersionRepository.findById(versionId).orElseThrow(() -> new MDStoreManagerException("Version not found")); v.setReadCount(Math.max(0, v.getReadCount() - 1)); return v; } @Transactional public MDStoreVersion resetReading(final String versionId) throws MDStoreManagerException { final MDStoreVersion v = mdstoreVersionRepository.findById(versionId).orElseThrow(() -> new MDStoreManagerException("Version not found")); v.setReadCount(0); return v; } @Transactional public MDStoreVersion prepareMdStoreVersion(final String mdId) { final MDStoreVersion v = MDStoreVersion.newInstance(mdId, true, hdfsBasePath); mdstoreVersionRepository.save(v); return v; } @Transactional public MDStoreVersion commitMdStoreVersion(final String versionId, final long size) throws MDStoreManagerException { final MDStoreVersion v = mdstoreVersionRepository.findById(versionId).orElseThrow(() -> new MDStoreManagerException("Invalid version: " + versionId)); mdstoreCurrentVersionRepository.save(MDStoreCurrentVersion.newInstance(v)); v.setWriting(false); v.setSize(size); v.setLastUpdate(new Date()); mdstoreVersionRepository.save(v); return v; } @Transactional public String deleteMdStoreVersion(final String versionId, final boolean force) throws MDStoreManagerException { final MDStoreVersion v = mdstoreVersionRepository.findById(versionId).orElseThrow(() -> new MDStoreManagerException("Version not found")); if (mdstoreCurrentVersionRepository .countByCurrentVersion(versionId) > 0) { throw new MDStoreManagerException("I cannot delete this version because it is the current version"); } if (!force) { if (v.isWriting()) { throw new MDStoreManagerException("I cannot delete this version because it is in write mode"); } if (v.getReadCount() > 0) { throw new MDStoreManagerException("I cannot delete this version because it is in read mode"); } } mdstoreVersionRepository.delete(v); return v.getHdfsPath(); } public Set listValidHdfsPaths() { return new HashSet<>(jdbcTemplate .queryForList(" select hdfs_path from mdstores union all select hdfs_path from mdstore_versions", String.class)); } public String getHdfsBasePath() { return hdfsBasePath; } public void setHdfsBasePath(final String hdfsBasePath) { this.hdfsBasePath = hdfsBasePath; } }