307 lines
12 KiB
Java
307 lines
12 KiB
Java
package eu.dnetlib.services.mdstores.service;
|
|
|
|
import java.time.LocalDateTime;
|
|
import java.time.ZoneOffset;
|
|
import java.util.Comparator;
|
|
import java.util.LinkedHashMap;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Set;
|
|
import java.util.UUID;
|
|
import java.util.function.Function;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Stream;
|
|
import java.util.stream.StreamSupport;
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.jdbc.core.JdbcTemplate;
|
|
import org.springframework.stereotype.Service;
|
|
|
|
import eu.dnetlib.common.mdstores.MDStoreBackend;
|
|
import eu.dnetlib.common.mdstores.backends.sql.MDStoreSqlBackend;
|
|
import eu.dnetlib.domain.mdstore.MDStore;
|
|
import eu.dnetlib.domain.mdstore.MDStoreCurrentVersion;
|
|
import eu.dnetlib.domain.mdstore.MDStoreType;
|
|
import eu.dnetlib.domain.mdstore.MDStoreVersion;
|
|
import eu.dnetlib.domain.mdstore.MDStoreWithInfo;
|
|
import eu.dnetlib.domain.mdstore.records.MetadataRecord;
|
|
import eu.dnetlib.errors.MDStoreManagerException;
|
|
import eu.dnetlib.services.mdstores.backends.DefaultBackend;
|
|
import eu.dnetlib.services.mdstores.backends.MockBackend;
|
|
import eu.dnetlib.services.mdstores.repository.MDStoreCurrentVersionRepository;
|
|
import eu.dnetlib.services.mdstores.repository.MDStoreRepository;
|
|
import eu.dnetlib.services.mdstores.repository.MDStoreVersionRepository;
|
|
import eu.dnetlib.services.mdstores.repository.MDStoreWithInfoRepository;
|
|
import jakarta.transaction.Transactional;
|
|
|
|
@Service
|
|
public class MDStoreService {
|
|
|
|
@Autowired
|
|
private MDStoreRepository mdstoreRepository;
|
|
@Autowired
|
|
private MDStoreVersionRepository mdstoreVersionRepository;
|
|
@Autowired
|
|
private MDStoreCurrentVersionRepository mdstoreCurrentVersionRepository;
|
|
@Autowired
|
|
private MDStoreWithInfoRepository mdstoreWithInfoRepository;
|
|
@Autowired
|
|
protected JdbcTemplate jdbcTemplate;
|
|
|
|
@Autowired
|
|
private MockBackend mockBackend;
|
|
@Autowired
|
|
private DefaultBackend defaultBackend;
|
|
@Autowired
|
|
private MDStoreSqlBackend sqlBackend;
|
|
|
|
private static final Log log = LogFactory.getLog(MDStoreService.class);
|
|
|
|
public static final String REFRESH_MODE = "REFRESH";
|
|
public static final String INCREMENTAL_MODE = "INCREMENTAL";
|
|
|
|
public List<MDStoreWithInfo> listMdStores() {
|
|
|
|
return StreamSupport.stream(this.mdstoreWithInfoRepository.findAll().spliterator(), false)
|
|
.sorted(Comparator.comparing((Function<MDStoreWithInfo, String>) MDStoreWithInfo::getDatasourceName).thenComparing(MDStoreWithInfo::getId))
|
|
.collect(Collectors.toList());
|
|
}
|
|
|
|
public List<String> listMdStoreIDs() {
|
|
return this.mdstoreRepository.findAll().stream().map(MDStore::getId).sorted().collect(Collectors.toList());
|
|
}
|
|
|
|
public long countMdStores() {
|
|
return this.mdstoreRepository.count();
|
|
}
|
|
|
|
public Iterable<MDStoreVersion> listVersions(final String mdId) {
|
|
return this.mdstoreVersionRepository.findByMdstoreOrderById(mdId);
|
|
}
|
|
|
|
public List<String> listExpiredVersions() {
|
|
return this.jdbcTemplate
|
|
.queryForList("select v.id from mdstore_versions v left outer join mdstore_current_versions cv on (v.id = cv.current_version) where v.writing = false and v.readcount = 0 and cv.mdstore is null", String.class);
|
|
}
|
|
|
|
public MDStoreWithInfo findMdStore(final String mdId) throws MDStoreManagerException {
|
|
return this.mdstoreWithInfoRepository.findById(mdId).orElseThrow(() -> new MDStoreManagerException("Missing mdstore: " + mdId));
|
|
}
|
|
|
|
public MDStoreVersion findVersion(final String versionId) throws MDStoreManagerException {
|
|
return this.mdstoreVersionRepository.findById(versionId).orElseThrow(() -> new MDStoreManagerException("Missing mdstore version: " + versionId));
|
|
}
|
|
|
|
@Transactional
|
|
public String createMDStore(final String format,
|
|
final String layout,
|
|
final String interpretation,
|
|
final MDStoreType type,
|
|
final String dsName,
|
|
final String dsId,
|
|
final String apiId) {
|
|
|
|
final MDStore md = newMDStore(format, layout, interpretation, type, dsName, dsId, apiId, apiId);
|
|
this.mdstoreRepository.save(md);
|
|
|
|
final MDStoreVersion v = newMDStoreVersion(md, false);
|
|
this.mdstoreVersionRepository.save(v);
|
|
this.mdstoreCurrentVersionRepository.save(MDStoreCurrentVersion.newInstance(v));
|
|
|
|
return md.getId();
|
|
}
|
|
|
|
private MDStoreVersion newMDStoreVersion(final MDStore md, final boolean writing) {
|
|
final MDStoreVersion v = new MDStoreVersion();
|
|
|
|
final LocalDateTime now = LocalDateTime.now();
|
|
|
|
final String versionId = md.getId() + "-" + now.toEpochSecond(ZoneOffset.UTC);
|
|
v.setId(versionId);
|
|
v.setMdstore(md.getId());
|
|
v.setLastUpdate(null);
|
|
v.setWriting(writing);
|
|
v.setReadCount(0);
|
|
v.setSize(0);
|
|
v.setLastUpdate(now);
|
|
|
|
selectBackend(md.getType()).completeNewMDStoreVersion(v);
|
|
|
|
return v;
|
|
}
|
|
|
|
@Transactional
|
|
public void deleteMdStore(final String mdId) throws MDStoreManagerException {
|
|
|
|
final MDStore md = this.mdstoreRepository.findById(mdId).orElseThrow(() -> new MDStoreManagerException("MDStore not found: " + mdId));
|
|
|
|
if (this.mdstoreVersionRepository.countByMdstoreAndReadCountGreaterThan(mdId, 0) > 0) {
|
|
log.error("Read transactions found on mdstore: " + mdId);
|
|
throw new MDStoreManagerException("Read transactions found on mdstore: " + mdId);
|
|
}
|
|
|
|
if (this.mdstoreVersionRepository.countByMdstoreAndWriting(mdId, true) > 0) {
|
|
log.error("Write transactions found on mdstore: " + mdId);
|
|
throw new MDStoreManagerException("Write transactions found on mdstore: " + mdId);
|
|
}
|
|
|
|
this.mdstoreCurrentVersionRepository.deleteById(mdId);
|
|
this.mdstoreVersionRepository.deleteByMdstore(mdId);
|
|
this.mdstoreRepository.deleteById(mdId);
|
|
|
|
selectBackend(md.getType()).delete(md);
|
|
}
|
|
|
|
@Transactional
|
|
public MDStoreVersion startReading(final String mdId) throws MDStoreManagerException {
|
|
final MDStoreCurrentVersion cv =
|
|
this.mdstoreCurrentVersionRepository.findById(mdId).orElseThrow(() -> new MDStoreManagerException("Missing mdstore: " + mdId));
|
|
final MDStoreVersion v = this.mdstoreVersionRepository.findById(cv.getCurrentVersion())
|
|
.orElseThrow(() -> new MDStoreManagerException("Missing version: " + cv.getCurrentVersion()));
|
|
v.setReadCount(v.getReadCount() + 1);
|
|
this.mdstoreVersionRepository.save(v);
|
|
return v;
|
|
}
|
|
|
|
@Transactional
|
|
public MDStoreVersion endReading(final String versionId) throws MDStoreManagerException {
|
|
final MDStoreVersion v =
|
|
this.mdstoreVersionRepository.findById(versionId).orElseThrow(() -> new MDStoreManagerException("Version not found: " + versionId));
|
|
v.setReadCount(Math.max(0, v.getReadCount() - 1));
|
|
return v;
|
|
}
|
|
|
|
@Transactional
|
|
public MDStoreVersion resetReading(final String versionId) throws MDStoreManagerException {
|
|
final MDStoreVersion v =
|
|
this.mdstoreVersionRepository.findById(versionId).orElseThrow(() -> new MDStoreManagerException("Version not found: " + versionId));
|
|
v.setReadCount(0);
|
|
return v;
|
|
}
|
|
|
|
@Transactional
|
|
public MDStoreVersion prepareMdStoreVersion(final String mdId) throws MDStoreManagerException {
|
|
final MDStore md = this.mdstoreRepository.findById(mdId).orElseThrow(() -> new MDStoreManagerException("MDStore not found: " + mdId));
|
|
final MDStoreVersion v = newMDStoreVersion(md, true);
|
|
this.mdstoreVersionRepository.save(v);
|
|
return v;
|
|
}
|
|
|
|
public synchronized void deleteExpiredVersions() {
|
|
log.info("Deleting expired version...");
|
|
for (final String versionId : listExpiredVersions()) {
|
|
try {
|
|
deleteMdStoreVersion(versionId, true);
|
|
} catch (final MDStoreManagerException e) {
|
|
log.warn("Error deleteting version " + versionId, e);
|
|
}
|
|
}
|
|
log.info("Done.");
|
|
}
|
|
|
|
@Transactional
|
|
public void deleteMdStoreVersion(final String versionId, final boolean force) throws MDStoreManagerException {
|
|
|
|
final MDStoreVersion v = this.mdstoreVersionRepository.findById(versionId).orElseThrow(() -> new MDStoreManagerException("Version not found"));
|
|
final MDStore md = this.mdstoreRepository.findById(v.getMdstore()).orElseThrow(() -> new MDStoreManagerException("Version not found"));
|
|
|
|
if (this.mdstoreCurrentVersionRepository
|
|
.countByCurrentVersion(versionId) > 0) {
|
|
throw new MDStoreManagerException("I cannot delete this version because it is the current version: " + versionId);
|
|
}
|
|
|
|
if (!force) {
|
|
if (v.isWriting()) { throw new MDStoreManagerException("I cannot delete this version because it is in write mode: " + versionId); }
|
|
if (v.getReadCount() > 0) { throw new MDStoreManagerException("I cannot delete this version because it is in read mode: " + versionId); }
|
|
}
|
|
|
|
this.mdstoreVersionRepository.delete(v);
|
|
|
|
selectBackend(md.getType()).delete(v);
|
|
}
|
|
|
|
public List<MetadataRecord> listVersionRecords(final String versionId, final long limit) throws MDStoreManagerException {
|
|
final MDStoreVersion v =
|
|
this.mdstoreVersionRepository.findById(versionId).orElseThrow(() -> new MDStoreManagerException("Version not found: " + versionId));
|
|
final MDStore md =
|
|
this.mdstoreRepository.findById(v.getMdstore()).orElseThrow(() -> new MDStoreManagerException("MDStore not found: " + v.getMdstore()));
|
|
return selectBackend(md.getType()).listEntries(v, limit);
|
|
}
|
|
|
|
public Stream<MetadataRecord> streamVersionRecords(final String versionId) throws MDStoreManagerException {
|
|
final MDStoreVersion v =
|
|
this.mdstoreVersionRepository.findById(versionId).orElseThrow(() -> new MDStoreManagerException("Version not found: " + versionId));
|
|
final MDStore md =
|
|
this.mdstoreRepository.findById(v.getMdstore()).orElseThrow(() -> new MDStoreManagerException("MDStore not found: " + v.getMdstore()));
|
|
return selectBackend(md.getType()).streamEntries(v);
|
|
}
|
|
|
|
public MDStore newMDStore(
|
|
final String format,
|
|
final String layout,
|
|
final String interpretation,
|
|
final MDStoreType type,
|
|
final String dsName,
|
|
final String dsId,
|
|
final String apiId,
|
|
final String hdfsBasePath) {
|
|
|
|
final String mdId = "md-" + UUID.randomUUID();
|
|
|
|
final MDStore md = new MDStore();
|
|
md.setId(mdId);
|
|
md.setFormat(format);
|
|
md.setLayout(layout);
|
|
md.setType(type);
|
|
md.setInterpretation(interpretation);
|
|
md.setCreationDate(LocalDateTime.now());
|
|
md.setDatasourceName(dsName);
|
|
md.setDatasourceId(dsId);
|
|
md.setApiId(apiId);
|
|
|
|
selectBackend(type).completeNewMDStore(md);
|
|
|
|
return md;
|
|
}
|
|
|
|
public Map<MDStoreType, Set<String>> fixInconsistencies(final boolean delete) throws MDStoreManagerException {
|
|
final Map<MDStoreType, Set<String>> res = new LinkedHashMap<>();
|
|
// res.put(MDStoreType.HDFS, hdfsBackend.fixInconsistencies(delete));
|
|
res.put(MDStoreType.MOCK, this.mockBackend.fixInconsistencies(delete));
|
|
res.put(MDStoreType.SQL_DB, this.sqlBackend.fixInconsistencies(delete));
|
|
// TODO (LOW PRIORITY): ADD HERE THE INVOCATION FOR OTHER MDSTORE TYPE
|
|
|
|
return res;
|
|
}
|
|
|
|
private MDStoreBackend selectBackend(final MDStoreType type) {
|
|
return switch (type) {
|
|
case MOCK -> this.mockBackend;
|
|
case SQL_DB -> this.sqlBackend;
|
|
default -> this.defaultBackend;
|
|
};
|
|
}
|
|
|
|
public long commitMdStoreVersion(final String versionId, final MDStoreType mdStoreType) throws MDStoreManagerException {
|
|
final long count = selectBackend(mdStoreType).countRecords(versionId);
|
|
commitMdStoreVersion(versionId, count);
|
|
return count;
|
|
}
|
|
|
|
@Transactional
|
|
public MDStoreVersion commitMdStoreVersion(final String versionId, final long size) throws MDStoreManagerException {
|
|
final MDStoreVersion v =
|
|
this.mdstoreVersionRepository.findById(versionId).orElseThrow(() -> new MDStoreManagerException("Invalid version: " + versionId));
|
|
this.mdstoreCurrentVersionRepository.save(MDStoreCurrentVersion.newInstance(v));
|
|
v.setWriting(false);
|
|
v.setSize(size);
|
|
v.setLastUpdate(LocalDateTime.now());
|
|
this.mdstoreVersionRepository.save(v);
|
|
|
|
return v;
|
|
}
|
|
|
|
}
|