datasource and api objects in nodes

This commit is contained in:
Michele Artini 2023-05-05 08:56:28 +02:00
parent 4b95550a99
commit 889cc9da35
15 changed files with 174 additions and 178 deletions

View File

@ -147,8 +147,8 @@ public class DsmApiControllerV1 extends AbstractDsmController {
final StopWatch stop = StopWatch.createStarted();
final Datasource ds = dsmService.getDs(dsId);
final List<Api> apis = dsmService.getApis(dsId);
final Datasource ds = dsmService.findDs(dsId);
final List<Api> apis = dsmService.findApis(dsId);
final List<ApiDetails> api = apis.stream()
.map(DsmMappingUtils::asDetails)
.map(a -> a.setEoscDatasourceType(ds.getEoscDatasourceType()))
@ -211,7 +211,7 @@ public class DsmApiControllerV1 extends AbstractDsmController {
public void updateDatasource(@RequestBody final DatasourceDetailsUpdate d) throws DsmException, DsmNotFoundException {
// initialize with current values from DB
final Datasource ds = dsmService.getDs(d.getId());
final Datasource ds = dsmService.findDs(d.getId());
if (ds == null) { throw new DsmNotFoundException(String.format("ds '%s' does not exist", d.getId())); }

View File

@ -42,7 +42,7 @@ public class DsmApiControllerV2 extends AbstractDsmController {
public SimpleResponse<SimpleDatasourceInfo> firstCollectedAfter(@RequestParam final String fromDate,
@RequestParam(required = false) final String typologyFilter) throws Throwable {
final StopWatch stop = StopWatch.createStarted();
final List<SimpleDatasourceInfo> list = dsmService.getFirstCollectedAfter(fromDate, typologyFilter);
final List<SimpleDatasourceInfo> list = dsmService.findFirstCollectedAfter(fromDate, typologyFilter);
final SimpleResponse<SimpleDatasourceInfo> rsp = ResponseUtils.simpleResponse(list);
return prepareResponse(1, list.size(), stop, rsp);

View File

@ -154,11 +154,11 @@ public class DsmService {
return dsRepository.findAll(spec, PageRequest.of(page, size));
}
public Datasource getDs(final String dsId) throws DsmException {
public Datasource findDs(final String dsId) throws DsmException {
return dsRepository.findById(dsId).orElseThrow(() -> new DsmException("Datasource not found. ID: " + dsId));
}
public Datasource getDsByNsPrefix(final String prefix) throws DsmException {
public Datasource findDsByNsPrefix(final String prefix) throws DsmException {
return dsRepository.findByNamespaceprefix(prefix).orElseThrow(() -> new DsmException("Datasource not found. NS Prefix: " + prefix));
}
@ -177,7 +177,7 @@ public class DsmService {
apiRepository.updateCompatibility(apiId, compliance);
}
public List<Api> getApis(final String dsId) {
public List<Api> findApis(final String dsId) {
return apiRepository.findByDatasource(dsId);
}
@ -342,7 +342,7 @@ public class DsmService {
}
}
public List<SimpleDatasourceInfo> getFirstCollectedAfter(final String fromDate, final String typeFilter) throws DsmException {
public List<SimpleDatasourceInfo> findFirstCollectedAfter(final String fromDate, final String typeFilter) throws DsmException {
try {
if (StringUtils.isNotBlank(typeFilter)) {
return querySql("first_collected_datasources_fromDate_typology.st.sql", rowMapperForSimpleDatasourceInfo(), typeFilter + "%", fromDate);
@ -522,27 +522,30 @@ public class DsmService {
return apiRepository.findById(id).orElseThrow(() -> new DsmException("Api not found. ID: " + id));
}
public void updateUpdateApiCollectionInfo(final String apiId, final String mdId, final long total) {
public Api updateUpdateApiCollectionInfo(final String apiId, final String mdId, final long total) throws DsmException {
if (StringUtils.isNotBlank(mdId)) {
apiRepository.updateLastCollectionInfo(apiId, mdId, new Timestamp(System.currentTimeMillis()), total);
} else {
apiRepository.updateLastCollectionInfo(apiId, null, null, 0);
}
return findApi(apiId);
}
public void updateUpdateApiAggregationInfo(final String apiId, final String mdId, final long total) {
public Api updateUpdateApiAggregationInfo(final String apiId, final String mdId, final long total) throws DsmException {
if (StringUtils.isNotBlank(mdId)) {
apiRepository.updateLastAggregationInfo(apiId, mdId, new Timestamp(System.currentTimeMillis()), total);
} else {
apiRepository.updateLastAggregationInfo(apiId, null, null, 0);
}
return findApi(apiId);
}
public void updateUpdateApiDownloadInfo(final String apiId, final String mdId, final long total) {
public Api updateUpdateApiDownloadInfo(final String apiId, final String mdId, final long total) throws DsmException {
if (StringUtils.isNotBlank(mdId)) {
apiRepository.updateLastDownloadInfo(apiId, mdId, new Timestamp(System.currentTimeMillis()), total);
} else {
apiRepository.updateLastDownloadInfo(apiId, null, null, 0);
}
return findApi(apiId);
}
}

View File

@ -94,7 +94,7 @@ public class WorkflowManagerService implements Stoppable {
}
try {
final String dsName = dsmService.getDs(dsId).getOfficialname();
final String dsName = dsmService.findDs(dsId).getOfficialname();
final WorkflowConfiguration conf = new WorkflowConfiguration();
conf.setId("REPO_HI_" + UUID.randomUUID());

View File

@ -5,7 +5,6 @@ import java.util.stream.Stream;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.data.collect.CollectorService;
import eu.dnetlib.dsm.DsmService;
import eu.dnetlib.dsm.model.Api;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
@ -15,25 +14,13 @@ import eu.dnetlib.manager.wf.nodes.stream.StreamSupplierNode;
public class CollectNode extends StreamSupplierNode<String> {
@WfInputParam
private String dsId;
@WfInputParam
private String apiId;
@Autowired
private DsmService dsmService;
private Api api;
@Autowired
private CollectorService collectorService;
@Override
protected Stream<String> prepareStream() {
final Api api = dsmService.getApis(dsId)
.stream()
.filter(a -> apiId.equals(a.getId()))
.findFirst()
.orElseThrow();
return collectorService.collect(api);
}

View File

@ -6,7 +6,6 @@ import java.util.stream.Stream;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.data.collect.CollectorService;
import eu.dnetlib.dsm.DsmService;
import eu.dnetlib.dsm.model.Api;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
@ -16,10 +15,7 @@ import eu.dnetlib.manager.wf.nodes.stream.StreamSupplierNode;
public class DateRangeCollectNode extends StreamSupplierNode<String> {
@WfInputParam
private String dsId;
@WfInputParam
private String apiId;
private Api api;
@WfInputParam
private LocalDateTime from;
@ -27,22 +23,12 @@ public class DateRangeCollectNode extends StreamSupplierNode<String> {
@WfInputParam
private LocalDateTime until;
@Autowired
private DsmService dsmService;
@Autowired
private CollectorService collectorService;
@Override
protected Stream<String> prepareStream() {
final Api api = dsmService.getApis(dsId)
.stream()
.filter(a -> apiId.equals(a.getId()))
.findFirst()
.orElseThrow();
return collectorService.collect(api, from, until);
}
@Override

View File

@ -2,9 +2,7 @@ package eu.dnetlib.manager.wf.nodes.aggregation;
import java.time.LocalDateTime;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.dsm.DsmService;
import eu.dnetlib.dsm.model.Api;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.annotations.WfOutputParam;
@ -14,10 +12,7 @@ import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
public class FindDateRangeForIncrementalHarvestingNode extends AbstractJobNode {
@WfInputParam
private String dsId;
@WfInputParam
private String apiId;
private Api api;
@WfInputParam
private LocalDateTime overrideFromDate;
@ -31,9 +26,6 @@ public class FindDateRangeForIncrementalHarvestingNode extends AbstractJobNode {
@WfOutputParam
private LocalDateTime untilDate;
@Autowired
private DsmService dsmService;
@Override
protected void execute() throws Exception {
fromDate = overrideFromDate != null ? overrideFromDate : findLastCollDate();
@ -41,12 +33,7 @@ public class FindDateRangeForIncrementalHarvestingNode extends AbstractJobNode {
}
private LocalDateTime findLastCollDate() {
return dsmService.getApis(dsId)
.stream()
.filter(a -> apiId.equals(a.getId()))
.findFirst()
.map(a -> a.getLastCollectionDate().toLocalDateTime())
.orElse(null);
return api.getLastCollectionDate() != null ? api.getLastCollectionDate().toLocalDateTime() : null;
}
}

View File

@ -8,6 +8,8 @@ import eu.dnetlib.data.mdstore.model.records.MetadataRecord;
import eu.dnetlib.data.mdstore.model.records.MetadataRecordImpl;
import eu.dnetlib.data.mdstore.model.records.Provenance;
import eu.dnetlib.dsm.DsmService;
import eu.dnetlib.dsm.model.Api;
import eu.dnetlib.dsm.model.Datasource;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.nodes.stream.StreamMapperNode;
@ -16,13 +18,9 @@ import eu.dnetlib.manager.wf.nodes.stream.StreamMapperNode;
public class OpenaireMdBuilderNode extends StreamMapperNode<String, MetadataRecord> {
@WfInputParam
private String dsId;
private Datasource ds;
@WfInputParam
private String dsPrefix;
@WfInputParam
private String dsName;
@WfInputParam
private String apiId;
private Api api;
@WfInputParam
private boolean inferred;
@WfInputParam
@ -47,17 +45,12 @@ public class OpenaireMdBuilderNode extends StreamMapperNode<String, MetadataReco
@Override
protected void init() throws Exception {
xpath = dsmService.getApis(dsId)
.stream()
.filter(a -> apiId.equals(a.getId()))
.findFirst()
.map(a -> a.getMetadataIdentifierPath())
.orElseThrow();
xpath = api.getMetadataIdentifierPath();
provenance = new Provenance();
provenance.setDatasourceId(dsId);
provenance.setDatasourceName(dsName);
provenance.setNsPrefix(dsPrefix);
provenance.setDatasourceId(ds.getId());
provenance.setDatasourceName(ds.getOfficialname());
provenance.setNsPrefix(ds.getNamespaceprefix());
now = System.currentTimeMillis();
}
@ -73,7 +66,7 @@ public class OpenaireMdBuilderNode extends StreamMapperNode<String, MetadataReco
final MetadataRecord md = new MetadataRecordImpl();
md.setId(MetadataRecord.generateIdentifier(origId, dsPrefix));
md.setId(MetadataRecord.generateIdentifier(origId, ds.getNamespaceprefix()));
md.setOriginalId(origId);
md.setBody(doc.asXML());
md.setEncoding("XML");

View File

@ -3,21 +3,25 @@ package eu.dnetlib.manager.wf.nodes.aggregation;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.dsm.DsmService;
import eu.dnetlib.dsm.model.Api;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.annotations.WfOutputParam;
import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
@WfNode("updateApiExtraFields")
public class UpdateApiExtraFieldsNode extends AbstractJobNode {
@WfInputParam
private String dsId;
@WfInputParam
private String apiId;
@WfOutputParam
private Api api;
@WfInputParam
private String infoType; // COLLECT, TRANSFORM, DOWNLOAD
@WfInputParam
private String mdId;
@WfInputParam
private long total;
@ -29,14 +33,14 @@ public class UpdateApiExtraFieldsNode extends AbstractJobNode {
switch (infoType.toUpperCase()) {
case "COLLECT":
dsmService.updateUpdateApiCollectionInfo(apiId, mdId, total);
api = dsmService.updateUpdateApiCollectionInfo(api.getId(), mdId, total);
break;
case "AGGREGATOR":
case "TRANSFORM":
dsmService.updateUpdateApiAggregationInfo(apiId, mdId, total);
api = dsmService.updateUpdateApiAggregationInfo(api.getId(), mdId, total);
break;
case "DOWNLOAD":
dsmService.updateUpdateApiDownloadInfo(apiId, mdId, total);
api = dsmService.updateUpdateApiDownloadInfo(api.getId(), mdId, total);
break;
default:
throw new RuntimeException("Invalid infoType: " + infoType);

View File

@ -0,0 +1,38 @@
package eu.dnetlib.manager.wf.nodes.ds;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.dsm.DsmService;
import eu.dnetlib.dsm.model.Api;
import eu.dnetlib.dsm.model.Datasource;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.annotations.WfOutputParam;
import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
@WfNode("loadDatasourceInfo")
public class LoadDatasourceInfoNode extends AbstractJobNode {
@WfInputParam
private String dsId;
@WfInputParam
private String apiId;
@WfOutputParam
private Datasource ds;
@WfOutputParam
private Api api;
@Autowired
private DsmService dsmService;
@Override
protected void execute() throws Exception {
ds = dsmService.findDs(dsId);
api = dsmService.findApi(apiId);
}
}

View File

@ -4,6 +4,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.data.mdstore.MDStoreService;
import eu.dnetlib.data.mdstore.model.MDStoreType;
import eu.dnetlib.dsm.model.Api;
import eu.dnetlib.dsm.model.Datasource;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.annotations.WfOutputParam;
@ -25,13 +27,10 @@ public class CreateMdStoreNode extends AbstractJobNode {
private String backend;
@WfInputParam
private String dsId;
private Datasource ds;
@WfInputParam
private String dsName;
@WfInputParam
private String apiId;
private Api api;
@WfOutputParam
private String mdId;
@ -41,7 +40,7 @@ public class CreateMdStoreNode extends AbstractJobNode {
@Override
protected void execute() throws Exception {
mdStoreService.createMDStore(format, layout, interpretation, MDStoreType.valueOf(backend), dsName, dsId, apiId);
mdStoreService.createMDStore(format, layout, interpretation, MDStoreType.valueOf(backend), ds.getOfficialname(), ds.getId(), api.getId());
}
}

View File

@ -4,9 +4,9 @@ import java.util.Arrays;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.dsm.DsmService;
import eu.dnetlib.dsm.model.Api;
import eu.dnetlib.dsm.model.Datasource;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
@ -21,24 +21,15 @@ public class VerifyDatasourceNode extends AbstractJobNode {
private String expectedCompatibilities;
@WfInputParam
private String dsId;
private Datasource ds;
@WfInputParam
private String apiId;
@Autowired
private DsmService dsmService;
private Api api;
@Override
protected void execute() throws Exception {
final String type = dsmService.getDs(dsId).getEoscDatasourceType();
final String compatibility = dsmService.getApis(dsId)
.stream()
.filter(a -> apiId.equals(a.getId()))
.map(a -> ObjectUtils.firstNonNull(a.getCompatibilityOverride(), a.getCompatibility()))
.findFirst()
.orElseThrow(() -> new Exception("Missing api: " + apiId));
final String type = ds.getEoscDatasourceType();
final String compatibility = ObjectUtils.firstNonNull(api.getCompatibilityOverride(), api.getCompatibility());
if (StringUtils.isNotBlank(expectedEoscDatasourceTypes)) {
Arrays.stream(expectedEoscDatasourceTypes.split(","))

View File

@ -2,19 +2,11 @@
"parameters": [
{
"name":"dsId",
"description":"Datasource ID"
},
{
"name":"dsPrefix",
"description":"Datasource Prefix"
},
{
"name":"dsName",
"description":"Datasource Name"
"description":"the Datasource ID"
},
{
"name":"apiId",
"description":"Datasource API"
"description":"the Api ID"
},
{
"name":"nativeMdStoreId",

View File

@ -2,19 +2,11 @@
"parameters":[
{
"name":"dsId",
"description":"Datasource ID"
},
{
"name":"dsPrefix",
"description":"Datasource Prefix"
},
{
"name":"dsName",
"description":"Datasource Name"
"description":"the Datasource ID"
},
{
"name":"apiId",
"description":"Datasource API"
"description":"the Api ID"
},
{
"name":"mode",
@ -39,9 +31,38 @@
}
],
"graph":[
{
"name":"LOAD_DS_INFO",
"type":"loadDatasourceInfo",
"start": true,
"input":[
{
"name":"dsId",
"ref":"dsId"
},
{
"name":"apiId",
"ref":"apiId"
}
],
"output":[
{
"name":"ds",
"env":"ds"
},
{
"name":"api",
"env":"api"
}
],
"arcs":[
{
"to":"SELECT_MODE"
}
]
},
{
"name":"SELECT_MODE",
"start":true,
"arcs":[
{
"to":"COLLECT_REFRESH",
@ -58,12 +79,8 @@
"type":"collect",
"input":[
{
"name":"dsId",
"ref":"dsId"
},
{
"name":"apiId",
"ref":"apiId"
"name":"api",
"env":"api"
}
],
"output":[
@ -87,12 +104,8 @@
"type":"findDateRangeForIncrementalHarvesting",
"input":[
{
"name":"dsId",
"ref":"dsId"
},
{
"name":"apiId",
"ref":"apiId"
"name":"api",
"env":"api"
},
{
"name":"from",
@ -124,12 +137,8 @@
"type":"dateRangeCollect",
"input":[
{
"name":"dsId",
"ref":"dsId"
},
{
"name":"apiId",
"ref":"apiId"
"name":"api",
"env":"api"
},
{
"name":"fromDate",
@ -161,24 +170,12 @@
"type":"openaireMdBuilder",
"input":[
{
"name":"dsId",
"ref":"dsId"
"name":"ds",
"env":"ds"
},
{
"name":"dsPrefix",
"ref":"dsPrefix"
},
{
"name":"dsName",
"ref":"dsName"
},
{
"name":"apiId",
"ref":"apiId"
},
{
"name": "xpathForRecordId",
"env" : "xpathForRecordId"
"name":"api",
"env":"api"
},
{
"name":"inputStream",
@ -255,12 +252,8 @@
"type":"updateApiExtraFields",
"input":[
{
"name":"dsId",
"ref":"dsId"
},
{
"name":"apiId",
"ref":"apiId"
"name":"api",
"env":"api"
},
{
"name":"infoType",
@ -274,6 +267,12 @@
"name":"total",
"env":"total"
}
],
"output":[
{
"name":"api",
"env": "api"
}
]
}
]

View File

@ -2,19 +2,11 @@
"parameters":[
{
"name":"dsId",
"description":"Datasource ID"
},
{
"name":"dsPrefix",
"description":"Datasource Prefix"
},
{
"name":"dsName",
"description":"Datasource Name"
"description":"the Datasource ID"
},
{
"name":"apiId",
"description":"Datasource API"
"description":"the Api ID"
},
{
"name":"inputMdStoreId",
@ -26,10 +18,39 @@
}
],
"graph":[
{
"name":"LOAD_DS_INFO",
"type":"loadDatasourceInfo",
"start": true,
"input":[
{
"name":"dsId",
"ref":"dsId"
},
{
"name":"apiId",
"ref":"apiId"
}
],
"output":[
{
"name":"ds",
"env":"ds"
},
{
"name":"api",
"env":"api"
}
],
"arcs":[
{
"to":"FETCH_MDSTORE"
}
]
},
{
"name":"FETCH_MDSTORE",
"type":"fetchMdStore",
"start": true,
"input":[
{
"name":"mdId",
@ -107,12 +128,8 @@
"type":"updateApiExtraFields",
"input":[
{
"name":"dsId",
"ref":"dsId"
},
{
"name":"apiId",
"ref":"apiId"
"name":"api",
"env":"api"
},
{
"name":"infoType",