simpleOaiCollectorService/src/main/java/eu/dnetlib/apps/Oai2ftp/service/Oai2FtpService.java

165 lines
5.1 KiB
Java

package eu.dnetlib.apps.oai2ftp.service;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.dom4j.Document;
import org.dom4j.DocumentHelper;
import org.dom4j.Node;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import eu.dnetlib.apps.oai2ftp.model.CollectionCall;
import eu.dnetlib.apps.oai2ftp.model.CollectionInfo;
import eu.dnetlib.apps.oai2ftp.model.ExecutionStatus;
import eu.dnetlib.apps.oai2ftp.repository.CollectionLogEntryRepository;
import eu.dnetlib.apps.oai2ftp.utils.FtpClientFactory;
import eu.dnetlib.apps.oai2ftp.utils.FtpClientWrapper;
import eu.dnetlib.apps.oai2ftp.utils.HttpFetcher;
import eu.dnetlib.apps.oai2ftp.utils.SimpleUtils;
/**
 * Harvests records from an OAI-PMH endpoint and stores each record as a file on an FTP server.
 *
 * <p>Collections run asynchronously on a fixed thread pool. While a job is tracked in memory its
 * {@link CollectionInfo} is mutated by the worker thread. When the job terminates, its final state is
 * also saved through {@link CollectionLogEntryRepository}, so it can still be looked up after
 * {@link #cronCleanJobs()} evicts the in-memory entry.</p>
 */
@Service
public class Oai2FtpService {

	private static final Log log = LogFactory.getLog(Oai2FtpService.class);

	/** Selects every record of a ListRecords response, independently of namespace prefixes. */
	private static final String RECORDS_XPATH = "//*[local-name()='ListRecords']/*[local-name()='record']";

	/**
	 * Selects the identifier of a record. This path must be RELATIVE to the record node: an absolute
	 * path ("/...") is evaluated from the document root (the OAI-PMH element) and would never match,
	 * giving every record the same empty id.
	 */
	private static final String RECORD_ID_XPATH = "*[local-name()='header']/*[local-name()='identifier']";

	/** Selects the resumption token of the current response page. */
	private static final String RESUMPTION_TOKEN_XPATH = "//*[local-name()='resumptionToken']";

	// NOTE(review): this pool is never shut down; consider closing it when the application context stops.
	private final ExecutorService jobExecutor = Executors.newFixedThreadPool(100);

	/**
	 * In-memory status of recent jobs, keyed by job id. It is written by request threads
	 * (startCollection), read by request threads (getStatus) and iterated/pruned by the scheduler
	 * (cronCleanJobs), so it must be a concurrent map.
	 */
	private final Map<String, CollectionInfo> infoMap = new ConcurrentHashMap<>();

	@Autowired
	private FtpClientFactory ftpClientFactory;

	@Value("${oai2ftp.conf.execution.expirationTime}")
	private long fullInfoExpirationTime; // in hours

	@Autowired
	private CollectionLogEntryRepository collectionLogEntryRepository;

	/**
	 * Registers a new collection job and starts it asynchronously.
	 *
	 * @param baseUrl the OAI-PMH base URL
	 * @param format  the OAI metadata prefix
	 * @param setSpec the OAI set (may be blank, depending on SimpleUtils.oaiFirstUrl)
	 * @param from    lower bound of the harvest, possibly null
	 * @param until   upper bound of the harvest, possibly null
	 * @return the live status object of the job (initially READY); it keeps being updated by the worker
	 */
	public CollectionInfo startCollection(final String baseUrl,
			final String format,
			final String setSpec,
			final LocalDateTime from,
			final LocalDateTime until) {

		final String jobId = SimpleUtils.generateNewJobId();
		final FtpClientWrapper ftp = ftpClientFactory.newClientForJob(jobId);

		final CollectionInfo info = new CollectionInfo();
		info.setId(jobId);
		info.setOaiBaseUrl(baseUrl);
		info.setOaiFormat(format);
		info.setOaiSet(setSpec);
		info.setOaiFrom(from);
		info.setOaiUntil(until);
		info.setStart(LocalDateTime.now());
		info.setEnd(null);
		info.setExecutionStatus(ExecutionStatus.READY);
		info.setTotal(0);
		info.setMessage("");

		infoMap.put(jobId, info);

		jobExecutor.execute(() -> {
			try {
				info.setExecutionStatus(ExecutionStatus.RUNNING);
				oaiCollect(baseUrl, format, setSpec, from, until, ftp, info);
				info.setExecutionStatus(ExecutionStatus.COMPLETED);
			} catch (final Throwable e) {
				// Worker-thread boundary: record any failure in the job status instead of losing it
				log.error("Collection job " + jobId + " failed", e);
				info.setExecutionStatus(ExecutionStatus.FAILED);
				info.setMessage(e.getMessage() + ": " + ExceptionUtils.getStackTrace(e));
			} finally {
				info.setEnd(LocalDateTime.now());
				disconnectQuietly(ftp, jobId);
				saveLogQuietly(info);
			}
		});

		return info;
	}

