/*
 * simpleOaiCollectorService/src/main/java/eu/dnetlib/apps/oai/service/CollectorService.java
 * 233 lines, 7.2 KiB, Java
 */

package eu.dnetlib.apps.oai.service;
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.dom4j.Document;
import org.dom4j.DocumentHelper;
import org.dom4j.Node;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.util.FileSystemUtils;
import eu.dnetlib.apps.oai.model.CollectionCall;
import eu.dnetlib.apps.oai.model.CollectionInfo;
import eu.dnetlib.apps.oai.model.ExecutionStatus;
import eu.dnetlib.apps.oai.repository.CollectionInfoRepository;
import eu.dnetlib.apps.oai.storage.StorageClient;
import eu.dnetlib.apps.oai.storage.StorageClientFactory;
import eu.dnetlib.apps.oai.utils.EmailSender;
import eu.dnetlib.apps.oai.utils.HttpFetcher;
import eu.dnetlib.apps.oai.utils.SimpleUtils;
import jakarta.transaction.Transactional;
/**
 * Starts OAI-PMH collection jobs on a background thread pool, keeps track of their state and
 * periodically expires finished jobs together with the data they collected.
 *
 * <p>
 * The registry of jobs started by this instance is a concurrent map because it is touched by
 * the threads calling the public methods, by the worker threads and by the scheduler thread.
 * NOTE(review): the {@link CollectionInfo} instances themselves are updated by the worker thread
 * while they can be read through {@link #getCollectionInfo(String)}; whether that is safe depends
 * on {@code CollectionInfo}, which is not visible here.
 */
@Service
public class CollectorService {

    private static final Log log = LogFactory.getLog(CollectorService.class);

    /** Pool running the collection jobs (one task per job). */
    private final ExecutorService jobExecutor = Executors.newFixedThreadPool(100);

    /** Jobs started by this instance, by job id; entries are removed when the job is expired. */
    private final Map<String, CollectionInfo> infoMap = new ConcurrentHashMap<>();

    @Autowired
    private StorageClientFactory storageClientFactory;

    /** Number of hours added to the end time of a job to obtain its expiration date. */
    @Value("${oai.conf.execution.duration.hours}")
    private long executionDuration;

    @Autowired
    private CollectionInfoRepository collectionInfoRepository;

    /** Optional prefix used to build the public url of a completed job. */
    @Value("${oai.conf.public.basePath}")
    private String publicBasePath;

    @Autowired
    private EmailSender emailSender;

    /**
     * Registers a new collection job and submits it to the background executor.
     *
     * @param baseUrl
     *            the base url of the OAI endpoint
     * @param format
     *            the OAI metadata format
     * @param setSpec
     *            the OAI set
     * @param from
     *            lower bound of the harvesting interval
     * @param until
     *            upper bound of the harvesting interval
     * @param max
     *            maximum number of records to store; {@code null} or a non-positive value means
     *            {@link Long#MAX_VALUE}
     * @param notificationEmail
     *            address stored on the job when not blank
     * @return the job descriptor, in status {@code READY}; the same instance is updated by the
     *         background task while the job runs
     */
    public CollectionInfo startCollection(final String baseUrl,
            final String format,
            final String setSpec,
            final LocalDateTime from,
            final LocalDateTime until,
            final Long max,
            final String notificationEmail) {

        final String jobId = SimpleUtils.generateNewJobId();
        final StorageClient sc = storageClientFactory.newClient(jobId);

        final CollectionInfo info = new CollectionInfo();
        info.setId(jobId);
        info.setOaiBaseUrl(baseUrl);
        info.setOaiFormat(format);
        info.setOaiSet(setSpec);
        info.setOaiFrom(from);
        info.setOaiUntil(until);
        info.setStorageUrl(sc.getStorageUrl());
        info.setPublicUrl(null);
        info.setStart(LocalDateTime.now());
        info.setExecutionStatus(ExecutionStatus.READY);

        if (StringUtils.isNotBlank(notificationEmail)) {
            info.setNotificationEmail(notificationEmail);
        }

        if (max != null && max > 0) {
            info.setMax(max);
        } else {
            info.setMax(Long.MAX_VALUE);
        }

        infoMap.put(jobId, info);

        jobExecutor.execute(() -> runJob(jobId, sc, info));

        return info;
    }

    /** Body of the background task: runs the collection and records its outcome on the job. */
    private void runJob(final String jobId, final StorageClient sc, final CollectionInfo info) {
        try {
            info.setExecutionStatus(ExecutionStatus.RUNNING);
            oaiCollect(sc, info);
            info.setExecutionStatus(ExecutionStatus.COMPLETED);
            if (StringUtils.isNotBlank(publicBasePath)) {
                info.setPublicUrl(publicBasePath + "/" + jobId);
            }
        } catch (final Throwable e) {
            markFailed(info, e);
        } finally {
            finishJob(jobId, sc, info);
        }
    }

