cleaning expired dumps
This commit is contained in:
parent
d5dac968a8
commit
01d54a3075
6
pom.xml
6
pom.xml
|
@ -9,10 +9,10 @@
|
|||
<relativePath/>
|
||||
</parent>
|
||||
<groupId>eu.dnetlib.apps</groupId>
|
||||
<artifactId>Oai2ftp</artifactId>
|
||||
<artifactId>OaiCollector</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<name>Oai2ftp</name>
|
||||
<description>Spring Boot Application that collects metadata from a generic OAI publisher and stores them in a FTP server</description>
|
||||
<name>OaiCollector</name>
|
||||
<description>Spring Boot Application that collects metadata from a generic OAI publisher</description>
|
||||
<properties>
|
||||
<java.version>17</java.version>
|
||||
</properties>
|
||||
|
|
|
@ -2,7 +2,6 @@ package eu.dnetlib.apps.oai.controller;
|
|||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -38,28 +37,28 @@ public class ApiController {
|
|||
@RequestParam(required = false) final String oaiSet,
|
||||
@RequestParam(required = false) final LocalDateTime oaiFrom,
|
||||
@RequestParam(required = false) final LocalDateTime oaiUntil,
|
||||
@RequestParam(required = false) final Long max,
|
||||
@RequestParam(required = false, defaultValue = "${oai.conf.maxRecords}") final Long max,
|
||||
@RequestParam(required = false) final String notificationEmail) {
|
||||
return service.startCollection(oaiBaseUrl, oaiFormat, oaiSet, oaiFrom, oaiUntil, max, notificationEmail);
|
||||
}
|
||||
|
||||
@GetMapping("/history/{id}")
|
||||
public CollectionInfo getExecutionStatus(@PathVariable final String id) {
|
||||
return service.getStatus(id);
|
||||
public CollectionInfo getCollectionInfo(@PathVariable final String id) {
|
||||
return service.getCollectionInfo(id);
|
||||
}
|
||||
|
||||
@GetMapping("/history")
|
||||
public Map<String, ExecutionStatus> history() {
|
||||
return service.history()
|
||||
.entrySet()
|
||||
.stream()
|
||||
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().getExecutionStatus()));
|
||||
public Map<String, ExecutionStatus> history(@RequestParam(required = false, defaultValue = "false") final boolean all) {
|
||||
return service.history(all);
|
||||
}
|
||||
|
||||
@DeleteMapping("/history")
|
||||
public Map<String, ExecutionStatus> cleanHistory() {
|
||||
service.cleanHistory();
|
||||
return history();
|
||||
public Map<String, ExecutionStatus> cleanHistory(@RequestParam(required = false, defaultValue = "false") final boolean all) throws Exception {
|
||||
if (all) {
|
||||
service.forceExpired();
|
||||
}
|
||||
service.cronCleanJobs();
|
||||
return history(true);
|
||||
}
|
||||
|
||||
@ExceptionHandler(Exception.class)
|
||||
|
|
|
@ -6,6 +6,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.core.io.FileSystemResource;
|
||||
import org.springframework.core.io.Resource;
|
||||
import org.springframework.http.ContentDisposition;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
|
@ -26,12 +27,12 @@ public class ZipExporterController {
|
|||
public ResponseEntity<Resource> download(@PathVariable final String id) {
|
||||
final HttpHeaders httpHeaders = new HttpHeaders();
|
||||
|
||||
final File file = new File(service.getStatus(id).getStorageUrl());
|
||||
final File file = new File(service.getCollectionInfo(id).getStorageUrl());
|
||||
|
||||
final FileSystemResource resource = new FileSystemResource(file);
|
||||
httpHeaders.set(HttpHeaders.LAST_MODIFIED, String.valueOf(file.lastModified()));
|
||||
httpHeaders.set(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + file.getName() + "\"");
|
||||
httpHeaders.set(HttpHeaders.CONTENT_LENGTH, String.valueOf(file.length()));
|
||||
httpHeaders.setContentDisposition(ContentDisposition.attachment().filename(file.getName()).build());
|
||||
httpHeaders.setLastModified(file.lastModified());
|
||||
httpHeaders.setContentLength(file.length());
|
||||
|
||||
return ResponseEntity.ok()
|
||||
.headers(httpHeaders)
|
||||
|
|
|
@ -4,5 +4,6 @@ public enum ExecutionStatus {
|
|||
READY,
|
||||
RUNNING,
|
||||
COMPLETED,
|
||||
FAILED
|
||||
FAILED,
|
||||
EXPIRED
|
||||
}
|
||||
|
|
|
@ -1,9 +1,16 @@
|
|||
package eu.dnetlib.apps.oai.repository;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import org.springframework.data.jpa.repository.Modifying;
|
||||
import org.springframework.data.jpa.repository.Query;
|
||||
|
||||
import eu.dnetlib.apps.oai.model.CollectionInfo;
|
||||
|
||||
public interface CollectionInfoRepository extends JpaRepository<CollectionInfo, String> {
|
||||
|
||||
@Modifying
|
||||
@Query("update CollectionInfo c set c.expirationDate = ?1 where c.expirationDate > ?1")
|
||||
void forceExpirationDate(LocalDateTime date);
|
||||
}
|
||||
|
|
|
@ -1,10 +1,13 @@
|
|||
package eu.dnetlib.apps.oai.service;
|
||||
|
||||
import java.io.File;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -21,17 +24,18 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.FileSystemUtils;
|
||||
|
||||
import eu.dnetlib.apps.oai.model.CollectionCall;
|
||||
import eu.dnetlib.apps.oai.model.CollectionInfo;
|
||||
import eu.dnetlib.apps.oai.model.ExecutionStatus;
|
||||
import eu.dnetlib.apps.oai.repository.CollectionInfoRepository;
|
||||
import eu.dnetlib.apps.oai.storage.StorageCleaner;
|
||||
import eu.dnetlib.apps.oai.storage.StorageClient;
|
||||
import eu.dnetlib.apps.oai.storage.StorageClientFactory;
|
||||
import eu.dnetlib.apps.oai.utils.EmailSender;
|
||||
import eu.dnetlib.apps.oai.utils.HttpFetcher;
|
||||
import eu.dnetlib.apps.oai.utils.SimpleUtils;
|
||||
import jakarta.transaction.Transactional;
|
||||
|
||||
@Service
|
||||
public class CollectorService {
|
||||
|
@ -45,14 +49,8 @@ public class CollectorService {
|
|||
@Autowired
|
||||
private StorageClientFactory storageClientFactory;
|
||||
|
||||
@Autowired
|
||||
private StorageCleaner storageCleaner;
|
||||
|
||||
@Value("${oai.conf.execution.expirationTime}")
|
||||
private long fullInfoExpirationTime; // in hours
|
||||
|
||||
@Value("${oai.conf.maxRecords}")
|
||||
private long defaultMaxRecords;
|
||||
@Value("${oai.conf.execution.duration.hours}")
|
||||
private long executionDuration;
|
||||
|
||||
@Autowired
|
||||
private CollectionInfoRepository collectionInfoRepository;
|
||||
|
@ -93,7 +91,7 @@ public class CollectorService {
|
|||
if (max != null && max > 0) {
|
||||
info.setMax(max);
|
||||
} else {
|
||||
info.setMax(defaultMaxRecords);
|
||||
info.setMax(Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
infoMap.put(jobId, info);
|
||||
|
@ -105,7 +103,7 @@ public class CollectorService {
|
|||
oaiCollect(sc, info);
|
||||
info.setExecutionStatus(ExecutionStatus.COMPLETED);
|
||||
info.setEnd(now);
|
||||
info.setExpirationDate(now.plusHours(fullInfoExpirationTime));
|
||||
info.setExpirationDate(now.plusHours(executionDuration));
|
||||
emailSender.notifySuccess(info);
|
||||
} catch (final Throwable e) {
|
||||
info.setExecutionStatus(ExecutionStatus.FAILED);
|
||||
|
@ -162,7 +160,7 @@ public class CollectorService {
|
|||
}
|
||||
}
|
||||
|
||||
public CollectionInfo getStatus(final String jobId) {
|
||||
public CollectionInfo getCollectionInfo(final String jobId) {
|
||||
final CollectionInfo info = infoMap.get(jobId);
|
||||
if (info != null) {
|
||||
return info;
|
||||
|
@ -174,36 +172,61 @@ public class CollectorService {
|
|||
|
||||
@Scheduled(fixedRate = 30, timeUnit = TimeUnit.MINUTES)
|
||||
public void cronCleanJobs() throws Exception {
|
||||
cleanHistory(fullInfoExpirationTime);
|
||||
final Set<String> excluded = infoMap.keySet();
|
||||
storageCleaner.cleanCollectedData(excluded);
|
||||
}
|
||||
|
||||
public void cleanHistory() {
|
||||
cleanHistory(0);
|
||||
}
|
||||
|
||||
private void cleanHistory(final long expirationTime) {
|
||||
final Set<String> toDelete = infoMap.entrySet()
|
||||
final List<CollectionInfo> toClean = collectionInfoRepository.findAll()
|
||||
.stream()
|
||||
.filter(e -> {
|
||||
final ExecutionStatus status = e.getValue().getExecutionStatus();
|
||||
.filter(info -> {
|
||||
final ExecutionStatus status = info.getExecutionStatus();
|
||||
return status == ExecutionStatus.COMPLETED || status == ExecutionStatus.FAILED;
|
||||
})
|
||||
.filter(e -> {
|
||||
final LocalDateTime expDate = e.getValue().getExpirationDate();
|
||||
.filter(info -> {
|
||||
final LocalDateTime expDate = info.getExpirationDate();
|
||||
return expDate != null && expDate.isBefore(LocalDateTime.now());
|
||||
})
|
||||
.map(e -> e.getKey())
|
||||
.collect(Collectors.toSet());
|
||||
.collect(Collectors.toList());
|
||||
|
||||
log.info("Cleaning expired jobs: " + StringUtils.join(toDelete, ", "));
|
||||
for (final CollectionInfo info : toClean) {
|
||||
log.info("[CLEAN] Cleaning expired job: " + info.getId());
|
||||
infoMap.remove(info.getId());
|
||||
|
||||
toDelete.forEach(infoMap::remove);
|
||||
info.setExecutionStatus(ExecutionStatus.EXPIRED);
|
||||
collectionInfoRepository.save(info);
|
||||
|
||||
cleanCollectedData(info.getStorageUrl());
|
||||
}
|
||||
}
|
||||
|
||||
public Map<String, CollectionInfo> history() {
|
||||
return infoMap;
|
||||
public Map<String, ExecutionStatus> history(final boolean includeExpired) {
|
||||
final Map<String, ExecutionStatus> res = new LinkedHashMap<>();
|
||||
res.putAll(history(collectionInfoRepository.findAll(), includeExpired));
|
||||
res.putAll(history(infoMap.values(), includeExpired));
|
||||
return res;
|
||||
}
|
||||
|
||||
private Map<String, ExecutionStatus> history(final Collection<CollectionInfo> infos, final boolean includeExpired) {
|
||||
return infos.stream()
|
||||
.filter(info -> includeExpired || info.getExecutionStatus() != ExecutionStatus.EXPIRED)
|
||||
.collect(Collectors.toMap(CollectionInfo::getId, CollectionInfo::getExecutionStatus));
|
||||
}
|
||||
|
||||
public static void cleanCollectedData(final String storageUrl) throws URISyntaxException {
|
||||
log.info("[CLEAN] - Deleting: " + storageUrl);
|
||||
|
||||
final URI uri = new URI(storageUrl);
|
||||
final String protocol = uri.getScheme();
|
||||
final String path = uri.getPath();
|
||||
final File f = new File(path);
|
||||
|
||||
if (protocol.equalsIgnoreCase("zip") && path.endsWith(".zip")) {
|
||||
f.delete();
|
||||
} else if (protocol.equalsIgnoreCase("file") && f.isDirectory()) {
|
||||
FileSystemUtils.deleteRecursively(f);
|
||||
}
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public void forceExpired() {
|
||||
collectionInfoRepository.forceExpirationDate(LocalDateTime.now());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package eu.dnetlib.apps.oai.storage;
|
|||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -19,19 +20,21 @@ public class FtpStorage extends StorageClient {
|
|||
|
||||
private long currentPage = -1;
|
||||
|
||||
public FtpStorage(final String baseDir,
|
||||
final String jobId,
|
||||
final String ftpServer,
|
||||
final int ftpPort,
|
||||
final boolean ftpSecure,
|
||||
public FtpStorage(final String jobId,
|
||||
final URI storageBasePath,
|
||||
final String ftpUser,
|
||||
final String ftpPassword) {
|
||||
|
||||
super(baseDir, jobId);
|
||||
this.ftp = ftpConnect(ftpServer, ftpPort, ftpSecure);
|
||||
super(jobId, storageBasePath);
|
||||
|
||||
final String protocol = storageBasePath.getScheme();
|
||||
final String host = storageBasePath.getHost();
|
||||
final int port = storageBasePath.getPort();
|
||||
|
||||
this.ftp = ftpConnect(host, port, protocol.equalsIgnoreCase("ftps"));
|
||||
|
||||
ftpLogin(ftpUser, ftpPassword);
|
||||
changeDir(baseDir);
|
||||
changeDir(storageBasePath.getPath());
|
||||
changeDir(jobId);
|
||||
}
|
||||
|
||||
|
@ -123,9 +126,4 @@ public class FtpStorage extends StorageClient {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getStorageUrl() {
|
||||
return getBaseDir() + "/" + getJobId();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package eu.dnetlib.apps.oai.storage;
|
|||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
|
@ -19,10 +20,11 @@ public class LocalStorage extends StorageClient {
|
|||
|
||||
private String rootDir;
|
||||
|
||||
public LocalStorage(final String baseDir, final String jobId) {
|
||||
super(baseDir, jobId);
|
||||
public LocalStorage(final String jobId, final URI storageBasePath) {
|
||||
super(jobId, storageBasePath);
|
||||
|
||||
try {
|
||||
final File d = new File(baseDir + "/" + jobId);
|
||||
final File d = new File(storageBasePath.getPath() + "/" + jobId);
|
||||
FileUtils.forceMkdir(d);
|
||||
this.rootDir = d.getAbsolutePath();
|
||||
} catch (final IOException e) {
|
||||
|
@ -51,9 +53,4 @@ public class LocalStorage extends StorageClient {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getStorageUrl() {
|
||||
return getBaseDir() + "/" + getJobId();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,49 +0,0 @@
|
|||
package eu.dnetlib.apps.oai.storage;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FilenameFilter;
|
||||
import java.net.URI;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.FileSystemUtils;
|
||||
|
||||
@Service
|
||||
public class StorageCleaner {
|
||||
|
||||
private static final Log log = LogFactory.getLog(StorageCleaner.class);
|
||||
|
||||
@Value("${oai.conf.storage.basePath}")
|
||||
private URI storageBasePath;
|
||||
|
||||
public void cleanCollectedData(final Set<String> excluded) {
|
||||
final String protocol = storageBasePath.getScheme();
|
||||
final String path = storageBasePath.getPath();
|
||||
|
||||
if (protocol.equalsIgnoreCase("zip")) {
|
||||
deleteLocal(path, (f, name) -> name.endsWith(".zip"), excluded);
|
||||
} else if (protocol.equalsIgnoreCase("file")) {
|
||||
deleteLocal(path, (f, name) -> f.isDirectory(), excluded);
|
||||
}
|
||||
}
|
||||
|
||||
private void deleteLocal(final String path, final FilenameFilter filter, final Set<String> excluded) {
|
||||
final File baseDir = new File(path);
|
||||
|
||||
if (!baseDir.exists()) { return; }
|
||||
|
||||
for (final File f : baseDir.listFiles(filter)) {
|
||||
if (!excluded.contains(f.getName())) {
|
||||
log.info("Delete expired file: " + f.getName());
|
||||
if (f.isDirectory()) {
|
||||
FileSystemUtils.deleteRecursively(f);
|
||||
} else {
|
||||
f.delete();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,13 +1,15 @@
|
|||
package eu.dnetlib.apps.oai.storage;
|
||||
|
||||
import java.net.URI;
|
||||
|
||||
public abstract class StorageClient {
|
||||
|
||||
private final String baseDir;
|
||||
private final String jobId;
|
||||
private final URI storageBasePath;
|
||||
|
||||
public StorageClient(final String baseDir, final String jobId) {
|
||||
this.baseDir = baseDir;
|
||||
public StorageClient(final String jobId, final URI storageBasePath) {
|
||||
this.jobId = jobId;
|
||||
this.storageBasePath = storageBasePath;
|
||||
}
|
||||
|
||||
public void complete() {};
|
||||
|
@ -16,14 +18,8 @@ public abstract class StorageClient {
|
|||
|
||||
abstract public void saveFile(String filename, String body);
|
||||
|
||||
abstract public String getStorageUrl();
|
||||
|
||||
public String getBaseDir() {
|
||||
return baseDir;
|
||||
}
|
||||
|
||||
public String getJobId() {
|
||||
return jobId;
|
||||
public String getStorageUrl() {
|
||||
return storageBasePath + "/" + jobId;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -20,18 +20,13 @@ public class StorageClientFactory {
|
|||
public StorageClient newClient(final String jobId) {
|
||||
|
||||
final String protocol = storageBasePath.getScheme();
|
||||
final String host = storageBasePath.getHost();
|
||||
final int port = storageBasePath.getPort();
|
||||
final String path = storageBasePath.getPath();
|
||||
|
||||
if (protocol.equalsIgnoreCase("ftp")) {
|
||||
return new FtpStorage(path, jobId, host, port, false, storageUser, storagePassword);
|
||||
} else if (protocol.equalsIgnoreCase("ftps")) {
|
||||
return new FtpStorage(path, jobId, host, port, true, storageUser, storagePassword);
|
||||
if (protocol.equalsIgnoreCase("ftp") || protocol.equalsIgnoreCase("ftps")) {
|
||||
return new FtpStorage(jobId, storageBasePath, storageUser, storagePassword);
|
||||
} else if (protocol.equalsIgnoreCase("file")) {
|
||||
return new LocalStorage(path, jobId);
|
||||
return new LocalStorage(jobId, storageBasePath);
|
||||
} else if (protocol.equalsIgnoreCase("zip")) {
|
||||
return new ZipStorage(path, jobId);
|
||||
return new ZipStorage(jobId, storageBasePath);
|
||||
} else {
|
||||
throw new RuntimeException("Invalid storage protocol: " + protocol + " (valid protocol are: file, ftp and ftps)");
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package eu.dnetlib.apps.oai.storage;
|
|||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.zip.ZipEntry;
|
||||
import java.util.zip.ZipOutputStream;
|
||||
|
||||
|
@ -20,10 +21,11 @@ public class ZipStorage extends StorageClient {
|
|||
private ZipOutputStream zipOut;
|
||||
private long currPage = -1;
|
||||
|
||||
public ZipStorage(final String baseDir, final String jobId) {
|
||||
super(baseDir, jobId);
|
||||
public ZipStorage(final String jobId, final URI storageBasePath) {
|
||||
super(jobId, storageBasePath);
|
||||
|
||||
try {
|
||||
final File rootDir = new File(baseDir);
|
||||
final File rootDir = new File(storageBasePath.getPath());
|
||||
FileUtils.forceMkdir(rootDir);
|
||||
|
||||
this.fos = new FileOutputStream(rootDir.getAbsolutePath() + "/" + jobId + ".zip");
|
||||
|
@ -72,7 +74,7 @@ public class ZipStorage extends StorageClient {
|
|||
|
||||
@Override
|
||||
public String getStorageUrl() {
|
||||
return getBaseDir() + "/" + getJobId() + ".zip";
|
||||
return super.getStorageUrl() + ".zip";
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -4,7 +4,8 @@ swagger.api_title = Oai2Ftp
|
|||
swagger.api_desc = API Documentation
|
||||
swagger.api_version = 0.0.1
|
||||
|
||||
spring.datasource.url=jdbc:h2:mem:
|
||||
#spring.datasource.url=jdbc:h2:mem:
|
||||
spring.datasource.url=jdbc:h2:file:/tmp/oai_dumps_db
|
||||
spring.datasource.username=
|
||||
spring.datasource.password=
|
||||
spring.jpa.show-sql=false
|
||||
|
@ -21,7 +22,7 @@ oai.conf.storage.password = testPwd
|
|||
|
||||
oai.conf.public.basePath = http://localhost:8080/download
|
||||
|
||||
oai.conf.execution.expirationTime = 12
|
||||
oai.conf.execution.duration.hours = 72
|
||||
oai.conf.maxRecords = 1000
|
||||
|
||||
oai.conf.enable.export.api = true
|
||||
|
|
Loading…
Reference in New Issue