package eu.dnetlib.apps.oai2ftp.service;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.dom4j.Document;
import org.dom4j.DocumentHelper;
import org.dom4j.Node;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import eu.dnetlib.apps.oai2ftp.model.CollectionCall;
import eu.dnetlib.apps.oai2ftp.model.CollectionInfo;
import eu.dnetlib.apps.oai2ftp.model.ExecutionStatus;
import eu.dnetlib.apps.oai2ftp.repository.CollectionLogEntryRepository;
import eu.dnetlib.apps.oai2ftp.utils.HttpFetcher;
import eu.dnetlib.apps.oai2ftp.utils.SimpleUtils;
import eu.dnetlib.apps.oai2ftp.utils.StorageClient;
import eu.dnetlib.apps.oai2ftp.utils.StorageClientFactory;

@Service
public class Oai2FtpService {

	private static final Log log = LogFactory.getLog(Oai2FtpService.class);

	private final ExecutorService jobExecutor = Executors.newFixedThreadPool(100);

	// Shared by request threads, worker threads and the scheduled cleanup job,
	// so a thread-safe map (instead of a plain LinkedHashMap) is required.
	// Generics are also needed for the typed accesses below to compile.
	private final Map<String, CollectionInfo> infoMap = new ConcurrentHashMap<>();

	@Autowired
	private StorageClientFactory storageClientFactory;

	@Value("${oai2ftp.conf.execution.expirationTime}")
	private long fullInfoExpirationTime; // in hours

	@Autowired
	private CollectionLogEntryRepository collectionLogEntryRepository;

	public CollectionInfo startCollection(final String baseUrl,
		final String format,
		final String setSpec,
		final LocalDateTime from,
		final LocalDateTime until) {

		final String jobId = SimpleUtils.generateNewJobId();

		final StorageClient sc = storageClientFactory.newClientForJob(jobId);

		final CollectionInfo info = new CollectionInfo();
		info.setId(jobId);
		info.setOaiBaseUrl(baseUrl);
		info.setOaiFormat(format);
		info.setOaiSet(setSpec);
		info.setOaiFrom(from);
		info.setOaiUntil(until);
		info.setStart(LocalDateTime.now());
		info.setEnd(null);
		info.setExecutionStatus(ExecutionStatus.READY);
		info.setTotal(0);
		info.setMessage("");

		infoMap.put(jobId, info);

		// The harvesting runs asynchronously; the caller receives the job info
		// immediately and can poll its progress via getStatus(jobId).
		jobExecutor.execute(() -> {
			try {
				info.setExecutionStatus(ExecutionStatus.RUNNING);
				oaiCollect(baseUrl, format, setSpec, from, until, sc, info);
				info.setExecutionStatus(ExecutionStatus.COMPLETED);
			} catch (final Throwable e) {
				info.setExecutionStatus(ExecutionStatus.FAILED);
				info.setMessage(e.getMessage() + ": " + ExceptionUtils.getStackTrace(e));
			} finally {
				info.setEnd(LocalDateTime.now());
				sc.disconnect();
				collectionLogEntryRepository.save(SimpleUtils.infoToLog(info));
			}
		});

		return info;
	}

	public void oaiCollect(final String baseUrl,
		final String format,
		final String setSpec,
		final LocalDateTime from,
		final LocalDateTime until,
		final StorageClient sc,
		final CollectionInfo info) throws Exception {

		String url = SimpleUtils.oaiFirstUrl(baseUrl, format, setSpec, from, until);

		while (StringUtils.isNotBlank(url)) {
			final CollectionCall call = new CollectionCall();
			call.setUrl(url);
			info.getCalls().add(call);

			final String xml = HttpFetcher.download(call);
			final Document doc = DocumentHelper.parseText(xml);

			// Namespace-agnostic XPath: OAI-PMH responses are namespaced,
			// so elements are matched by local name only.
			final List<Node> records = doc.selectNodes("//*[local-name()='ListRecords']/*[local-name()='record']");
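
			// Each harvested <record> is stored as a separate file, named after
			// the identifier found in its OAI header.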
			call.setNumberOfRecords(records.size());

			for (final Node n : records) {
				final String id = n.valueOf(".//*[local-name()='header']/*[local-name()='identifier']");
				sc.saveFile(SimpleUtils.oaiIdToFilename(id), n.asXML());
				info.setTotal(info.getTotal() + 1);
			}

			// An empty (or missing) resumptionToken ends the harvesting loop.
			final String rtoken = doc.valueOf("//*[local-name()='resumptionToken']").trim();
			url = SimpleUtils.oaiNextUrl(baseUrl, rtoken);
		}
	}

	public CollectionInfo getStatus(final String jobId) {
		final CollectionInfo info = infoMap.get(jobId);
		if (info != null) { return info; }

		// Jobs evicted from memory are recovered from the persistent log.
		return collectionLogEntryRepository.findById(jobId)
			.map(SimpleUtils::logToInfo)
			.orElseThrow(() -> new RuntimeException("Invalid id: " + jobId));
	}

	@Scheduled(fixedRate = 30, timeUnit = TimeUnit.MINUTES)
	public void cronCleanJobs() {
		final Set<String> toDelete = infoMap.entrySet()
			.stream()
			.filter(e -> {
				final ExecutionStatus status = e.getValue().getExecutionStatus();
				return status == ExecutionStatus.COMPLETED || status == ExecutionStatus.FAILED;
			})
			.filter(e -> {
				final LocalDateTime end = e.getValue().getEnd();
				final long hours = Duration.between(end, LocalDateTime.now()).toHours();
				return Math.abs(hours) > fullInfoExpirationTime;
			})
			.map(Map.Entry::getKey)
			.collect(Collectors.toSet());

		log.info("Cleaning expired jobs: " + StringUtils.join(toDelete, ", "));

		toDelete.forEach(infoMap::remove);
	}
}
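
/*
 * A minimal usage sketch (hypothetical, not part of the original source):
 * a REST controller that starts a harvesting job and polls its status.
 * The class name, mapping paths and parameter handling are assumptions for
 * illustration, spring-web is assumed to be on the classpath, and dates are
 * expected in ISO-8601 local date-time form (e.g. 2024-01-01T00:00:00).
 * Annotations are fully qualified so no extra imports are needed in this file.
 */
@org.springframework.web.bind.annotation.RestController
@org.springframework.web.bind.annotation.RequestMapping("/api/oai2ftp")
class Oai2FtpServiceUsageSketch {

	@org.springframework.beans.factory.annotation.Autowired
	private Oai2FtpService service;

	// Starts a new collection job and returns its initial status (including the job id).
	@org.springframework.web.bind.annotation.GetMapping("/start")
	public CollectionInfo start(
		@org.springframework.web.bind.annotation.RequestParam final String baseUrl,
		@org.springframework.web.bind.annotation.RequestParam final String format,
		@org.springframework.web.bind.annotation.RequestParam(required = false, defaultValue = "") final String setSpec,
		@org.springframework.web.bind.annotation.RequestParam(required = false) final String from,
		@org.springframework.web.bind.annotation.RequestParam(required = false) final String until) {
		return service.startCollection(baseUrl, format, setSpec,
			from != null ? java.time.LocalDateTime.parse(from) : null,
			until != null ? java.time.LocalDateTime.parse(until) : null);
	}

	// Polls a running (or already persisted) job by id.
	@org.springframework.web.bind.annotation.GetMapping("/status/{jobId}")
	public CollectionInfo status(@org.springframework.web.bind.annotation.PathVariable final String jobId) {
		return service.getStatus(jobId);
	}
}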