    /** Flags the job as failed and stores the error message followed by the stack trace. */
    private static void markFailed(final CollectionInfo info, final Throwable e) {
        info.setExecutionStatus(ExecutionStatus.FAILED);
        info.setMessage(e.getMessage() + ": " + ExceptionUtils.getStackTrace(e));
    }

    /**
     * Closes the storage, sets end/expiration dates, persists the job and sends the notification.
     * Every step is isolated: a failure of one of them is logged and must not prevent the
     * following ones, otherwise the job would never be persisted (and therefore never expired).
     */
    private void finishJob(final String jobId, final StorageClient sc, final CollectionInfo info) {
        try {
            sc.complete();
        } catch (final RuntimeException e) {
            log.error("Error finalizing the storage of job " + jobId, e);
            // keep the message of the original failure, if the collection had already failed
            if (info.getExecutionStatus() != ExecutionStatus.FAILED) {
                markFailed(info, e);
            }
        }

        final LocalDateTime now = LocalDateTime.now();
        info.setEnd(now);
        info.setExpirationDate(now.plusHours(executionDuration));

        try {
            collectionInfoRepository.save(info);
        } catch (final RuntimeException e) {
            log.error("Error saving the final state of job " + jobId, e);
        }

        try {
            emailSender.sendNotification(info);
        } catch (final RuntimeException e) {
            log.error("Error sending the notification for job " + jobId, e);
        }
    }

    /**
     * Downloads the OAI pages of a job one after the other and stores every record as a file,
     * until there is no further page or the maximum number of records of the job is reached.
     *
     * @param sc
     *            the storage receiving the records
     * @param info
     *            the job; its list of calls and its total are updated while collecting
     * @throws Exception
     *             if a page cannot be downloaded, parsed or stored
     * @throws IllegalStateException
     *             if the request built for the next page is identical to the one just executed
     */
    public void oaiCollect(final StorageClient sc, final CollectionInfo info) throws Exception {
        final String baseUrl = info.getOaiBaseUrl();

        String url = SimpleUtils.oaiFirstUrl(baseUrl, info.getOaiFormat(), info.getOaiSet(), info.getOaiFrom(), info.getOaiUntil());

        long page = 1;

        while (StringUtils.isNotBlank(url)) {
            final CollectionCall call = new CollectionCall();
            call.setUrl(url);
            info.getCalls().add(call);

            final String xml = HttpFetcher.download(call);

            // NOTE(review): the response is untrusted XML - confirm that the dom4j version in use
            // protects parseText against external entities
            final Document doc = DocumentHelper.parseText(xml);

            final List<Node> records = doc.selectNodes("//*[local-name()='ListRecords']/*[local-name()='record']");
            call.setNumberOfRecords(records.size());

            sc.prepareCurrentPage(page++);

            for (final Node n : records) {
                if (info.getTotal() >= info.getMax()) {
                    // limit reached: the remaining records of the page are not stored
                    break;
                }
                final String id = n.valueOf(".//*[local-name()='header']/*[local-name()='identifier']");
                sc.saveFile(SimpleUtils.oaiIdToFilename(id), n.asXML());
                info.setTotal(info.getTotal() + 1);
            }

            if (info.getTotal() >= info.getMax()) {
                break;
            }

            final String rtoken = doc.valueOf("//*[local-name()='resumptionToken']").trim();
            final String nextUrl = SimpleUtils.oaiNextUrl(baseUrl, rtoken);

            // a token leading back to the request just executed would make this loop endless
            if (StringUtils.isNotBlank(nextUrl) && nextUrl.equals(url)) {
                throw new IllegalStateException("The OAI server returned a resumption token that leads to the same request: " + url);
            }

            url = nextUrl;
        }
    }

    /**
     * Returns the descriptor of a job, looking first among the jobs started by this instance and
     * then in the repository.
     *
     * @param jobId
     *            the id of the job
     * @return the job descriptor
     * @throws RuntimeException
     *             if the id is unknown
     */
    public CollectionInfo getCollectionInfo(final String jobId) {
        // the concurrent map rejects null keys: a null id goes straight to the repository
        final CollectionInfo cached = jobId != null ? infoMap.get(jobId) : null;
        if (cached != null) { return cached; }

        return collectionInfoRepository.findById(jobId)
                .orElseThrow(() -> new RuntimeException("Invalid id: " + jobId));
    }

