dnet-docker/dnet-app/apps/dnet-wf-aggregation-postgres/src/main/java/eu/dnetlib/wfs/nodes/MdCollectIncrementalJobNode...

103 lines
3.3 KiB
Java

package eu.dnetlib.wfs.nodes;
import java.time.LocalDateTime;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.DocumentHelper;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.common.clients.DnetServiceClientFactory;
import eu.dnetlib.common.clients.DsmClient;
import eu.dnetlib.common.clients.MDStoreManagerClient;
import eu.dnetlib.common.mdstores.backends.sql.MDStoreSqlBackend;
import eu.dnetlib.domain.dsm.Api;
import eu.dnetlib.domain.dsm.Datasource;
import eu.dnetlib.domain.mdstore.MDStoreVersion;
import eu.dnetlib.domain.mdstore.MDStoreWithInfo;
import eu.dnetlib.domain.mdstore.records.MetadataRecord;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.collector.CollectorService;
import eu.dnetlib.wfs.utils.MdBuilderFactory;
import eu.dnetlib.wfs.utils.XpathFilterFactory;
import jakarta.transaction.Transactional;
/**
 * Workflow node {@code md_collect_incremental}: harvests metadata records from a datasource API
 * and stores them into the current version of an existing MDStore.
 *
 * <p>Each harvested payload is parsed as XML, passed through an XPath-based filter, converted into
 * a {@link MetadataRecord} and saved through {@link MDStoreSqlBackend}. On success the MDStore
 * version is committed with the resulting record count and the API collection info is updated in
 * the DSM; on any failure the version is aborted and the error is rethrown.
 */
@WfNode("md_collect_incremental")
public class MdCollectIncrementalJobNode extends ProcessNode {

    /** Datasource passed, together with {@link #api}, to the metadata record builder. */
    @WfInputParam
    private Datasource ds;

    /** API to harvest; its last collection date is the default lower bound of the harvest. */
    @WfInputParam
    private Api api;

    /** Identifier of the target MDStore whose current version receives the records. */
    @WfInputParam
    private String mdId;

    // NOTE(review): both override dates are handled as nullable in execute(), but unlike
    // filterXpath they are not declared optional = true — confirm whether the workflow engine
    // requires them to be supplied.

    /** If non-null, used as lower bound of the harvest instead of the API's last collection date. */
    @WfInputParam
    private LocalDateTime overrideFromDate;

    /** Upper bound of the harvest; forwarded to the collector as-is (possibly {@code null}). */
    @WfInputParam
    private LocalDateTime overrideUntilDate;

    /** Optional XPath expression used to build the record filter. */
    @WfInputParam(optional = true)
    private String filterXpath;

    @Autowired
    private DnetServiceClientFactory clientFactory;

    @Autowired
    private CollectorService collectorService;

    @Autowired
    private MDStoreSqlBackend mdStoreSqlBackend;

    /**
     * Harvests the API and writes the resulting records into the current version of the target
     * MDStore, then commits that version and updates the API collection info.
     *
     * @throws Exception any error raised while collecting, parsing, storing or calling remote
     *     services; the MDStore version is aborted before the error is propagated
     */
    @Override
    // NOTE(review): this method is protected; if it is invoked from inside ProcessNode (self
    // invocation) rather than through a Spring proxy, @Transactional has no effect — confirm.
    @Transactional
    protected void execute() throws Exception {
        // A null fromDate (no override and no previous collection) is forwarded to the collector;
        // presumably it then harvests everything — TODO confirm in CollectorService.
        final LocalDateTime fromDate = this.overrideFromDate != null ? this.overrideFromDate : findLastCollDate(this.api);
        final LocalDateTime untilDate = this.overrideUntilDate;

        final Predicate<Document> filter = XpathFilterFactory.createFilter(this.filterXpath);
        final Function<Document, MetadataRecord> mdBuilder = MdBuilderFactory.createMdBuilder(this.ds, this.api);

        final MDStoreManagerClient mdstoreManager = this.clientFactory.getClient(MDStoreManagerClient.class);
        final MDStoreWithInfo outputMDStore = mdstoreManager.findMDStore(this.mdId);
        final MDStoreVersion outputVersion = mdstoreManager.findVersion(outputMDStore.getCurrentVersion());

        try {
            // Close the pipeline once consumed so that any close handlers registered by the
            // collector on its source stream are run.
            try (Stream<MetadataRecord> records = this.collectorService.collect(this.api, fromDate, untilDate)
                    .map(MdCollectIncrementalJobNode::parseXml)
                    .filter(filter)
                    .map(mdBuilder)) {
                this.mdStoreSqlBackend.saveRecords(outputVersion, records);
            }

            final long size = this.mdStoreSqlBackend.countRecords(outputVersion.getId());
            mdstoreManager.commitVersion(outputVersion.getId(), size);
            this.clientFactory.getClient(DsmClient.class).updateApiCollectionInfo(this.api.getId(), this.mdId, size);
        } catch (final Throwable e) {
            try {
                mdstoreManager.abortVersion(outputVersion);
            } catch (final Exception abortError) {
                // Do not let a failing abort hide the error that caused it.
                e.addSuppressed(abortError);
            }
            throw e;
        }
    }

    /**
     * Parses one harvested payload as an XML document.
     *
     * @param xml the raw record text returned by the collector
     * @return the parsed document
     * @throws RuntimeException if the payload is not well-formed XML; the parser error is kept as
     *     the cause
     */
    private static Document parseXml(final String xml) {
        try {
            return DocumentHelper.parseText(xml);
        } catch (final DocumentException e) {
            throw new RuntimeException("Invalid record: " + xml, e);
        }
    }

    /**
     * Returns the last collection date recorded on the given API, or {@code null} if none is set.
     */
    private LocalDateTime findLastCollDate(final Api api) {
        return api.getLastCollectionDate() != null ? api.getLastCollectionDate().toLocalDateTime() : null;
    }
}