wf profiles

This commit is contained in:
Michele Artini 2024-01-16 13:52:51 +01:00
parent cc46823e6b
commit b515d39d86
5 changed files with 80 additions and 305 deletions

View File

@ -44,7 +44,7 @@ public class MdCollectIncrementalJobNode extends AbstractJobNode {
@WfInputParam
private LocalDateTime overrideUntilDate;
@WfInputParam
@WfInputParam(optional = true)
private String filterXpath;
@Autowired
@ -60,18 +60,18 @@ public class MdCollectIncrementalJobNode extends AbstractJobNode {
@Transactional
protected void execute() throws Exception {
final LocalDateTime fromDate = overrideFromDate != null ? overrideFromDate : findLastCollDate(api);
final LocalDateTime untilDate = overrideUntilDate != null ? overrideUntilDate : null;
final LocalDateTime fromDate = this.overrideFromDate != null ? this.overrideFromDate : findLastCollDate(this.api);
final LocalDateTime untilDate = this.overrideUntilDate != null ? this.overrideUntilDate : null;
final Predicate<Document> filter = XpathFilterFactory.createFilter(filterXpath);
final Function<Document, MetadataRecord> mdBuilder = MdBuilderFactory.createMdBuilder(ds, api);
final Predicate<Document> filter = XpathFilterFactory.createFilter(this.filterXpath);
final Function<Document, MetadataRecord> mdBuilder = MdBuilderFactory.createMdBuilder(this.ds, this.api);
final MDStoreManagerClient mdstoreManager = clientFactory.getClient(MDStoreManagerClient.class);
final MDStoreManagerClient mdstoreManager = this.clientFactory.getClient(MDStoreManagerClient.class);
final MDStoreWithInfo outputMDStore = mdstoreManager.findMDStore(mdId);
final MDStoreWithInfo outputMDStore = mdstoreManager.findMDStore(this.mdId);
final MDStoreVersion outputVersion = mdstoreManager.findVersion(outputMDStore.getCurrentVersion());
try {
final Stream<MetadataRecord> stream = collectorService.collect(api, fromDate, untilDate)
final Stream<MetadataRecord> stream = this.collectorService.collect(this.api, fromDate, untilDate)
.map(xml -> {
try {
return DocumentHelper.parseText(xml);
@ -82,13 +82,13 @@ public class MdCollectIncrementalJobNode extends AbstractJobNode {
.filter(filter)
.map(mdBuilder);
mdStoreSqlBackend.saveRecords(outputVersion, stream);
this.mdStoreSqlBackend.saveRecords(outputVersion, stream);
final long size = mdStoreSqlBackend.countRecords(outputVersion.getId());
final long size = this.mdStoreSqlBackend.countRecords(outputVersion.getId());
outputVersion.setSize(size);
mdstoreManager.commitVersion(outputVersion);
clientFactory.getClient(DsmClient.class).updateApiCollectionInfo(api.getId(), mdId, size);
this.clientFactory.getClient(DsmClient.class).updateApiCollectionInfo(this.api.getId(), this.mdId, size);
} catch (final Throwable e) {
mdstoreManager.abortVersion(outputVersion);
throw e;

View File

@ -36,7 +36,7 @@ public class MdCollectRefreshJobNode extends AbstractJobNode {
@WfInputParam
private String mdId;
@WfInputParam
@WfInputParam(optional = true)
private String filterXpath;
@Autowired
@ -52,15 +52,15 @@ public class MdCollectRefreshJobNode extends AbstractJobNode {
@Transactional
protected void execute() throws Exception {
final Predicate<Document> filter = XpathFilterFactory.createFilter(filterXpath);
final Function<Document, MetadataRecord> mdBuilder = MdBuilderFactory.createMdBuilder(ds, api);
final Predicate<Document> filter = XpathFilterFactory.createFilter(this.filterXpath);
final Function<Document, MetadataRecord> mdBuilder = MdBuilderFactory.createMdBuilder(this.ds, this.api);
final MDStoreManagerClient mdstoreManager = clientFactory.getClient(MDStoreManagerClient.class);
final MDStoreManagerClient mdstoreManager = this.clientFactory.getClient(MDStoreManagerClient.class);
final MDStoreVersion outputVersion = mdstoreManager.newVersion(mdId);
final MDStoreVersion outputVersion = mdstoreManager.newVersion(this.mdId);
try {
final Stream<MetadataRecord> stream = collectorService.collect(api)
final Stream<MetadataRecord> stream = this.collectorService.collect(this.api)
.map(xml -> {
try {
return DocumentHelper.parseText(xml);
@ -71,13 +71,13 @@ public class MdCollectRefreshJobNode extends AbstractJobNode {
.filter(filter)
.map(mdBuilder);
mdStoreSqlBackend.saveRecords(outputVersion, stream);
this.mdStoreSqlBackend.saveRecords(outputVersion, stream);
final long size = mdStoreSqlBackend.countRecords(outputVersion.getId());
final long size = this.mdStoreSqlBackend.countRecords(outputVersion.getId());
outputVersion.setSize(size);
mdstoreManager.commitVersion(outputVersion);
clientFactory.getClient(DsmClient.class).updateApiCollectionInfo(api.getId(), mdId, size);
this.clientFactory.getClient(DsmClient.class).updateApiCollectionInfo(this.api.getId(), this.mdId, size);
} catch (final Throwable e) {
mdstoreManager.abortVersion(outputVersion);
throw e;

View File

@ -27,7 +27,7 @@ import eu.dnetlib.wfs.annotations.WfNode;
import eu.dnetlib.wfs.utils.XpathFilterFactory;
import jakarta.transaction.Transactional;
@WfNode("md_transform")
@WfNode("md_transform_xslt")
public class MdTransformJobNode extends AbstractJobNode {
@WfInputParam
@ -42,7 +42,7 @@ public class MdTransformJobNode extends AbstractJobNode {
@WfInputParam
private String outputMdId;
@WfInputParam
@WfInputParam(optional = true)
private String filterXpath;
@WfInputParam
@ -61,18 +61,18 @@ public class MdTransformJobNode extends AbstractJobNode {
@Transactional
protected void execute() throws Exception {
final Predicate<Document> filter = XpathFilterFactory.createFilter(filterXpath);
final Predicate<Document> filter = XpathFilterFactory.createFilter(this.filterXpath);
final Map<String, Object> params = new HashMap<>();
// TODO (LOW PRIORITY): which params ?
final RecordTransformer<String, String> xslt = xsltTransformFactory.getTransformer(ruleId, params);
final MDStoreManagerClient mdstoreManager = clientFactory.getClient(MDStoreManagerClient.class);
final RecordTransformer<String, String> xslt = this.xsltTransformFactory.getTransformer(this.ruleId, params);
final MDStoreManagerClient mdstoreManager = this.clientFactory.getClient(MDStoreManagerClient.class);
final MDStoreVersion inputVersion = mdstoreManager.startReading(inputMdId);
final MDStoreVersion outputVersion = mdstoreManager.newVersion(outputMdId);
final MDStoreVersion inputVersion = mdstoreManager.startReading(this.inputMdId);
final MDStoreVersion outputVersion = mdstoreManager.newVersion(this.outputMdId);
try {
final Stream<MetadataRecord> stream = mdStoreSqlBackend.streamEntries(inputVersion)
final Stream<MetadataRecord> stream = this.mdStoreSqlBackend.streamEntries(inputVersion)
.filter(record -> {
try {
final Document doc = DocumentHelper.parseText(record.getBody());
@ -89,13 +89,13 @@ public class MdTransformJobNode extends AbstractJobNode {
return output;
});
mdStoreSqlBackend.saveRecords(outputVersion, stream);
this.mdStoreSqlBackend.saveRecords(outputVersion, stream);
final long size = mdStoreSqlBackend.countRecords(outputVersion.getId());
final long size = this.mdStoreSqlBackend.countRecords(outputVersion.getId());
outputVersion.setSize(size);
mdstoreManager.commitVersion(outputVersion);
clientFactory.getClient(DsmClient.class).updateApiAggregationInfo(api.getId(), outputMdId, size);
this.clientFactory.getClient(DsmClient.class).updateApiAggregationInfo(this.api.getId(), this.outputMdId, size);
} catch (final Throwable e) {
mdstoreManager.abortVersion(outputVersion);
throw e;

View File

@ -21,6 +21,11 @@
"description":"Override the default untilDate (now)",
"type":"DATE",
"required":"false"
},
{
"name":"nativeMdStoreFilterXpath",
"description":"Filter the collected records",
"required":"false"
}
],
"graph":[
@ -53,105 +58,14 @@
"condition":"mode != 'INCREMENTAL'"
},
{
"to":"PREPARE_INCREMENTAL",
"to":"COLLECT_INCREMENTAL",
"condition":"mode == 'INCREMENTAL'"
}
]
},
{
"name":"COLLECT_REFRESH",
"type":"collect",
"input":[
{
"name":"api",
"env":"api"
}
],
"output":[
{
"name":"outputStream",
"env":"collectStream"
},
{
"name": "xpathForRecordId",
"env" : "xpathForRecordId"
}
],
"arcs":[
{
"to":"MD_BUILDER"
}
]
},
{
"name":"PREPARE_INCREMENTAL",
"type":"findDateRangeForIncrementalHarvesting",
"input":[
{
"name":"api",
"env":"api"
},
{
"name":"from",
"ref":"overrideFromDate"
},
{
"name":"until",
"ref":"overrideUntilDate"
}
],
"output":[
{
"name":"from",
"env":"fromDate"
},
{
"name":"until",
"env":"untilDate"
}
],
"arcs":[
{
"to":"COLLECT_INCREMENTAL"
}
]
},
{
"name":"COLLECT_INCREMENTAL",
"type":"dateRangeCollect",
"input":[
{
"name":"api",
"env":"api"
},
{
"name":"fromDate",
"env":"fromDate"
},
{
"name":"untilDate",
"env":"untilDate"
}
],
"output":[
{
"name":"outputStream",
"env":"collectStream"
},
{
"name": "xpathForRecordId",
"env" : "xpathForRecordId"
}
],
"arcs":[
{
"to":"MD_BUILDER"
}
]
},
{
"name":"MD_BUILDER",
"type":"openaireMdBuilder",
"type":"md_collect_refresh",
"input":[
{
"name":"ds",
@ -161,104 +75,49 @@
"name":"api",
"env":"api"
},
{
"name":"inputStream",
"env":"collectStream"
},
{
"name":"inferred",
"value":"false"
},
{
"name":"deletedbyinference",
"value":"false"
},
{
"name":"inferenceprovenance",
"value":""
},
{
"name":"trust",
"value":"0.9"
},
{
"name":"provenanceactionclassname",
"value":"sysimport:crosswalk:repository"
},
{
"name":"provenanceactionclassid",
"value":"sysimport:crosswalk:repository"
}
],
"output":[
{
"name":"outputStream",
"env":"mdBuilderStream"
}
],
"arcs":[
{
"to":"STORE"
}
]
},
{
"name":"STORE",
"type":"feedMdStore",
"input":[
{
"name":"mdId",
"ref":"nativeMdStoreId"
},
{
"name":"inputStream",
"env":"mdBuilderStream"
},
{
"name":"storingType",
"ref":"mode"
"name":"filterXpath",
"ref":"nativeMdStoreFilterXpath"
}
],
"output":[
{
"name":"total",
"env": "total"
}
],
"arcs":[
{
"to":"UPDATE_INFO"
}
]
"output":[],
"arcs":[]
},
{
"name":"UPDATE_INFO",
"type":"updateApiExtraFields",
"name":"COLLECT_INCREMENTAL",
"type":"md_collect_incremental",
"input":[
{
"name":"ds",
"env":"ds"
},
{
"name":"api",
"env":"api"
},
{
"name":"infoType",
"value":"Collect"
},
{
"name":"mdId",
"ref":"nativeMdStoreId"
"env":"nativeMdStoreId"
},
{
"name":"total",
"env":"total"
"name":"filterXpath",
"env":"nativeMdStoreFilterXpath"
},
{
"name":"overrideFromDate",
"env":"overrideFromDate"
},
{
"name":"overrideUntilDate",
"env":"overrideUntilDate"
}
],
"output":[
{
"name":"api",
"env": "api"
}
]
"output":[],
"arcs":[]
}
]
}

View File

@ -7,6 +7,10 @@
{
"name":"cleanedMdStoreId",
"description":"Cleaned Metadata Store ID"
},
{
"name":"xsltRuleId",
"description":"Transformation Rule ID"
}
],
"graph":[
@ -25,27 +29,6 @@
"env":"api"
}
],
"arcs":[
{
"to":"FETCH_MDSTORE"
}
]
},
{
"name":"FETCH_MDSTORE",
"type":"fetchMdStore",
"input":[
{
"name":"mdId",
"ref":"nativeMdStoreId"
}
],
"output":[
{
"name":"outputStream",
"env":"origStream"
}
],
"arcs":[
{
"to":"TRANSFORM"
@ -54,102 +37,35 @@
},
{
"name":"TRANSFORM",
"type":"applyXslt",
"type":"md_transform_xslt",
"input":[
{
"name":"ruleId",
"ref":"ruleId"
"name":"ds",
"env":"ds"
},
{
"name":"inputStream",
"env":"origStream"
}
],
"output":[
{
"name":"outputStream",
"env":"transformStream"
}
],
"arcs":[
{
"to":"STORE"
}
]
},
{
"name":"STORE",
"type":"feedMdStore",
"input":[
{
"name":"mdId",
"ref":"cleanedMdStoreId"
},
{
"name":"inputStream",
"env":"transformStream"
},
{
"name":"storingType",
"value":"REFRESH"
}
],
"output":[
{
"name":"total",
"env": "total"
}
],
"arcs":[
{
"to":"UPDATE_INFO"
}
]
},
{
"name":"UPDATE_INFO",
"type":"updateApiExtraFields",
"input":[
{
"name":"api",
"env":"api"
},
{
"name":"infoType",
"value":"Transform"
"name":"inputMdId",
"ref":"nativeMdStoreId"
},
{
"name":"mdId",
"name":"outputMdId",
"ref":"cleanedMdStoreId"
},
{
"name":"total",
"env":"total"
"name":"ruleId",
"ref":"xsltRuleId"
},
{
"name":"filterXpath",
"ref":"nativeMdStoreFilterXpath"
}
]
],
"output":[],
"arcs":[]
}
]
}