    /**
     * Scheduled every 30 minutes: marks as {@code EXPIRED} the persisted jobs that are
     * {@code COMPLETED} or {@code FAILED} and whose expiration date is in the past, and deletes
     * the data they collected. An error on one job is logged and does not stop the others.
     *
     * @throws Exception
     *             declared for compatibility with existing callers
     */
    @Scheduled(fixedRate = 30, timeUnit = TimeUnit.MINUTES)
    public void cronCleanJobs() throws Exception {
        final LocalDateTime now = LocalDateTime.now();

        final List<CollectionInfo> toClean = collectionInfoRepository.findAll()
                .stream()
                .filter(info -> {
                    final ExecutionStatus status = info.getExecutionStatus();
                    return status == ExecutionStatus.COMPLETED || status == ExecutionStatus.FAILED;
                })
                .filter(info -> {
                    final LocalDateTime expDate = info.getExpirationDate();
                    return expDate != null && expDate.isBefore(now);
                })
                .collect(Collectors.toList());

        for (final CollectionInfo info : toClean) {
            try {
                expireJob(info);
            } catch (final Exception e) {
                log.error("[CLEAN] Error cleaning expired job: " + info.getId(), e);
            }
        }
    }

    /** Forgets a job in memory, persists it as {@code EXPIRED} and deletes its collected data. */
    private void expireJob(final CollectionInfo info) throws URISyntaxException {
        log.info("[CLEAN] Cleaning expired job: " + info.getId());

        if (info.getId() != null) {
            infoMap.remove(info.getId());
        }

        info.setExecutionStatus(ExecutionStatus.EXPIRED);
        collectionInfoRepository.save(info);

        cleanCollectedData(info.getStorageUrl());
    }

    /**
     * Returns the status of the known jobs by job id: the persisted jobs first, then the jobs
     * held in memory, whose status replaces the persisted one for the same id.
     *
     * @param includeExpired
     *            whether jobs in status {@code EXPIRED} are part of the result
     * @return a new map from job id to status
     */
    public Map<String, ExecutionStatus> history(final boolean includeExpired) {
        final Map<String, ExecutionStatus> res = new LinkedHashMap<>();
        res.putAll(history(collectionInfoRepository.findAll(), includeExpired));
        res.putAll(history(infoMap.values(), includeExpired));
        return res;
    }

    /**
     * Maps the given jobs to their status, keeping the iteration order of the collection. A plain
     * loop is used instead of {@code Collectors.toMap}, which rejects null values.
     */
    private Map<String, ExecutionStatus> history(final Collection<CollectionInfo> infos, final boolean includeExpired) {
        final Map<String, ExecutionStatus> res = new LinkedHashMap<>();
        for (final CollectionInfo info : infos) {
            // read once: the status of a running job can change at any time
            final ExecutionStatus status = info.getExecutionStatus();
            if (includeExpired || status != ExecutionStatus.EXPIRED) {
                res.put(info.getId(), status);
            }
        }
        return res;
    }

    /**
     * Deletes the data stored at the given url: the file for a {@code zip} url whose path ends
     * with {@code .zip}, the whole directory for a {@code file} url pointing to a directory.
     * Any other url (including a blank one, or one without scheme or path) is left untouched.
     *
     * @param storageUrl
     *            the storage url of a job
     * @throws URISyntaxException
     *             if the url is not blank and is not a valid URI
     */
    public static void cleanCollectedData(final String storageUrl) throws URISyntaxException {
        log.info("[CLEAN] Deleting expired storage: " + storageUrl);

        if (StringUtils.isBlank(storageUrl)) {
            log.warn("[CLEAN] Missing storage url, nothing to delete");
            return;
        }

        final URI uri = new URI(storageUrl);
        final String protocol = uri.getScheme();
        final String path = uri.getPath();

        // scheme is null for a relative URI, path is null for an opaque one (e.g. "zip:data.zip")
        if (protocol == null || path == null) {
            log.warn("[CLEAN] Unsupported storage url, nothing deleted: " + storageUrl);
            return;
        }

        final File f = new File(path);

        if (protocol.equalsIgnoreCase("zip") && path.endsWith(".zip")) {
            if (!f.delete()) {
                log.warn("[CLEAN] Unable to delete file: " + f);
            }
        } else if (protocol.equalsIgnoreCase("file") && f.isDirectory()) {
            if (!FileSystemUtils.deleteRecursively(f)) {
                log.warn("[CLEAN] Unable to delete directory: " + f);
            }
        }
    }

    /** Asks the repository to force the expiration date of the persisted jobs to the current time. */
    @Transactional
    public void forceExpired() {
        collectionInfoRepository.forceExpirationDate(LocalDateTime.now());
    }
}