diff --git a/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/DsmService.java b/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/DsmService.java index f9a1c8a3..b5a28623 100644 --- a/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/DsmService.java +++ b/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/DsmService.java @@ -9,6 +9,7 @@ import static eu.dnetlib.dsm.utils.DsmMappingUtils.createId; import java.io.IOException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.sql.Timestamp; import java.time.LocalDate; import java.util.HashSet; import java.util.List; @@ -521,4 +522,15 @@ public class DsmService { return apiRepository.findById(id).orElseThrow(() -> new DsmException("Api not found. ID: " + id)); } + public void updateUpdateApiCollectionInfo(final String apiId, final String mdId, final long total) { + apiRepository.updateLastCollectionInfo(apiId, mdId, new Timestamp(System.currentTimeMillis()), total); + } + + public void updateUpdateApiAggregationInfo(final String apiId, final String mdId, final long total) { + apiRepository.updateLastAggregationInfo(apiId, mdId, new Timestamp(System.currentTimeMillis()), total); + } + + public void updateUpdateApiDownloadInfo(final String apiId, final String mdId, final long total) { + apiRepository.updateLastDownloadInfo(apiId, mdId, new Timestamp(System.currentTimeMillis()), total); + } } diff --git a/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/repository/ApiRepository.java b/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/repository/ApiRepository.java index 1783974f..5812228d 100644 --- a/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/repository/ApiRepository.java +++ b/libs/dnet-is-services/src/main/java/eu/dnetlib/dsm/repository/ApiRepository.java @@ -1,5 +1,6 @@ package eu.dnetlib.dsm.repository; +import java.sql.Timestamp; import java.util.List; import javax.transaction.Transactional; @@ -49,4 +50,19 @@ public interface ApiRepository extends JpaRepository { @Query("update #{#entityName} d set d.removable = ?2 where d.datasource = ?1") void setRemovable(String id, boolean removable); + @Modifying + @Transactional + @Query("update #{#entityName} set (lastCollectionMdid, lastCollectionDate, lastCollectionTotal) = (?2, ?3, ?4) where id = ?1") + void updateLastCollectionInfo(String apiId, String mdId, Timestamp date, long total); + + @Modifying + @Transactional + @Query("update #{#entityName} set (lastAggregationMdid, lastAggregationDate, lastAggregationTotal) = (?2, ?3, ?4) where id = ?1") + void updateLastAggregationInfo(String apiId, String mdId, Timestamp date, long total); + + @Modifying + @Transactional + @Query("update #{#entityName} set (lastDownloadMdid, lastDownloadDate, lastDownloadTotal) = (?2, ?3, ?4) where id = ?1") + void updateLastDownloadInfo(String apiId, String mdId, Timestamp date, long total); + } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/FindDateRangeForIncrementalHarvestingNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/FindDateRangeForIncrementalHarvestingNode.java index 4f9cd9fe..0986c123 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/FindDateRangeForIncrementalHarvestingNode.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/FindDateRangeForIncrementalHarvestingNode.java @@ -2,6 +2,9 @@ package eu.dnetlib.manager.wf.nodes.aggregation; import java.time.LocalDateTime; +import org.springframework.beans.factory.annotation.Autowired; + +import eu.dnetlib.dsm.DsmService; import eu.dnetlib.manager.wf.annotations.WfInputParam; import eu.dnetlib.manager.wf.annotations.WfNode; import eu.dnetlib.manager.wf.annotations.WfOutputParam; @@ -10,25 +13,40 @@ import eu.dnetlib.manager.wf.nodes.AbstractJobNode; @WfNode("findDateRangeForIncrementalHarvesting") public class FindDateRangeForIncrementalHarvestingNode extends AbstractJobNode { + @WfInputParam + private String dsId; + + @WfInputParam + private String apiId; + @WfInputParam private LocalDateTime overrideFromDate; + @WfInputParam private LocalDateTime overrideUntilDate; @WfOutputParam private LocalDateTime fromDate; + @WfOutputParam private LocalDateTime untilDate; + @Autowired + private DsmService dsmService; + @Override protected void execute() { - fromDate = overrideFromDate != null ? overrideFromDate : findLastAggrDate(); - untilDate = overrideUntilDate != null ? overrideUntilDate : LocalDateTime.now(); + fromDate = overrideFromDate != null ? overrideFromDate : findLastCollDate(); + untilDate = overrideUntilDate != null ? overrideUntilDate : null; } - private LocalDateTime findLastAggrDate() { - // TODO Auto-generated method stub - return null; + private LocalDateTime findLastCollDate() { + return dsmService.getApis(dsId) + .stream() + .filter(a -> apiId.equals(a.getId())) + .findFirst() + .map(a -> a.getLastCollectionDate().toLocalDateTime()) + .orElse(null); } } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/MetadataXsltTransformNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/MetadataXsltTransformNode.java index fc011de5..a536ae84 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/MetadataXsltTransformNode.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/MetadataXsltTransformNode.java @@ -1,9 +1,14 @@ package eu.dnetlib.manager.wf.nodes.aggregation; +import java.util.HashMap; +import java.util.Map; import java.util.stream.Stream; import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; +import eu.dnetlib.data.mapping.RecordTransformer; +import eu.dnetlib.data.mapping.xslt.XsltTransformFactory; import eu.dnetlib.data.mdstore.model.records.MetadataRecord; import eu.dnetlib.data.mdstore.model.records.MetadataRecordImpl; import eu.dnetlib.manager.wf.annotations.WfInputParam; @@ -16,21 +21,24 @@ public class MetadataXsltTransformNode extends StreamMapperNode mapStream(final Stream input) { try { - // final Cleaner cleaner = cleanerFactory.obtainCleaningRule(ruleId); + final Map params = new HashMap<>(); + // TODO: which params ? + final RecordTransformer xslt = xsltTransformFactory.getTransformer(ruleId, params); + + final long now = System.currentTimeMillis(); return input.map(in -> { final MetadataRecord out = new MetadataRecordImpl(); BeanUtils.copyProperties(in, out); - // out.setBody(cleaner.transform(in.getBody())); - out.setDateOfTransformation(System.currentTimeMillis()); + out.setBody(xslt.transform(in.getBody())); + out.setDateOfTransformation(now); return out; - }); } catch (final Exception e) { throw new RuntimeException(e); diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/UpdateApiExtraFieldsNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/UpdateApiExtraFieldsNode.java index 4318161c..af5db8d7 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/UpdateApiExtraFieldsNode.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/UpdateApiExtraFieldsNode.java @@ -1,7 +1,8 @@ package eu.dnetlib.manager.wf.nodes.aggregation; -import java.time.LocalDateTime; +import org.springframework.beans.factory.annotation.Autowired; +import eu.dnetlib.dsm.DsmService; import eu.dnetlib.manager.wf.annotations.WfInputParam; import eu.dnetlib.manager.wf.annotations.WfNode; import eu.dnetlib.manager.wf.nodes.AbstractJobNode; @@ -14,15 +15,33 @@ public class UpdateApiExtraFieldsNode extends AbstractJobNode { @WfInputParam private String apiId; @WfInputParam - private String infoType; // COLLECT or TRANSFORM + private String infoType; // COLLECT, TRANSFORM, DOWNLOAD @WfInputParam private String mdId; @WfInputParam private long total; + @Autowired + private DsmService dsmService; + @Override protected void execute() { - final LocalDateTime date = LocalDateTime.now(); + + switch (infoType.toUpperCase()) { + case "COLLECT": + dsmService.updateUpdateApiCollectionInfo(apiId, mdId, total); + break; + case "AGGREGATOR": + case "TRANSFORM": + dsmService.updateUpdateApiAggregationInfo(apiId, mdId, total); + break; + case "DOWNLOAD": + dsmService.updateUpdateApiDownloadInfo(apiId, mdId, total); + break; + default: + throw new RuntimeException("Invalid infoType: " + infoType); + } + // TODO Auto-generated method stub } diff --git a/libs/dnet-wf-service/src/main/resources/wf_templates/collect.json b/libs/dnet-wf-service/src/main/resources/wf_templates/collect.json index b1ffca02..ba791dd7 100644 --- a/libs/dnet-wf-service/src/main/resources/wf_templates/collect.json +++ b/libs/dnet-wf-service/src/main/resources/wf_templates/collect.json @@ -86,6 +86,14 @@ "name":"PREPARE_INCREMENTAL", "type":"findDateRangeForIncrementalHarvesting", "input":[ + { + "name":"dsId", + "ref":"dsId" + }, + { + "name":"apiId", + "ref":"apiId" + }, { "name":"from", "ref":"overrideFromDate"