From 0280e506c9dd3cbc2bd23c8323cb5e76e3484cb8 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Thu, 4 May 2023 11:20:39 +0200 Subject: [PATCH] mdstore: add methods --- .../dnetlib/data/mdstore/MDStoreService.java | 34 +++++++----- .../data/mdstore/backends/DefaultBackend.java | 8 +++ .../data/mdstore/backends/HdfsBackend.java | 10 ++++ .../data/mdstore/backends/MDStoreBackend.java | 4 ++ .../data/mdstore/backends/MockBackend.java | 10 ++++ .../nodes/aggregation/MdStoreFeederNode.java | 36 +++++++++++-- .../manager/wf/nodes/stream/DnetStream.java | 52 +++++++++++-------- .../wf/nodes/stream/StreamConsumerNode.java | 35 ++++++------- .../wf/nodes/stream/StreamFilterNode.java | 4 +- 9 files changed, 131 insertions(+), 62 deletions(-) diff --git a/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/MDStoreService.java b/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/MDStoreService.java index e04a28c5..85a6154b 100644 --- a/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/MDStoreService.java +++ b/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/MDStoreService.java @@ -188,18 +188,6 @@ public class MDStoreService { return v; } - @Transactional - public MDStoreVersion commitMdStoreVersion(final String versionId, final long size) throws MDStoreManagerException { - final MDStoreVersion v = mdstoreVersionRepository.findById(versionId).orElseThrow(() -> new MDStoreManagerException("Invalid version: " + versionId)); - mdstoreCurrentVersionRepository.save(MDStoreCurrentVersion.newInstance(v)); - v.setWriting(false); - v.setSize(size); - v.setLastUpdate(LocalDateTime.now()); - mdstoreVersionRepository.save(v); - - return v; - } - public synchronized void deleteExpiredVersions() { log.info("Deleting expired version..."); for (final String versionId : listExpiredVersions()) { @@ -299,4 +287,26 @@ public class MDStoreService { } } + public void addRecord(final String newVersion, final MDStoreType type, final MetadataRecord record) { + selectBackend(type).addRecord(record); + } + + public long commitMdStoreVersion(final String versionId, final MDStoreType mdStoreType) throws MDStoreManagerException { + final long count = selectBackend(mdStoreType).countRecords(versionId); + commitMdStoreVersion(versionId, count); + return count; + } + + @Transactional + public MDStoreVersion commitMdStoreVersion(final String versionId, final long size) throws MDStoreManagerException { + final MDStoreVersion v = mdstoreVersionRepository.findById(versionId).orElseThrow(() -> new MDStoreManagerException("Invalid version: " + versionId)); + mdstoreCurrentVersionRepository.save(MDStoreCurrentVersion.newInstance(v)); + v.setWriting(false); + v.setSize(size); + v.setLastUpdate(LocalDateTime.now()); + mdstoreVersionRepository.save(v); + + return v; + } + } diff --git a/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/DefaultBackend.java b/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/DefaultBackend.java index f3c34490..8f449d78 100644 --- a/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/DefaultBackend.java +++ b/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/DefaultBackend.java @@ -49,4 +49,12 @@ public class DefaultBackend implements MDStoreBackend { return new HashSet<>(); } + @Override + public void addRecord(final MetadataRecord record) {} + + @Override + public long countRecords(final String versionId) { + return 0; + } + } diff --git a/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/HdfsBackend.java b/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/HdfsBackend.java index 6d762f25..91855ffc 100644 --- a/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/HdfsBackend.java +++ b/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/HdfsBackend.java @@ -109,4 +109,14 @@ public class HdfsBackend implements MDStoreBackend { } } + @Override + public void addRecord(final MetadataRecord record) { + throw new RuntimeException("NOT_IMPLEMENTED"); + } + + @Override + public long countRecords(final String versionId) { + throw new RuntimeException("NOT_IMPLEMENTED"); + } + } diff --git a/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/MDStoreBackend.java b/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/MDStoreBackend.java index 5e10b871..78832a68 100644 --- a/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/MDStoreBackend.java +++ b/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/MDStoreBackend.java @@ -27,4 +27,8 @@ public interface MDStoreBackend { Set fixInconsistencies(boolean delete) throws MDStoreManagerException; + void addRecord(MetadataRecord record); + + long countRecords(String versionId); + } diff --git a/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/MockBackend.java b/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/MockBackend.java index 1302ca14..1e8e3907 100644 --- a/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/MockBackend.java +++ b/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/MockBackend.java @@ -76,4 +76,14 @@ public class MockBackend implements MDStoreBackend { return new LinkedHashSet<>(Arrays.asList("1", "2", "3", "4")); } + @Override + public void addRecord(final MetadataRecord record) { + throw new RuntimeException("NOT_IMPLEMENTED"); + } + + @Override + public long countRecords(final String versionId) { + throw new RuntimeException("NOT_IMPLEMENTED"); + } + } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/MdStoreFeederNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/MdStoreFeederNode.java index 315ca7c4..181eb108 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/MdStoreFeederNode.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/aggregation/MdStoreFeederNode.java @@ -1,8 +1,12 @@ package eu.dnetlib.manager.wf.nodes.aggregation; -import java.util.stream.Stream; +import org.springframework.beans.factory.annotation.Autowired; +import eu.dnetlib.data.mdstore.MDStoreService; +import eu.dnetlib.data.mdstore.model.MDStoreType; +import eu.dnetlib.data.mdstore.model.MDStoreWithInfo; import eu.dnetlib.data.mdstore.model.records.MetadataRecord; +import eu.dnetlib.errors.MDStoreManagerException; import eu.dnetlib.manager.wf.annotations.WfInputParam; import eu.dnetlib.manager.wf.annotations.WfNode; import eu.dnetlib.manager.wf.annotations.WfOutputParam; @@ -20,11 +24,33 @@ public class MdStoreFeederNode extends StreamConsumerNode { @WfOutputParam private long total; - @Override - protected void consumeStream(final Stream stream) { - // TODO Auto-generated method stub - total = 0; + @Autowired + private MDStoreService mdStoreService; + private String newVersion; + private MDStoreType mdStoreType; + + @Override + protected void init() throws Exception { + final MDStoreWithInfo md = mdStoreService.findMdStore(mdId); + mdStoreType = md.getType(); + + if (storingType.equals(MDStoreService.REFRESH_MODE)) { + newVersion = mdStoreService.prepareMdStoreVersion(mdId).getId(); + } else if (storingType.equals(MDStoreService.INCREMENTAL_MODE)) { + newVersion = md.getCurrentVersion(); + } else { + throw new MDStoreManagerException("Invalid mode " + storingType); + } } + @Override + protected void consumeRecord(final MetadataRecord input) throws Exception { + mdStoreService.addRecord(newVersion, mdStoreType, input); + } + + @Override + protected void complete() throws Exception { + total = mdStoreService.commitMdStoreVersion(newVersion, mdStoreType); + } } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/stream/DnetStream.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/stream/DnetStream.java index e473acd5..1e97520e 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/stream/DnetStream.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/stream/DnetStream.java @@ -1,13 +1,19 @@ package eu.dnetlib.manager.wf.nodes.stream; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Stream; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import eu.dnetlib.manager.wf.workflows.util.SimpleCallable; public class DnetStream { + private static final Log log = LogFactory.getLog(DnetStream.class); + private final Stream stream; private final SimpleCallable before; private final SimpleCallable after; @@ -20,28 +26,6 @@ public class DnetStream { this.abort = abort; } - public Stream getStream() { - return stream; - } - - public void startReading() throws Exception { - if (before != null) { - before.call(); - } - } - - public void endReading() throws Exception { - if (after != null) { - after.call(); - } - } - - public void abortReading() throws Exception { - if (abort != null) { - abort.call(); - } - } - public DnetStream filter(final Predicate predicate) { return new DnetStream<>(stream.filter(predicate), before, after, abort); } @@ -50,4 +34,28 @@ public class DnetStream { return new DnetStream<>(stream.map(mapper), before, after, abort); } + public void consume(final Consumer action) { + try { + if (before != null) { + before.call(); + } + + stream.forEach(action); + + if (after != null) { + after.call(); + } + } catch (final Throwable e) { + log.error("Error reading stream: " + e); + try { + if (abort != null) { + abort.call(); + } + } catch (final Exception e1) { + log.error("Error aborting stream reading: " + e1); + } + throw new RuntimeException(e); + } + } + } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/stream/StreamConsumerNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/stream/StreamConsumerNode.java index 1f5bb97b..edee4618 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/stream/StreamConsumerNode.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/stream/StreamConsumerNode.java @@ -1,39 +1,34 @@ package eu.dnetlib.manager.wf.nodes.stream; -import java.util.stream.Stream; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import eu.dnetlib.manager.wf.annotations.WfInputParam; import eu.dnetlib.manager.wf.nodes.AbstractJobNode; public abstract class StreamConsumerNode extends AbstractJobNode { - private static final Log log = LogFactory.getLog(StreamConsumerNode.class); - @WfInputParam private DnetStream inputStream; - abstract protected void consumeStream(Stream stream) throws Exception; + abstract protected void init() throws Exception; + + abstract protected void consumeRecord(T input) throws Exception; + + abstract protected void complete() throws Exception; @Override protected final void execute() { try { - inputStream.startReading(); - consumeStream(inputStream.getStream()); - inputStream.endReading(); - } catch (final Throwable e) { - log.error("Error reading stream: " + e); - try { - inputStream.abortReading(); - } catch (final Exception e1) { - log.error("Error aborting stream reading: " + e1); - } + init(); + inputStream.consume(x -> { + try { + consumeRecord(x); + } catch (final Exception e) { + throw new RuntimeException(e); + } + }); + complete(); + } catch (final Exception e) { throw new RuntimeException(e); - } - } } diff --git a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/stream/StreamFilterNode.java b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/stream/StreamFilterNode.java index 66d9e2b6..1ad06a36 100644 --- a/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/stream/StreamFilterNode.java +++ b/libs/dnet-wf-service/src/main/java/eu/dnetlib/manager/wf/nodes/stream/StreamFilterNode.java @@ -27,9 +27,7 @@ public abstract class StreamFilterNode extends AbstractJobNode { throw new RuntimeException(e); } }); - } catch ( - - final Exception e) { + } catch (final Exception e) { throw new RuntimeException(e); }