	/**
	 * Harvests all pages of a ListRecords request and uploads every record to the FTP server.
	 *
	 * <p>Each HTTP call is appended to {@code info.getCalls()} and {@code info.getTotal()} is
	 * incremented per saved record. The loop stops when SimpleUtils.oaiNextUrl returns a blank URL
	 * (presumably when the resumption token is empty).</p>
	 *
	 * @throws Exception if downloading, parsing or uploading fails
	 */
	public void oaiCollect(final String baseUrl,
			final String format,
			final String setSpec,
			final LocalDateTime from,
			final LocalDateTime until,
			final FtpClientWrapper ftp,
			final CollectionInfo info)
			throws Exception {

		String url = SimpleUtils.oaiFirstUrl(baseUrl, format, setSpec, from, until);

		while (StringUtils.isNotBlank(url)) {
			final CollectionCall call = new CollectionCall();
			call.setUrl(url);
			info.getCalls().add(call);

			final String xml = HttpFetcher.download(call);
			final Document doc = DocumentHelper.parseText(xml);

			final List<Node> records = doc.selectNodes(RECORDS_XPATH);
			call.setNumberOfRecords(records.size());

			for (final Node n : records) {
				// Evaluated relative to the record node (see RECORD_ID_XPATH)
				final String id = n.valueOf(RECORD_ID_XPATH);
				ftp.saveFile(SimpleUtils.oaiIdToFilename(id), n.asXML());
				info.setTotal(info.getTotal() + 1);
			}

			final String rtoken = doc.valueOf(RESUMPTION_TOKEN_XPATH).trim();
			url = SimpleUtils.oaiNextUrl(baseUrl, rtoken);
		}
	}

	/**
	 * Returns the status of a job: the live in-memory object if still tracked, otherwise the state
	 * rebuilt from the persisted log entry.
	 *
	 * @throws RuntimeException if the id is unknown both in memory and in the repository
	 */
	public CollectionInfo getStatus(final String jobId) {
		// ConcurrentHashMap rejects null keys; a null id falls through to the repository as before
		final CollectionInfo info = jobId != null ? infoMap.get(jobId) : null;
		if (info != null) {
			return info;
		}
		return collectionLogEntryRepository.findById(jobId)
				.map(SimpleUtils::logToInfo)
				.orElseThrow(() -> new RuntimeException("Invalid id: " + jobId));
	}

	/**
	 * Every 30 minutes, evicts from memory the terminated jobs whose end time is more than
	 * {@code fullInfoExpirationTime} hours away from now. Their state remains available via the
	 * persisted log entries.
	 */
	@Scheduled(fixedRate = 30, timeUnit = TimeUnit.MINUTES)
	public void cronCleanJobs() throws Exception {
		final LocalDateTime now = LocalDateTime.now();

		final Set<String> toDelete = infoMap.entrySet()
				.stream()
				.filter(e -> isExpired(e.getValue(), now))
				.map(Map.Entry::getKey)
				.collect(Collectors.toSet());

		if (!toDelete.isEmpty()) {
			log.info("Cleaning expired jobs: " + StringUtils.join(toDelete, ", "));
			toDelete.forEach(infoMap::remove);
		}
	}

	/** Tells whether a job has terminated and its end time is older than the expiration window. */
	private boolean isExpired(final CollectionInfo info, final LocalDateTime now) {
		final ExecutionStatus status = info.getExecutionStatus();
		if (status != ExecutionStatus.COMPLETED && status != ExecutionStatus.FAILED) {
			return false;
		}
		// The worker sets the terminal status before the end time (see startCollection), so a job
		// that has just finished may still have a null end: treat it as not yet expired.
		final LocalDateTime end = info.getEnd();
		return end != null && Math.abs(Duration.between(end, now).toHours()) > fullInfoExpirationTime;
	}

	/** Disconnects the FTP client, logging (not propagating) failures so the job log is still saved. */
	private void disconnectQuietly(final FtpClientWrapper ftp, final String jobId) {
		try {
			ftp.disconnect();
		} catch (final RuntimeException e) {
			log.warn("Error disconnecting FTP client of job " + jobId, e);
		}
	}

	/** Persists the final job state, logging failures instead of letting them escape the worker thread. */
	private void saveLogQuietly(final CollectionInfo info) {
		try {
			collectionLogEntryRepository.save(SimpleUtils.infoToLog(info));
		} catch (final RuntimeException e) {
			log.error("Error saving the log of job " + info.getId(), e);
		}
	}
}