diff --git a/pom.xml b/pom.xml
index a3c7768..ec12627 100644
--- a/pom.xml
+++ b/pom.xml
@@ -9,10 +9,10 @@
eu.dnetlib.apps
- Oai2ftp
+ OaiCollector
0.0.1-SNAPSHOT
- Oai2ftp
- Spring Boot Application that collects metadata from a generic OAI publisher and stores them in a FTP server
+ OaiCollector
+ Spring Boot Application that collects metadata from a generic OAI publisher
17
diff --git a/src/main/java/eu/dnetlib/apps/oai/controller/ApiController.java b/src/main/java/eu/dnetlib/apps/oai/controller/ApiController.java
index d047e97..8f500a0 100644
--- a/src/main/java/eu/dnetlib/apps/oai/controller/ApiController.java
+++ b/src/main/java/eu/dnetlib/apps/oai/controller/ApiController.java
@@ -2,7 +2,6 @@ package eu.dnetlib.apps.oai.controller;
import java.time.LocalDateTime;
import java.util.Map;
-import java.util.stream.Collectors;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
@@ -38,28 +37,28 @@ public class ApiController {
@RequestParam(required = false) final String oaiSet,
@RequestParam(required = false) final LocalDateTime oaiFrom,
@RequestParam(required = false) final LocalDateTime oaiUntil,
- @RequestParam(required = false) final Long max,
+ @RequestParam(required = false, defaultValue = "${oai.conf.maxRecords}") final Long max,
@RequestParam(required = false) final String notificationEmail) {
return service.startCollection(oaiBaseUrl, oaiFormat, oaiSet, oaiFrom, oaiUntil, max, notificationEmail);
}
@GetMapping("/history/{id}")
- public CollectionInfo getExecutionStatus(@PathVariable final String id) {
- return service.getStatus(id);
+ public CollectionInfo getCollectionInfo(@PathVariable final String id) {
+ return service.getCollectionInfo(id);
}
@GetMapping("/history")
- public Map history() {
- return service.history()
- .entrySet()
- .stream()
- .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().getExecutionStatus()));
+ public Map history(@RequestParam(required = false, defaultValue = "false") final boolean all) {
+ return service.history(all);
}
@DeleteMapping("/history")
- public Map cleanHistory() {
- service.cleanHistory();
- return history();
+ public Map cleanHistory(@RequestParam(required = false, defaultValue = "false") final boolean all) throws Exception {
+ if (all) {
+ service.forceExpired();
+ }
+ service.cronCleanJobs();
+ return history(true);
}
@ExceptionHandler(Exception.class)
diff --git a/src/main/java/eu/dnetlib/apps/oai/controller/ZipExporterController.java b/src/main/java/eu/dnetlib/apps/oai/controller/ZipExporterController.java
index 7e3543d..2d6d26d 100644
--- a/src/main/java/eu/dnetlib/apps/oai/controller/ZipExporterController.java
+++ b/src/main/java/eu/dnetlib/apps/oai/controller/ZipExporterController.java
@@ -6,6 +6,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
+import org.springframework.http.ContentDisposition;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
@@ -26,12 +27,12 @@ public class ZipExporterController {
public ResponseEntity download(@PathVariable final String id) {
final HttpHeaders httpHeaders = new HttpHeaders();
- final File file = new File(service.getStatus(id).getStorageUrl());
+ // The stored URL carries the storage scheme (it is parsed as a URI when cleaning expired data),
+ // so only its path component is a valid location on the local file system.
+ final File file = new File(java.net.URI.create(service.getCollectionInfo(id).getStorageUrl()).getPath());
final FileSystemResource resource = new FileSystemResource(file);
- httpHeaders.set(HttpHeaders.LAST_MODIFIED, String.valueOf(file.lastModified()));
- httpHeaders.set(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + file.getName() + "\"");
- httpHeaders.set(HttpHeaders.CONTENT_LENGTH, String.valueOf(file.length()));
+ httpHeaders.setContentDisposition(ContentDisposition.attachment().filename(file.getName()).build());
+ httpHeaders.setLastModified(file.lastModified());
+ httpHeaders.setContentLength(file.length());
return ResponseEntity.ok()
.headers(httpHeaders)
diff --git a/src/main/java/eu/dnetlib/apps/oai/model/ExecutionStatus.java b/src/main/java/eu/dnetlib/apps/oai/model/ExecutionStatus.java
index 10b331e..a066f23 100644
--- a/src/main/java/eu/dnetlib/apps/oai/model/ExecutionStatus.java
+++ b/src/main/java/eu/dnetlib/apps/oai/model/ExecutionStatus.java
@@ -4,5 +4,6 @@ public enum ExecutionStatus {
READY,
RUNNING,
COMPLETED,
- FAILED
+ FAILED,
+ EXPIRED
}
diff --git a/src/main/java/eu/dnetlib/apps/oai/repository/CollectionInfoRepository.java b/src/main/java/eu/dnetlib/apps/oai/repository/CollectionInfoRepository.java
index 96ade73..8a6553a 100644
--- a/src/main/java/eu/dnetlib/apps/oai/repository/CollectionInfoRepository.java
+++ b/src/main/java/eu/dnetlib/apps/oai/repository/CollectionInfoRepository.java
@@ -1,9 +1,16 @@
package eu.dnetlib.apps.oai.repository;
+import java.time.LocalDateTime;
+
import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.data.jpa.repository.Modifying;
+import org.springframework.data.jpa.repository.Query;
import eu.dnetlib.apps.oai.model.CollectionInfo;
public interface CollectionInfoRepository extends JpaRepository {
+ /**
+ * Moves the expiration date of every record expiring after the given date back to that date.
+ * Records without an expiration date are not affected.
+ *
+ * This is a bulk JPQL update that bypasses the persistence context: pending changes are flushed
+ * before it runs and the context is cleared afterwards, so that no stale entity is read later.
+ *
+ * @param date the new (earlier) expiration date
+ */
+ @Modifying(flushAutomatically = true, clearAutomatically = true)
+ @Query("update CollectionInfo c set c.expirationDate = ?1 where c.expirationDate > ?1")
+ void forceExpirationDate(LocalDateTime date);
}
diff --git a/src/main/java/eu/dnetlib/apps/oai/service/CollectorService.java b/src/main/java/eu/dnetlib/apps/oai/service/CollectorService.java
index 2f7e016..4472798 100644
--- a/src/main/java/eu/dnetlib/apps/oai/service/CollectorService.java
+++ b/src/main/java/eu/dnetlib/apps/oai/service/CollectorService.java
@@ -1,10 +1,13 @@
package eu.dnetlib.apps.oai.service;
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.time.LocalDateTime;
+import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -21,17 +24,18 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
+import org.springframework.util.FileSystemUtils;
import eu.dnetlib.apps.oai.model.CollectionCall;
import eu.dnetlib.apps.oai.model.CollectionInfo;
import eu.dnetlib.apps.oai.model.ExecutionStatus;
import eu.dnetlib.apps.oai.repository.CollectionInfoRepository;
-import eu.dnetlib.apps.oai.storage.StorageCleaner;
import eu.dnetlib.apps.oai.storage.StorageClient;
import eu.dnetlib.apps.oai.storage.StorageClientFactory;
import eu.dnetlib.apps.oai.utils.EmailSender;
import eu.dnetlib.apps.oai.utils.HttpFetcher;
import eu.dnetlib.apps.oai.utils.SimpleUtils;
+import jakarta.transaction.Transactional;
@Service
public class CollectorService {
@@ -45,14 +49,8 @@ public class CollectorService {
@Autowired
private StorageClientFactory storageClientFactory;
- @Autowired
- private StorageCleaner storageCleaner;
-
- @Value("${oai.conf.execution.expirationTime}")
- private long fullInfoExpirationTime; // in hours
-
- @Value("${oai.conf.maxRecords}")
- private long defaultMaxRecords;
+ @Value("${oai.conf.execution.duration.hours}")
+ private long executionDuration;
@Autowired
private CollectionInfoRepository collectionInfoRepository;
@@ -93,7 +91,7 @@ public class CollectorService {
if (max != null && max > 0) {
info.setMax(max);
} else {
- info.setMax(defaultMaxRecords);
+ info.setMax(Long.MAX_VALUE);
}
infoMap.put(jobId, info);
@@ -105,7 +103,7 @@ public class CollectorService {
oaiCollect(sc, info);
info.setExecutionStatus(ExecutionStatus.COMPLETED);
info.setEnd(now);
- info.setExpirationDate(now.plusHours(fullInfoExpirationTime));
+ info.setExpirationDate(now.plusHours(executionDuration));
emailSender.notifySuccess(info);
} catch (final Throwable e) {
info.setExecutionStatus(ExecutionStatus.FAILED);
@@ -162,7 +160,7 @@ public class CollectorService {
}
}
- public CollectionInfo getStatus(final String jobId) {
+ public CollectionInfo getCollectionInfo(final String jobId) {
final CollectionInfo info = infoMap.get(jobId);
if (info != null) {
return info;
@@ -174,36 +172,61 @@ public class CollectorService {
@Scheduled(fixedRate = 30, timeUnit = TimeUnit.MINUTES)
public void cronCleanJobs() throws Exception {
- cleanHistory(fullInfoExpirationTime);
- final Set excluded = infoMap.keySet();
- storageCleaner.cleanCollectedData(excluded);
- }
- public void cleanHistory() {
- cleanHistory(0);
- }
-
- private void cleanHistory(final long expirationTime) {
- final Set toDelete = infoMap.entrySet()
+ final List toClean = collectionInfoRepository.findAll()
.stream()
- .filter(e -> {
- final ExecutionStatus status = e.getValue().getExecutionStatus();
+ .filter(info -> {
+ final ExecutionStatus status = info.getExecutionStatus();
return status == ExecutionStatus.COMPLETED || status == ExecutionStatus.FAILED;
})
- .filter(e -> {
- final LocalDateTime expDate = e.getValue().getExpirationDate();
+ .filter(info -> {
+ final LocalDateTime expDate = info.getExpirationDate();
return expDate != null && expDate.isBefore(LocalDateTime.now());
})
- .map(e -> e.getKey())
- .collect(Collectors.toSet());
+ .collect(Collectors.toList());
- log.info("Cleaning expired jobs: " + StringUtils.join(toDelete, ", "));
+ for (final CollectionInfo info : toClean) {
+ log.info("[CLEAN] Cleaning expired job: " + info.getId());
+ infoMap.remove(info.getId());
- toDelete.forEach(infoMap::remove);
+ info.setExecutionStatus(ExecutionStatus.EXPIRED);
+ collectionInfoRepository.save(info);
+
+ // A failure while deleting the data of one job must not abort the whole run:
+ // the error is logged and the cleaning goes on with the remaining expired jobs.
+ try {
+ cleanCollectedData(info.getStorageUrl());
+ } catch (final Exception e) {
+ log.error("[CLEAN] Unable to delete the data of job " + info.getId() + ": " + info.getStorageUrl(), e);
+ }
+ }
}
- public Map history() {
- return infoMap;
+ public Map history(final boolean includeExpired) {
+ final Map res = new LinkedHashMap<>();
+ res.putAll(history(collectionInfoRepository.findAll(), includeExpired));
+ res.putAll(history(infoMap.values(), includeExpired));
+ return res;
+ }
+
+ private Map history(final Collection infos, final boolean includeExpired) {
+ return infos.stream()
+ .filter(info -> includeExpired || info.getExecutionStatus() != ExecutionStatus.EXPIRED)
+ .collect(Collectors.toMap(CollectionInfo::getId, CollectionInfo::getExecutionStatus));
+ }
+
+ /**
+ * Deletes the locally stored data of a collection job.
+ * Only "zip" URLs (a single .zip file) and "file" URLs (a directory) are handled:
+ * any other URL is left untouched.
+ *
+ * @param storageUrl the storage URL of the job, may be null
+ * @throws URISyntaxException if storageUrl is not a valid URI
+ */
+ public static void cleanCollectedData(final String storageUrl) throws URISyntaxException {
+ log.info("[CLEAN] - Deleting: " + storageUrl);
+
+ // No storage URL recorded (presumably the job failed before storing anything): nothing to delete.
+ if (storageUrl == null) { return; }
+
+ final URI uri = new URI(storageUrl);
+ final String protocol = uri.getScheme();
+ final String path = uri.getPath();
+
+ // getScheme() is null for relative URIs and getPath() is null for opaque ones.
+ if (protocol == null || path == null) { return; }
+
+ final File f = new File(path);
+
+ if (protocol.equalsIgnoreCase("zip") && path.endsWith(".zip")) {
+ if (f.exists() && !f.delete()) {
+ log.warn("[CLEAN] - Unable to delete: " + path);
+ }
+ } else if (protocol.equalsIgnoreCase("file") && f.isDirectory()) {
+ FileSystemUtils.deleteRecursively(f);
+ }
+ }
+
+ @Transactional
+ public void forceExpired() {
+ collectionInfoRepository.forceExpirationDate(LocalDateTime.now());
}
}
diff --git a/src/main/java/eu/dnetlib/apps/oai/storage/FtpStorage.java b/src/main/java/eu/dnetlib/apps/oai/storage/FtpStorage.java
index 5f01634..78d5cfe 100644
--- a/src/main/java/eu/dnetlib/apps/oai/storage/FtpStorage.java
+++ b/src/main/java/eu/dnetlib/apps/oai/storage/FtpStorage.java
@@ -3,6 +3,7 @@ package eu.dnetlib.apps.oai.storage;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -19,19 +20,21 @@ public class FtpStorage extends StorageClient {
private long currentPage = -1;
- public FtpStorage(final String baseDir,
- final String jobId,
- final String ftpServer,
- final int ftpPort,
- final boolean ftpSecure,
+ public FtpStorage(final String jobId,
+ final URI storageBasePath,
final String ftpUser,
final String ftpPassword) {
- super(baseDir, jobId);
- this.ftp = ftpConnect(ftpServer, ftpPort, ftpSecure);
+ super(jobId, storageBasePath);
+
+ final String protocol = storageBasePath.getScheme();
+ final String host = storageBasePath.getHost();
+ final int port = storageBasePath.getPort();
+
+ this.ftp = ftpConnect(host, port, protocol.equalsIgnoreCase("ftps"));
ftpLogin(ftpUser, ftpPassword);
- changeDir(baseDir);
+ changeDir(storageBasePath.getPath());
changeDir(jobId);
}
@@ -123,9 +126,4 @@ public class FtpStorage extends StorageClient {
}
}
- @Override
- public String getStorageUrl() {
- return getBaseDir() + "/" + getJobId();
- }
-
}
diff --git a/src/main/java/eu/dnetlib/apps/oai/storage/LocalStorage.java b/src/main/java/eu/dnetlib/apps/oai/storage/LocalStorage.java
index 3ad8696..9578e16 100644
--- a/src/main/java/eu/dnetlib/apps/oai/storage/LocalStorage.java
+++ b/src/main/java/eu/dnetlib/apps/oai/storage/LocalStorage.java
@@ -3,6 +3,7 @@ package eu.dnetlib.apps.oai.storage;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.net.URI;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
@@ -19,10 +20,11 @@ public class LocalStorage extends StorageClient {
private String rootDir;
- public LocalStorage(final String baseDir, final String jobId) {
- super(baseDir, jobId);
+ public LocalStorage(final String jobId, final URI storageBasePath) {
+ super(jobId, storageBasePath);
+
try {
- final File d = new File(baseDir + "/" + jobId);
+ final File d = new File(storageBasePath.getPath() + "/" + jobId);
FileUtils.forceMkdir(d);
this.rootDir = d.getAbsolutePath();
} catch (final IOException e) {
@@ -51,9 +53,4 @@ public class LocalStorage extends StorageClient {
}
}
- @Override
- public String getStorageUrl() {
- return getBaseDir() + "/" + getJobId();
- }
-
}
diff --git a/src/main/java/eu/dnetlib/apps/oai/storage/StorageCleaner.java b/src/main/java/eu/dnetlib/apps/oai/storage/StorageCleaner.java
deleted file mode 100644
index 2597ddd..0000000
--- a/src/main/java/eu/dnetlib/apps/oai/storage/StorageCleaner.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package eu.dnetlib.apps.oai.storage;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.net.URI;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Service;
-import org.springframework.util.FileSystemUtils;
-
-@Service
-public class StorageCleaner {
-
- private static final Log log = LogFactory.getLog(StorageCleaner.class);
-
- @Value("${oai.conf.storage.basePath}")
- private URI storageBasePath;
-
- public void cleanCollectedData(final Set excluded) {
- final String protocol = storageBasePath.getScheme();
- final String path = storageBasePath.getPath();
-
- if (protocol.equalsIgnoreCase("zip")) {
- deleteLocal(path, (f, name) -> name.endsWith(".zip"), excluded);
- } else if (protocol.equalsIgnoreCase("file")) {
- deleteLocal(path, (f, name) -> f.isDirectory(), excluded);
- }
- }
-
- private void deleteLocal(final String path, final FilenameFilter filter, final Set excluded) {
- final File baseDir = new File(path);
-
- if (!baseDir.exists()) { return; }
-
- for (final File f : baseDir.listFiles(filter)) {
- if (!excluded.contains(f.getName())) {
- log.info("Delete expired file: " + f.getName());
- if (f.isDirectory()) {
- FileSystemUtils.deleteRecursively(f);
- } else {
- f.delete();
- }
- }
- }
- }
-}
diff --git a/src/main/java/eu/dnetlib/apps/oai/storage/StorageClient.java b/src/main/java/eu/dnetlib/apps/oai/storage/StorageClient.java
index 2454467..aecc4f7 100644
--- a/src/main/java/eu/dnetlib/apps/oai/storage/StorageClient.java
+++ b/src/main/java/eu/dnetlib/apps/oai/storage/StorageClient.java
@@ -1,13 +1,15 @@
package eu.dnetlib.apps.oai.storage;
+import java.net.URI;
+
public abstract class StorageClient {
- private final String baseDir;
private final String jobId;
+ private final URI storageBasePath;
- public StorageClient(final String baseDir, final String jobId) {
- this.baseDir = baseDir;
+ public StorageClient(final String jobId, final URI storageBasePath) {
this.jobId = jobId;
+ this.storageBasePath = storageBasePath;
}
public void complete() {};
@@ -16,14 +18,8 @@ public abstract class StorageClient {
abstract public void saveFile(String filename, String body);
- abstract public String getStorageUrl();
-
- public String getBaseDir() {
- return baseDir;
- }
-
- public String getJobId() {
- return jobId;
+ public String getStorageUrl() {
+ return storageBasePath + "/" + jobId;
}
}
diff --git a/src/main/java/eu/dnetlib/apps/oai/storage/StorageClientFactory.java b/src/main/java/eu/dnetlib/apps/oai/storage/StorageClientFactory.java
index f9ccea4..f19dac7 100644
--- a/src/main/java/eu/dnetlib/apps/oai/storage/StorageClientFactory.java
+++ b/src/main/java/eu/dnetlib/apps/oai/storage/StorageClientFactory.java
@@ -20,18 +20,13 @@ public class StorageClientFactory {
public StorageClient newClient(final String jobId) {
final String protocol = storageBasePath.getScheme();
- final String host = storageBasePath.getHost();
- final int port = storageBasePath.getPort();
- final String path = storageBasePath.getPath();
- if (protocol.equalsIgnoreCase("ftp")) {
- return new FtpStorage(path, jobId, host, port, false, storageUser, storagePassword);
- } else if (protocol.equalsIgnoreCase("ftps")) {
- return new FtpStorage(path, jobId, host, port, true, storageUser, storagePassword);
+ if (protocol.equalsIgnoreCase("ftp") || protocol.equalsIgnoreCase("ftps")) {
+ return new FtpStorage(jobId, storageBasePath, storageUser, storagePassword);
} else if (protocol.equalsIgnoreCase("file")) {
- return new LocalStorage(path, jobId);
+ return new LocalStorage(jobId, storageBasePath);
} else if (protocol.equalsIgnoreCase("zip")) {
- return new ZipStorage(path, jobId);
+ return new ZipStorage(jobId, storageBasePath);
} else {
throw new RuntimeException("Invalid storage protocol: " + protocol + " (valid protocol are: file, ftp and ftps)");
}
diff --git a/src/main/java/eu/dnetlib/apps/oai/storage/ZipStorage.java b/src/main/java/eu/dnetlib/apps/oai/storage/ZipStorage.java
index e0bb2db..a52e5d7 100644
--- a/src/main/java/eu/dnetlib/apps/oai/storage/ZipStorage.java
+++ b/src/main/java/eu/dnetlib/apps/oai/storage/ZipStorage.java
@@ -3,6 +3,7 @@ package eu.dnetlib.apps.oai.storage;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.net.URI;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
@@ -20,10 +21,11 @@ public class ZipStorage extends StorageClient {
private ZipOutputStream zipOut;
private long currPage = -1;
- public ZipStorage(final String baseDir, final String jobId) {
- super(baseDir, jobId);
+ public ZipStorage(final String jobId, final URI storageBasePath) {
+ super(jobId, storageBasePath);
+
try {
- final File rootDir = new File(baseDir);
+ final File rootDir = new File(storageBasePath.getPath());
FileUtils.forceMkdir(rootDir);
this.fos = new FileOutputStream(rootDir.getAbsolutePath() + "/" + jobId + ".zip");
@@ -72,7 +74,7 @@ public class ZipStorage extends StorageClient {
@Override
public String getStorageUrl() {
- return getBaseDir() + "/" + getJobId() + ".zip";
+ return super.getStorageUrl() + ".zip";
}
}
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 82d2faa..2ca1e0f 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -4,7 +4,8 @@ swagger.api_title = Oai2Ftp
swagger.api_desc = API Documentation
swagger.api_version = 0.0.1
-spring.datasource.url=jdbc:h2:mem:
+#spring.datasource.url=jdbc:h2:mem:
+spring.datasource.url=jdbc:h2:file:/tmp/oai_dumps_db
spring.datasource.username=
spring.datasource.password=
spring.jpa.show-sql=false
@@ -21,7 +22,7 @@ oai.conf.storage.password = testPwd
oai.conf.public.basePath = http://localhost:8080/download
-oai.conf.execution.expirationTime = 12
+oai.conf.execution.duration.hours = 72
oai.conf.maxRecords = 1000
oai.conf.enable.export.api = true