some aggregation nodes

This commit is contained in:
Michele Artini 2023-04-28 11:41:22 +02:00
parent 8a242eb4b5
commit 37517dcfb9
6 changed files with 95 additions and 14 deletions

View File

@ -9,6 +9,7 @@ import static eu.dnetlib.dsm.utils.DsmMappingUtils.createId;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.util.HashSet;
import java.util.List;
@ -521,4 +522,15 @@ public class DsmService {
return apiRepository.findById(id).orElseThrow(() -> new DsmException("Api not found. ID: " + id));
}
public void updateUpdateApiCollectionInfo(final String apiId, final String mdId, final long total) {
apiRepository.updateLastCollectionInfo(apiId, mdId, new Timestamp(System.currentTimeMillis()), total);
}
public void updateUpdateApiAggregationInfo(final String apiId, final String mdId, final long total) {
apiRepository.updateLastAggregationInfo(apiId, mdId, new Timestamp(System.currentTimeMillis()), total);
}
public void updateUpdateApiDownloadInfo(final String apiId, final String mdId, final long total) {
apiRepository.updateLastDownloadInfo(apiId, mdId, new Timestamp(System.currentTimeMillis()), total);
}
}

View File

@ -1,5 +1,6 @@
package eu.dnetlib.dsm.repository;
import java.sql.Timestamp;
import java.util.List;
import javax.transaction.Transactional;
@ -49,4 +50,19 @@ public interface ApiRepository extends JpaRepository<Api, String> {
@Query("update #{#entityName} d set d.removable = ?2 where d.datasource = ?1")
void setRemovable(String id, boolean removable);
@Modifying
@Transactional
@Query("update #{#entityName} set (lastCollectionMdid, lastCollectionDate, lastCollectionTotal) = (?2, ?3, ?4) where id = ?1")
void updateLastCollectionInfo(String apiId, String mdId, Timestamp date, long total);
@Modifying
@Transactional
@Query("update #{#entityName} set (lastAggregationMdid, lastAggregationDate, lastAggregationTotal) = (?2, ?3, ?4) where id = ?1")
void updateLastAggregationInfo(String apiId, String mdId, Timestamp date, long total);
@Modifying
@Transactional
@Query("update #{#entityName} set (lastDownloadMdid, lastDownloadDate, lastDownloadTotal) = (?2, ?3, ?4) where id = ?1")
void updateLastDownloadInfo(String apiId, String mdId, Timestamp date, long total);
}

View File

@ -2,6 +2,9 @@ package eu.dnetlib.manager.wf.nodes.aggregation;
import java.time.LocalDateTime;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.dsm.DsmService;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.annotations.WfOutputParam;
@ -10,25 +13,40 @@ import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
@WfNode("findDateRangeForIncrementalHarvesting")
public class FindDateRangeForIncrementalHarvestingNode extends AbstractJobNode {
@WfInputParam
private String dsId;
@WfInputParam
private String apiId;
@WfInputParam
private LocalDateTime overrideFromDate;
@WfInputParam
private LocalDateTime overrideUntilDate;
@WfOutputParam
private LocalDateTime fromDate;
@WfOutputParam
private LocalDateTime untilDate;
@Autowired
private DsmService dsmService;
@Override
protected void execute() {
fromDate = overrideFromDate != null ? overrideFromDate : findLastAggrDate();
untilDate = overrideUntilDate != null ? overrideUntilDate : LocalDateTime.now();
fromDate = overrideFromDate != null ? overrideFromDate : findLastCollDate();
untilDate = overrideUntilDate != null ? overrideUntilDate : null;
}
private LocalDateTime findLastAggrDate() {
// TODO Auto-generated method stub
return null;
private LocalDateTime findLastCollDate() {
return dsmService.getApis(dsId)
.stream()
.filter(a -> apiId.equals(a.getId()))
.findFirst()
.map(a -> a.getLastCollectionDate().toLocalDateTime())
.orElse(null);
}
}

View File

@ -1,9 +1,14 @@
package eu.dnetlib.manager.wf.nodes.aggregation;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.data.mapping.RecordTransformer;
import eu.dnetlib.data.mapping.xslt.XsltTransformFactory;
import eu.dnetlib.data.mdstore.model.records.MetadataRecord;
import eu.dnetlib.data.mdstore.model.records.MetadataRecordImpl;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
@ -16,21 +21,24 @@ public class MetadataXsltTransformNode extends StreamMapperNode<MetadataRecord,
@WfInputParam
private String ruleId;
// @Autowired
// private CleanerFactory cleanerFactory;
@Autowired
private XsltTransformFactory xsltTransformFactory;
@Override
protected Stream<MetadataRecord> mapStream(final Stream<MetadataRecord> input) {
try {
// final Cleaner cleaner = cleanerFactory.obtainCleaningRule(ruleId);
final Map<String, String> params = new HashMap<>();
// TODO: which params ?
final RecordTransformer<String, String> xslt = xsltTransformFactory.getTransformer(ruleId, params);
final long now = System.currentTimeMillis();
return input.map(in -> {
final MetadataRecord out = new MetadataRecordImpl();
BeanUtils.copyProperties(in, out);
// out.setBody(cleaner.transform(in.getBody()));
out.setDateOfTransformation(System.currentTimeMillis());
out.setBody(xslt.transform(in.getBody()));
out.setDateOfTransformation(now);
return out;
});
} catch (final Exception e) {
throw new RuntimeException(e);

View File

@ -1,7 +1,8 @@
package eu.dnetlib.manager.wf.nodes.aggregation;
import java.time.LocalDateTime;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.dsm.DsmService;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
@ -14,15 +15,33 @@ public class UpdateApiExtraFieldsNode extends AbstractJobNode {
@WfInputParam
private String apiId;
@WfInputParam
private String infoType; // COLLECT or TRANSFORM
private String infoType; // COLLECT, TRANSFORM, DOWNLOAD
@WfInputParam
private String mdId;
@WfInputParam
private long total;
@Autowired
private DsmService dsmService;
@Override
protected void execute() {
final LocalDateTime date = LocalDateTime.now();
switch (infoType.toUpperCase()) {
case "COLLECT":
dsmService.updateUpdateApiCollectionInfo(apiId, mdId, total);
break;
case "AGGREGATOR":
case "TRANSFORM":
dsmService.updateUpdateApiAggregationInfo(apiId, mdId, total);
break;
case "DOWNLOAD":
dsmService.updateUpdateApiDownloadInfo(apiId, mdId, total);
break;
default:
throw new RuntimeException("Invalid infoType: " + infoType);
}
// TODO Auto-generated method stub
}

View File

@ -86,6 +86,14 @@
"name":"PREPARE_INCREMENTAL",
"type":"findDateRangeForIncrementalHarvesting",
"input":[
{
"name":"dsId",
"ref":"dsId"
},
{
"name":"apiId",
"ref":"apiId"
},
{
"name":"from",
"ref":"overrideFromDate"