diff --git a/dnet-app/apps/dnet-wf-aggregation-postgres/src/main/java/eu/dnetlib/wfs/nodes/MdCollectIncrementalJobNode.java b/dnet-app/apps/dnet-wf-aggregation-postgres/src/main/java/eu/dnetlib/wfs/nodes/MdCollectIncrementalJobNode.java index 692adb8..145f60b 100644 --- a/dnet-app/apps/dnet-wf-aggregation-postgres/src/main/java/eu/dnetlib/wfs/nodes/MdCollectIncrementalJobNode.java +++ b/dnet-app/apps/dnet-wf-aggregation-postgres/src/main/java/eu/dnetlib/wfs/nodes/MdCollectIncrementalJobNode.java @@ -44,7 +44,7 @@ public class MdCollectIncrementalJobNode extends AbstractJobNode { @WfInputParam private LocalDateTime overrideUntilDate; - @WfInputParam + @WfInputParam(optional = true) private String filterXpath; @Autowired @@ -60,18 +60,18 @@ public class MdCollectIncrementalJobNode extends AbstractJobNode { @Transactional protected void execute() throws Exception { - final LocalDateTime fromDate = overrideFromDate != null ? overrideFromDate : findLastCollDate(api); - final LocalDateTime untilDate = overrideUntilDate != null ? overrideUntilDate : null; + final LocalDateTime fromDate = this.overrideFromDate != null ? this.overrideFromDate : findLastCollDate(this.api); + final LocalDateTime untilDate = this.overrideUntilDate != null ? this.overrideUntilDate : null; - final Predicate filter = XpathFilterFactory.createFilter(filterXpath); - final Function mdBuilder = MdBuilderFactory.createMdBuilder(ds, api); + final Predicate filter = XpathFilterFactory.createFilter(this.filterXpath); + final Function mdBuilder = MdBuilderFactory.createMdBuilder(this.ds, this.api); - final MDStoreManagerClient mdstoreManager = clientFactory.getClient(MDStoreManagerClient.class); + final MDStoreManagerClient mdstoreManager = this.clientFactory.getClient(MDStoreManagerClient.class); - final MDStoreWithInfo outputMDStore = mdstoreManager.findMDStore(mdId); + final MDStoreWithInfo outputMDStore = mdstoreManager.findMDStore(this.mdId); final MDStoreVersion outputVersion = mdstoreManager.findVersion(outputMDStore.getCurrentVersion()); try { - final Stream stream = collectorService.collect(api, fromDate, untilDate) + final Stream stream = this.collectorService.collect(this.api, fromDate, untilDate) .map(xml -> { try { return DocumentHelper.parseText(xml); @@ -82,13 +82,13 @@ public class MdCollectIncrementalJobNode extends AbstractJobNode { .filter(filter) .map(mdBuilder); - mdStoreSqlBackend.saveRecords(outputVersion, stream); + this.mdStoreSqlBackend.saveRecords(outputVersion, stream); - final long size = mdStoreSqlBackend.countRecords(outputVersion.getId()); + final long size = this.mdStoreSqlBackend.countRecords(outputVersion.getId()); outputVersion.setSize(size); mdstoreManager.commitVersion(outputVersion); - clientFactory.getClient(DsmClient.class).updateApiCollectionInfo(api.getId(), mdId, size); + this.clientFactory.getClient(DsmClient.class).updateApiCollectionInfo(this.api.getId(), this.mdId, size); } catch (final Throwable e) { mdstoreManager.abortVersion(outputVersion); throw e; diff --git a/dnet-app/apps/dnet-wf-aggregation-postgres/src/main/java/eu/dnetlib/wfs/nodes/MdCollectRefreshJobNode.java b/dnet-app/apps/dnet-wf-aggregation-postgres/src/main/java/eu/dnetlib/wfs/nodes/MdCollectRefreshJobNode.java index 21f426e..bfd862a 100644 --- a/dnet-app/apps/dnet-wf-aggregation-postgres/src/main/java/eu/dnetlib/wfs/nodes/MdCollectRefreshJobNode.java +++ b/dnet-app/apps/dnet-wf-aggregation-postgres/src/main/java/eu/dnetlib/wfs/nodes/MdCollectRefreshJobNode.java @@ -36,7 +36,7 @@ public class MdCollectRefreshJobNode extends AbstractJobNode { @WfInputParam private String mdId; - @WfInputParam + @WfInputParam(optional = true) private String filterXpath; @Autowired @@ -52,15 +52,15 @@ public class MdCollectRefreshJobNode extends AbstractJobNode { @Transactional protected void execute() throws Exception { - final Predicate filter = XpathFilterFactory.createFilter(filterXpath); - final Function mdBuilder = MdBuilderFactory.createMdBuilder(ds, api); + final Predicate filter = XpathFilterFactory.createFilter(this.filterXpath); + final Function mdBuilder = MdBuilderFactory.createMdBuilder(this.ds, this.api); - final MDStoreManagerClient mdstoreManager = clientFactory.getClient(MDStoreManagerClient.class); + final MDStoreManagerClient mdstoreManager = this.clientFactory.getClient(MDStoreManagerClient.class); - final MDStoreVersion outputVersion = mdstoreManager.newVersion(mdId); + final MDStoreVersion outputVersion = mdstoreManager.newVersion(this.mdId); try { - final Stream stream = collectorService.collect(api) + final Stream stream = this.collectorService.collect(this.api) .map(xml -> { try { return DocumentHelper.parseText(xml); @@ -71,13 +71,13 @@ public class MdCollectRefreshJobNode extends AbstractJobNode { .filter(filter) .map(mdBuilder); - mdStoreSqlBackend.saveRecords(outputVersion, stream); + this.mdStoreSqlBackend.saveRecords(outputVersion, stream); - final long size = mdStoreSqlBackend.countRecords(outputVersion.getId()); + final long size = this.mdStoreSqlBackend.countRecords(outputVersion.getId()); outputVersion.setSize(size); mdstoreManager.commitVersion(outputVersion); - clientFactory.getClient(DsmClient.class).updateApiCollectionInfo(api.getId(), mdId, size); + this.clientFactory.getClient(DsmClient.class).updateApiCollectionInfo(this.api.getId(), this.mdId, size); } catch (final Throwable e) { mdstoreManager.abortVersion(outputVersion); throw e; diff --git a/dnet-app/apps/dnet-wf-aggregation-postgres/src/main/java/eu/dnetlib/wfs/nodes/MdTransformJobNode.java b/dnet-app/apps/dnet-wf-aggregation-postgres/src/main/java/eu/dnetlib/wfs/nodes/MdTransformJobNode.java index 957c321..c8f8ff9 100644 --- a/dnet-app/apps/dnet-wf-aggregation-postgres/src/main/java/eu/dnetlib/wfs/nodes/MdTransformJobNode.java +++ b/dnet-app/apps/dnet-wf-aggregation-postgres/src/main/java/eu/dnetlib/wfs/nodes/MdTransformJobNode.java @@ -27,7 +27,7 @@ import eu.dnetlib.wfs.annotations.WfNode; import eu.dnetlib.wfs.utils.XpathFilterFactory; import jakarta.transaction.Transactional; -@WfNode("md_transform") +@WfNode("md_transform_xslt") public class MdTransformJobNode extends AbstractJobNode { @WfInputParam @@ -42,7 +42,7 @@ public class MdTransformJobNode extends AbstractJobNode { @WfInputParam private String outputMdId; - @WfInputParam + @WfInputParam(optional = true) private String filterXpath; @WfInputParam @@ -61,18 +61,18 @@ public class MdTransformJobNode extends AbstractJobNode { @Transactional protected void execute() throws Exception { - final Predicate filter = XpathFilterFactory.createFilter(filterXpath); + final Predicate filter = XpathFilterFactory.createFilter(this.filterXpath); final Map params = new HashMap<>(); // TODO (LOW PRIORITY): which params ? - final RecordTransformer xslt = xsltTransformFactory.getTransformer(ruleId, params); - final MDStoreManagerClient mdstoreManager = clientFactory.getClient(MDStoreManagerClient.class); + final RecordTransformer xslt = this.xsltTransformFactory.getTransformer(this.ruleId, params); + final MDStoreManagerClient mdstoreManager = this.clientFactory.getClient(MDStoreManagerClient.class); - final MDStoreVersion inputVersion = mdstoreManager.startReading(inputMdId); - final MDStoreVersion outputVersion = mdstoreManager.newVersion(outputMdId); + final MDStoreVersion inputVersion = mdstoreManager.startReading(this.inputMdId); + final MDStoreVersion outputVersion = mdstoreManager.newVersion(this.outputMdId); try { - final Stream stream = mdStoreSqlBackend.streamEntries(inputVersion) + final Stream stream = this.mdStoreSqlBackend.streamEntries(inputVersion) .filter(record -> { try { final Document doc = DocumentHelper.parseText(record.getBody()); @@ -89,13 +89,13 @@ public class MdTransformJobNode extends AbstractJobNode { return output; }); - mdStoreSqlBackend.saveRecords(outputVersion, stream); + this.mdStoreSqlBackend.saveRecords(outputVersion, stream); - final long size = mdStoreSqlBackend.countRecords(outputVersion.getId()); + final long size = this.mdStoreSqlBackend.countRecords(outputVersion.getId()); outputVersion.setSize(size); mdstoreManager.commitVersion(outputVersion); - clientFactory.getClient(DsmClient.class).updateApiAggregationInfo(api.getId(), outputMdId, size); + this.clientFactory.getClient(DsmClient.class).updateApiAggregationInfo(this.api.getId(), this.outputMdId, size); } catch (final Throwable e) { mdstoreManager.abortVersion(outputVersion); throw e; diff --git a/dnet-app/apps/dnet-wf-manager/src/main/resources/wf_templates/collect.json b/dnet-app/apps/dnet-wf-manager/src/main/resources/wf_templates/collect.json index ad4825d..a89d655 100644 --- a/dnet-app/apps/dnet-wf-manager/src/main/resources/wf_templates/collect.json +++ b/dnet-app/apps/dnet-wf-manager/src/main/resources/wf_templates/collect.json @@ -21,6 +21,11 @@ "description":"Override the default untilDate (now)", "type":"DATE", "required":"false" + }, + { + "name":"nativeMdStoreFilterXpath", + "description":"Filter the collected records", + "required":"false" } ], "graph":[ @@ -53,105 +58,14 @@ "condition":"mode != 'INCREMENTAL'" }, { - "to":"PREPARE_INCREMENTAL", + "to":"COLLECT_INCREMENTAL", "condition":"mode == 'INCREMENTAL'" } ] }, { "name":"COLLECT_REFRESH", - "type":"collect", - "input":[ - { - "name":"api", - "env":"api" - } - ], - "output":[ - { - "name":"outputStream", - "env":"collectStream" - }, - { - "name": "xpathForRecordId", - "env" : "xpathForRecordId" - } - ], - "arcs":[ - { - "to":"MD_BUILDER" - } - ] - }, - { - "name":"PREPARE_INCREMENTAL", - "type":"findDateRangeForIncrementalHarvesting", - "input":[ - { - "name":"api", - "env":"api" - }, - { - "name":"from", - "ref":"overrideFromDate" - }, - { - "name":"until", - "ref":"overrideUntilDate" - } - ], - "output":[ - { - "name":"from", - "env":"fromDate" - }, - { - "name":"until", - "env":"untilDate" - } - ], - "arcs":[ - { - "to":"COLLECT_INCREMENTAL" - } - ] - }, - { - "name":"COLLECT_INCREMENTAL", - "type":"dateRangeCollect", - "input":[ - { - "name":"api", - "env":"api" - }, - { - "name":"fromDate", - "env":"fromDate" - }, - { - "name":"untilDate", - "env":"untilDate" - } - ], - "output":[ - { - "name":"outputStream", - "env":"collectStream" - }, - { - "name": "xpathForRecordId", - "env" : "xpathForRecordId" - } - ], - "arcs":[ - { - "to":"MD_BUILDER" - } - ] - }, - { - "name":"MD_BUILDER", - "type":"openaireMdBuilder", + "type":"md_collect_refresh", "input":[ { "name":"ds", @@ -161,104 +75,49 @@ "name":"api", "env":"api" }, - { - "name":"inputStream", - "env":"collectStream" - }, - { - "name":"inferred", - "value":"false" - }, - { - "name":"deletedbyinference", - "value":"false" - }, - { - "name":"inferenceprovenance", - "value":"" - }, - { - "name":"trust", - "value":"0.9" - }, - { - "name":"provenanceactionclassname", - "value":"sysimport:crosswalk:repository" - }, - { - "name":"provenanceactionclassid", - "value":"sysimport:crosswalk:repository" - } - ], - "output":[ - { - "name":"outputStream", - "env":"mdBuilderStream" - } - ], - "arcs":[ - { - "to":"STORE" - } - ] - }, - { - "name":"STORE", - "type":"feedMdStore", - "input":[ { "name":"mdId", "ref":"nativeMdStoreId" }, { - "name":"inputStream", - "env":"mdBuilderStream" - }, - { - "name":"storingType", - "ref":"mode" + "name":"filterXpath", + "ref":"nativeMdStoreFilterXpath" } ], - "output":[ - { - "name":"total", - "env": "total" - } - ], - "arcs":[ - { - "to":"UPDATE_INFO" - } - ] + "output":[], + "arcs":[] }, { - "name":"UPDATE_INFO", - "type":"updateApiExtraFields", + "name":"COLLECT_INCREMENTAL", + "type":"md_collect_incremental", "input":[ + { + "name":"ds", + "env":"ds" + }, { "name":"api", "env":"api" }, - { - "name":"infoType", - "value":"Collect" - }, { "name":"mdId", - "ref":"nativeMdStoreId" + "env":"nativeMdStoreId" }, { - "name":"total", - "env":"total" + "name":"filterXpath", + "env":"nativeMdStoreFilterXpath" + }, + { + "name":"overrideFromDate", + "env":"overrideFromDate" + }, + { + "name":"overrideUntilDate", + "env":"overrideUntilDate" } ], - "output":[ - { - "name":"api", - "env": "api" - } - ] + "output":[], + "arcs":[] } ] } - diff --git a/dnet-app/apps/dnet-wf-manager/src/main/resources/wf_templates/transform.json b/dnet-app/apps/dnet-wf-manager/src/main/resources/wf_templates/transform.json index 36bb961..a7e1c97 100644 --- a/dnet-app/apps/dnet-wf-manager/src/main/resources/wf_templates/transform.json +++ b/dnet-app/apps/dnet-wf-manager/src/main/resources/wf_templates/transform.json @@ -7,6 +7,10 @@ { "name":"cleanedMdStoreId", "description":"Cleaned Metadata Store ID" + }, + { + "name":"xsltRuleId", + "description":"Transformation Rule ID" } ], "graph":[ @@ -25,27 +29,6 @@ "env":"api" } ], - "arcs":[ - { - "to":"FETCH_MDSTORE" - } - ] - }, - { - "name":"FETCH_MDSTORE", - "type":"fetchMdStore", - "input":[ - { - "name":"mdId", - "ref":"nativeMdStoreId" - } - ], - "output":[ - { - "name":"outputStream", - "env":"origStream" - } - ], "arcs":[ { "to":"TRANSFORM" @@ -54,102 +37,35 @@ }, { "name":"TRANSFORM", - "type":"applyXslt", + "type":"md_transform_xslt", "input":[ { - "name":"ruleId", - "ref":"ruleId" + "name":"ds", + "env":"ds" }, - { - "name":"inputStream", - "env":"origStream" - } - ], - "output":[ - { - "name":"outputStream", - "env":"transformStream" - } - ], - "arcs":[ - { - "to":"STORE" - } - ] - }, - { - "name":"STORE", - "type":"feedMdStore", - "input":[ - { - "name":"mdId", - "ref":"cleanedMdStoreId" - }, - { - "name":"inputStream", - "env":"transformStream" - }, - { - "name":"storingType", - "value":"REFRESH" - } - ], - "output":[ - { - "name":"total", - "env": "total" - } - ], - "arcs":[ - { - "to":"UPDATE_INFO" - } - ] - }, - { - "name":"UPDATE_INFO", - "type":"updateApiExtraFields", - "input":[ { "name":"api", "env":"api" }, { - "name":"infoType", - "value":"Transform" + "name":"inputMdId", + "ref":"nativeMdStoreId" }, { - "name":"mdId", + "name":"outputMdId", "ref":"cleanedMdStoreId" }, { - "name":"total", - "env":"total" + "name":"ruleId", + "ref":"xsltRuleId" + }, + { + "name":"filterXpath", + "ref":"nativeMdStoreFilterXpath" } - ] + ], + "output":[], + "arcs":[] } ] } - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file