From 01d54a307570fd8f7747c12d02fc24ff21978162 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Fri, 22 Sep 2023 12:43:23 +0200 Subject: [PATCH] cleaning expired dumps --- pom.xml | 6 +- .../apps/oai/controller/ApiController.java | 23 +++-- .../oai/controller/ZipExporterController.java | 9 +- .../apps/oai/model/ExecutionStatus.java | 3 +- .../repository/CollectionInfoRepository.java | 7 ++ .../apps/oai/service/CollectorService.java | 89 ++++++++++++------- .../dnetlib/apps/oai/storage/FtpStorage.java | 24 +++-- .../apps/oai/storage/LocalStorage.java | 13 ++- .../apps/oai/storage/StorageCleaner.java | 49 ---------- .../apps/oai/storage/StorageClient.java | 18 ++-- .../oai/storage/StorageClientFactory.java | 13 +-- .../dnetlib/apps/oai/storage/ZipStorage.java | 10 ++- src/main/resources/application.properties | 5 +- 13 files changed, 120 insertions(+), 149 deletions(-) delete mode 100644 src/main/java/eu/dnetlib/apps/oai/storage/StorageCleaner.java diff --git a/pom.xml b/pom.xml index a3c7768..ec12627 100644 --- a/pom.xml +++ b/pom.xml @@ -9,10 +9,10 @@ eu.dnetlib.apps - Oai2ftp + OaiCollector 0.0.1-SNAPSHOT - Oai2ftp - Spring Boot Application that collects metadata from a generic OAI publisher and stores them in a FTP server + OaiCollector + Spring Boot Application that collects metadata from a generic OAI publisher 17 diff --git a/src/main/java/eu/dnetlib/apps/oai/controller/ApiController.java b/src/main/java/eu/dnetlib/apps/oai/controller/ApiController.java index d047e97..8f500a0 100644 --- a/src/main/java/eu/dnetlib/apps/oai/controller/ApiController.java +++ b/src/main/java/eu/dnetlib/apps/oai/controller/ApiController.java @@ -2,7 +2,6 @@ package eu.dnetlib.apps.oai.controller; import java.time.LocalDateTime; import java.util.Map; -import java.util.stream.Collectors; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.logging.Log; @@ -38,28 +37,28 @@ public class ApiController { @RequestParam(required = false) final String oaiSet, @RequestParam(required = false) final LocalDateTime oaiFrom, @RequestParam(required = false) final LocalDateTime oaiUntil, - @RequestParam(required = false) final Long max, + @RequestParam(required = false, defaultValue = "${oai.conf.maxRecords}") final Long max, @RequestParam(required = false) final String notificationEmail) { return service.startCollection(oaiBaseUrl, oaiFormat, oaiSet, oaiFrom, oaiUntil, max, notificationEmail); } @GetMapping("/history/{id}") - public CollectionInfo getExecutionStatus(@PathVariable final String id) { - return service.getStatus(id); + public CollectionInfo getCollectionInfo(@PathVariable final String id) { + return service.getCollectionInfo(id); } @GetMapping("/history") - public Map history() { - return service.history() - .entrySet() - .stream() - .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().getExecutionStatus())); + public Map history(@RequestParam(required = false, defaultValue = "false") final boolean all) { + return service.history(all); } @DeleteMapping("/history") - public Map cleanHistory() { - service.cleanHistory(); - return history(); + public Map cleanHistory(@RequestParam(required = false, defaultValue = "false") final boolean all) throws Exception { + if (all) { + service.forceExpired(); + } + service.cronCleanJobs(); + return history(true); } @ExceptionHandler(Exception.class) diff --git a/src/main/java/eu/dnetlib/apps/oai/controller/ZipExporterController.java b/src/main/java/eu/dnetlib/apps/oai/controller/ZipExporterController.java index 7e3543d..2d6d26d 100644 --- a/src/main/java/eu/dnetlib/apps/oai/controller/ZipExporterController.java +++ b/src/main/java/eu/dnetlib/apps/oai/controller/ZipExporterController.java @@ -6,6 +6,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.core.io.FileSystemResource; import org.springframework.core.io.Resource; +import org.springframework.http.ContentDisposition; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; @@ -26,12 +27,12 @@ public class ZipExporterController { public ResponseEntity download(@PathVariable final String id) { final HttpHeaders httpHeaders = new HttpHeaders(); - final File file = new File(service.getStatus(id).getStorageUrl()); + final File file = new File(service.getCollectionInfo(id).getStorageUrl()); final FileSystemResource resource = new FileSystemResource(file); - httpHeaders.set(HttpHeaders.LAST_MODIFIED, String.valueOf(file.lastModified())); - httpHeaders.set(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + file.getName() + "\""); - httpHeaders.set(HttpHeaders.CONTENT_LENGTH, String.valueOf(file.length())); + httpHeaders.setContentDisposition(ContentDisposition.attachment().filename(file.getName()).build()); + httpHeaders.setLastModified(file.lastModified()); + httpHeaders.setContentLength(file.length()); return ResponseEntity.ok() .headers(httpHeaders) diff --git a/src/main/java/eu/dnetlib/apps/oai/model/ExecutionStatus.java b/src/main/java/eu/dnetlib/apps/oai/model/ExecutionStatus.java index 10b331e..a066f23 100644 --- a/src/main/java/eu/dnetlib/apps/oai/model/ExecutionStatus.java +++ b/src/main/java/eu/dnetlib/apps/oai/model/ExecutionStatus.java @@ -4,5 +4,6 @@ public enum ExecutionStatus { READY, RUNNING, COMPLETED, - FAILED + FAILED, + EXPIRED } diff --git a/src/main/java/eu/dnetlib/apps/oai/repository/CollectionInfoRepository.java b/src/main/java/eu/dnetlib/apps/oai/repository/CollectionInfoRepository.java index 96ade73..8a6553a 100644 --- a/src/main/java/eu/dnetlib/apps/oai/repository/CollectionInfoRepository.java +++ b/src/main/java/eu/dnetlib/apps/oai/repository/CollectionInfoRepository.java @@ -1,9 +1,16 @@ package eu.dnetlib.apps.oai.repository; +import java.time.LocalDateTime; + import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Modifying; +import org.springframework.data.jpa.repository.Query; import eu.dnetlib.apps.oai.model.CollectionInfo; public interface CollectionInfoRepository extends JpaRepository { + @Modifying + @Query("update CollectionInfo c set c.expirationDate = ?1 where c.expirationDate > ?1") + void forceExpirationDate(LocalDateTime date); } diff --git a/src/main/java/eu/dnetlib/apps/oai/service/CollectorService.java b/src/main/java/eu/dnetlib/apps/oai/service/CollectorService.java index 2f7e016..4472798 100644 --- a/src/main/java/eu/dnetlib/apps/oai/service/CollectorService.java +++ b/src/main/java/eu/dnetlib/apps/oai/service/CollectorService.java @@ -1,10 +1,13 @@ package eu.dnetlib.apps.oai.service; +import java.io.File; +import java.net.URI; +import java.net.URISyntaxException; import java.time.LocalDateTime; +import java.util.Collection; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -21,17 +24,18 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; +import org.springframework.util.FileSystemUtils; import eu.dnetlib.apps.oai.model.CollectionCall; import eu.dnetlib.apps.oai.model.CollectionInfo; import eu.dnetlib.apps.oai.model.ExecutionStatus; import eu.dnetlib.apps.oai.repository.CollectionInfoRepository; -import eu.dnetlib.apps.oai.storage.StorageCleaner; import eu.dnetlib.apps.oai.storage.StorageClient; import eu.dnetlib.apps.oai.storage.StorageClientFactory; import eu.dnetlib.apps.oai.utils.EmailSender; import eu.dnetlib.apps.oai.utils.HttpFetcher; import eu.dnetlib.apps.oai.utils.SimpleUtils; +import jakarta.transaction.Transactional; @Service public class CollectorService { @@ -45,14 +49,8 @@ public class CollectorService { @Autowired private StorageClientFactory storageClientFactory; - @Autowired - private StorageCleaner storageCleaner; - - @Value("${oai.conf.execution.expirationTime}") - private long fullInfoExpirationTime; // in hours - - @Value("${oai.conf.maxRecords}") - private long defaultMaxRecords; + @Value("${oai.conf.execution.duration.hours}") + private long executionDuration; @Autowired private CollectionInfoRepository collectionInfoRepository; @@ -93,7 +91,7 @@ public class CollectorService { if (max != null && max > 0) { info.setMax(max); } else { - info.setMax(defaultMaxRecords); + info.setMax(Long.MAX_VALUE); } infoMap.put(jobId, info); @@ -105,7 +103,7 @@ public class CollectorService { oaiCollect(sc, info); info.setExecutionStatus(ExecutionStatus.COMPLETED); info.setEnd(now); - info.setExpirationDate(now.plusHours(fullInfoExpirationTime)); + info.setExpirationDate(now.plusHours(executionDuration)); emailSender.notifySuccess(info); } catch (final Throwable e) { info.setExecutionStatus(ExecutionStatus.FAILED); @@ -162,7 +160,7 @@ public class CollectorService { } } - public CollectionInfo getStatus(final String jobId) { + public CollectionInfo getCollectionInfo(final String jobId) { final CollectionInfo info = infoMap.get(jobId); if (info != null) { return info; @@ -174,36 +172,61 @@ public class CollectorService { @Scheduled(fixedRate = 30, timeUnit = TimeUnit.MINUTES) public void cronCleanJobs() throws Exception { - cleanHistory(fullInfoExpirationTime); - final Set excluded = infoMap.keySet(); - storageCleaner.cleanCollectedData(excluded); - } - public void cleanHistory() { - cleanHistory(0); - } - - private void cleanHistory(final long expirationTime) { - final Set toDelete = infoMap.entrySet() + final List toClean = collectionInfoRepository.findAll() .stream() - .filter(e -> { - final ExecutionStatus status = e.getValue().getExecutionStatus(); + .filter(info -> { + final ExecutionStatus status = info.getExecutionStatus(); return status == ExecutionStatus.COMPLETED || status == ExecutionStatus.FAILED; }) - .filter(e -> { - final LocalDateTime expDate = e.getValue().getExpirationDate(); + .filter(info -> { + final LocalDateTime expDate = info.getExpirationDate(); return expDate != null && expDate.isBefore(LocalDateTime.now()); }) - .map(e -> e.getKey()) - .collect(Collectors.toSet()); + .collect(Collectors.toList()); - log.info("Cleaning expired jobs: " + StringUtils.join(toDelete, ", ")); + for (final CollectionInfo info : toClean) { + log.info("[CLEAN] Cleaning expired job: " + info.getId()); + infoMap.remove(info.getId()); - toDelete.forEach(infoMap::remove); + info.setExecutionStatus(ExecutionStatus.EXPIRED); + collectionInfoRepository.save(info); + + cleanCollectedData(info.getStorageUrl()); + } } - public Map history() { - return infoMap; + public Map history(final boolean includeExpired) { + final Map res = new LinkedHashMap<>(); + res.putAll(history(collectionInfoRepository.findAll(), includeExpired)); + res.putAll(history(infoMap.values(), includeExpired)); + return res; + } + + private Map history(final Collection infos, final boolean includeExpired) { + return infos.stream() + .filter(info -> includeExpired || info.getExecutionStatus() != ExecutionStatus.EXPIRED) + .collect(Collectors.toMap(CollectionInfo::getId, CollectionInfo::getExecutionStatus)); + } + + public static void cleanCollectedData(final String storageUrl) throws URISyntaxException { + log.info("[CLEAN] - Deleting: " + storageUrl); + + final URI uri = new URI(storageUrl); + final String protocol = uri.getScheme(); + final String path = uri.getPath(); + final File f = new File(path); + + if (protocol.equalsIgnoreCase("zip") && path.endsWith(".zip")) { + f.delete(); + } else if (protocol.equalsIgnoreCase("file") && f.isDirectory()) { + FileSystemUtils.deleteRecursively(f); + } + } + + @Transactional + public void forceExpired() { + collectionInfoRepository.forceExpirationDate(LocalDateTime.now()); } } diff --git a/src/main/java/eu/dnetlib/apps/oai/storage/FtpStorage.java b/src/main/java/eu/dnetlib/apps/oai/storage/FtpStorage.java index 5f01634..78d5cfe 100644 --- a/src/main/java/eu/dnetlib/apps/oai/storage/FtpStorage.java +++ b/src/main/java/eu/dnetlib/apps/oai/storage/FtpStorage.java @@ -3,6 +3,7 @@ package eu.dnetlib.apps.oai.storage; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.net.URI; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -19,19 +20,21 @@ public class FtpStorage extends StorageClient { private long currentPage = -1; - public FtpStorage(final String baseDir, - final String jobId, - final String ftpServer, - final int ftpPort, - final boolean ftpSecure, + public FtpStorage(final String jobId, + final URI storageBasePath, final String ftpUser, final String ftpPassword) { - super(baseDir, jobId); - this.ftp = ftpConnect(ftpServer, ftpPort, ftpSecure); + super(jobId, storageBasePath); + + final String protocol = storageBasePath.getScheme(); + final String host = storageBasePath.getHost(); + final int port = storageBasePath.getPort(); + + this.ftp = ftpConnect(host, port, protocol.equalsIgnoreCase("ftps")); ftpLogin(ftpUser, ftpPassword); - changeDir(baseDir); + changeDir(storageBasePath.getPath()); changeDir(jobId); } @@ -123,9 +126,4 @@ public class FtpStorage extends StorageClient { } } - @Override - public String getStorageUrl() { - return getBaseDir() + "/" + getJobId(); - } - } diff --git a/src/main/java/eu/dnetlib/apps/oai/storage/LocalStorage.java b/src/main/java/eu/dnetlib/apps/oai/storage/LocalStorage.java index 3ad8696..9578e16 100644 --- a/src/main/java/eu/dnetlib/apps/oai/storage/LocalStorage.java +++ b/src/main/java/eu/dnetlib/apps/oai/storage/LocalStorage.java @@ -3,6 +3,7 @@ package eu.dnetlib.apps.oai.storage; import java.io.File; import java.io.FileWriter; import java.io.IOException; +import java.net.URI; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; @@ -19,10 +20,11 @@ public class LocalStorage extends StorageClient { private String rootDir; - public LocalStorage(final String baseDir, final String jobId) { - super(baseDir, jobId); + public LocalStorage(final String jobId, final URI storageBasePath) { + super(jobId, storageBasePath); + try { - final File d = new File(baseDir + "/" + jobId); + final File d = new File(storageBasePath.getPath() + "/" + jobId); FileUtils.forceMkdir(d); this.rootDir = d.getAbsolutePath(); } catch (final IOException e) { @@ -51,9 +53,4 @@ public class LocalStorage extends StorageClient { } } - @Override - public String getStorageUrl() { - return getBaseDir() + "/" + getJobId(); - } - } diff --git a/src/main/java/eu/dnetlib/apps/oai/storage/StorageCleaner.java b/src/main/java/eu/dnetlib/apps/oai/storage/StorageCleaner.java deleted file mode 100644 index 2597ddd..0000000 --- a/src/main/java/eu/dnetlib/apps/oai/storage/StorageCleaner.java +++ /dev/null @@ -1,49 +0,0 @@ -package eu.dnetlib.apps.oai.storage; - -import java.io.File; -import java.io.FilenameFilter; -import java.net.URI; -import java.util.Set; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Service; -import org.springframework.util.FileSystemUtils; - -@Service -public class StorageCleaner { - - private static final Log log = LogFactory.getLog(StorageCleaner.class); - - @Value("${oai.conf.storage.basePath}") - private URI storageBasePath; - - public void cleanCollectedData(final Set excluded) { - final String protocol = storageBasePath.getScheme(); - final String path = storageBasePath.getPath(); - - if (protocol.equalsIgnoreCase("zip")) { - deleteLocal(path, (f, name) -> name.endsWith(".zip"), excluded); - } else if (protocol.equalsIgnoreCase("file")) { - deleteLocal(path, (f, name) -> f.isDirectory(), excluded); - } - } - - private void deleteLocal(final String path, final FilenameFilter filter, final Set excluded) { - final File baseDir = new File(path); - - if (!baseDir.exists()) { return; } - - for (final File f : baseDir.listFiles(filter)) { - if (!excluded.contains(f.getName())) { - log.info("Delete expired file: " + f.getName()); - if (f.isDirectory()) { - FileSystemUtils.deleteRecursively(f); - } else { - f.delete(); - } - } - } - } -} diff --git a/src/main/java/eu/dnetlib/apps/oai/storage/StorageClient.java b/src/main/java/eu/dnetlib/apps/oai/storage/StorageClient.java index 2454467..aecc4f7 100644 --- a/src/main/java/eu/dnetlib/apps/oai/storage/StorageClient.java +++ b/src/main/java/eu/dnetlib/apps/oai/storage/StorageClient.java @@ -1,13 +1,15 @@ package eu.dnetlib.apps.oai.storage; +import java.net.URI; + public abstract class StorageClient { - private final String baseDir; private final String jobId; + private final URI storageBasePath; - public StorageClient(final String baseDir, final String jobId) { - this.baseDir = baseDir; + public StorageClient(final String jobId, final URI storageBasePath) { this.jobId = jobId; + this.storageBasePath = storageBasePath; } public void complete() {}; @@ -16,14 +18,8 @@ public abstract class StorageClient { abstract public void saveFile(String filename, String body); - abstract public String getStorageUrl(); - - public String getBaseDir() { - return baseDir; - } - - public String getJobId() { - return jobId; + public String getStorageUrl() { + return storageBasePath + "/" + jobId; } } diff --git a/src/main/java/eu/dnetlib/apps/oai/storage/StorageClientFactory.java b/src/main/java/eu/dnetlib/apps/oai/storage/StorageClientFactory.java index f9ccea4..f19dac7 100644 --- a/src/main/java/eu/dnetlib/apps/oai/storage/StorageClientFactory.java +++ b/src/main/java/eu/dnetlib/apps/oai/storage/StorageClientFactory.java @@ -20,18 +20,13 @@ public class StorageClientFactory { public StorageClient newClient(final String jobId) { final String protocol = storageBasePath.getScheme(); - final String host = storageBasePath.getHost(); - final int port = storageBasePath.getPort(); - final String path = storageBasePath.getPath(); - if (protocol.equalsIgnoreCase("ftp")) { - return new FtpStorage(path, jobId, host, port, false, storageUser, storagePassword); - } else if (protocol.equalsIgnoreCase("ftps")) { - return new FtpStorage(path, jobId, host, port, true, storageUser, storagePassword); + if (protocol.equalsIgnoreCase("ftp") || protocol.equalsIgnoreCase("ftps")) { + return new FtpStorage(jobId, storageBasePath, storageUser, storagePassword); } else if (protocol.equalsIgnoreCase("file")) { - return new LocalStorage(path, jobId); + return new LocalStorage(jobId, storageBasePath); } else if (protocol.equalsIgnoreCase("zip")) { - return new ZipStorage(path, jobId); + return new ZipStorage(jobId, storageBasePath); } else { throw new RuntimeException("Invalid storage protocol: " + protocol + " (valid protocol are: file, ftp and ftps)"); } diff --git a/src/main/java/eu/dnetlib/apps/oai/storage/ZipStorage.java b/src/main/java/eu/dnetlib/apps/oai/storage/ZipStorage.java index e0bb2db..a52e5d7 100644 --- a/src/main/java/eu/dnetlib/apps/oai/storage/ZipStorage.java +++ b/src/main/java/eu/dnetlib/apps/oai/storage/ZipStorage.java @@ -3,6 +3,7 @@ package eu.dnetlib.apps.oai.storage; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.net.URI; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; @@ -20,10 +21,11 @@ public class ZipStorage extends StorageClient { private ZipOutputStream zipOut; private long currPage = -1; - public ZipStorage(final String baseDir, final String jobId) { - super(baseDir, jobId); + public ZipStorage(final String jobId, final URI storageBasePath) { + super(jobId, storageBasePath); + try { - final File rootDir = new File(baseDir); + final File rootDir = new File(storageBasePath.getPath()); FileUtils.forceMkdir(rootDir); this.fos = new FileOutputStream(rootDir.getAbsolutePath() + "/" + jobId + ".zip"); @@ -72,7 +74,7 @@ public class ZipStorage extends StorageClient { @Override public String getStorageUrl() { - return getBaseDir() + "/" + getJobId() + ".zip"; + return super.getStorageUrl() + ".zip"; } } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 82d2faa..2ca1e0f 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -4,7 +4,8 @@ swagger.api_title = Oai2Ftp swagger.api_desc = API Documentation swagger.api_version = 0.0.1 -spring.datasource.url=jdbc:h2:mem: +#spring.datasource.url=jdbc:h2:mem: +spring.datasource.url=jdbc:h2:file:/tmp/oai_dumps_db spring.datasource.username= spring.datasource.password= spring.jpa.show-sql=false @@ -21,7 +22,7 @@ oai.conf.storage.password = testPwd oai.conf.public.basePath = http://localhost:8080/download -oai.conf.execution.expirationTime = 12 +oai.conf.execution.duration.hours = 72 oai.conf.maxRecords = 1000 oai.conf.enable.export.api = true