mdstore: add methods

This commit is contained in:
Michele Artini 2023-05-04 11:20:39 +02:00
parent 5d949e5aa0
commit 0280e506c9
9 changed files with 131 additions and 62 deletions

View File

@ -188,18 +188,6 @@ public class MDStoreService {
return v;
}
@Transactional
public MDStoreVersion commitMdStoreVersion(final String versionId, final long size) throws MDStoreManagerException {
final MDStoreVersion v = mdstoreVersionRepository.findById(versionId).orElseThrow(() -> new MDStoreManagerException("Invalid version: " + versionId));
mdstoreCurrentVersionRepository.save(MDStoreCurrentVersion.newInstance(v));
v.setWriting(false);
v.setSize(size);
v.setLastUpdate(LocalDateTime.now());
mdstoreVersionRepository.save(v);
return v;
}
public synchronized void deleteExpiredVersions() {
log.info("Deleting expired version...");
for (final String versionId : listExpiredVersions()) {
@ -299,4 +287,26 @@ public class MDStoreService {
}
}
public void addRecord(final String newVersion, final MDStoreType type, final MetadataRecord record) {
selectBackend(type).addRecord(record);
}
public long commitMdStoreVersion(final String versionId, final MDStoreType mdStoreType) throws MDStoreManagerException {
final long count = selectBackend(mdStoreType).countRecords(versionId);
commitMdStoreVersion(versionId, count);
return count;
}
@Transactional
public MDStoreVersion commitMdStoreVersion(final String versionId, final long size) throws MDStoreManagerException {
final MDStoreVersion v = mdstoreVersionRepository.findById(versionId).orElseThrow(() -> new MDStoreManagerException("Invalid version: " + versionId));
mdstoreCurrentVersionRepository.save(MDStoreCurrentVersion.newInstance(v));
v.setWriting(false);
v.setSize(size);
v.setLastUpdate(LocalDateTime.now());
mdstoreVersionRepository.save(v);
return v;
}
}

View File

@ -49,4 +49,12 @@ public class DefaultBackend implements MDStoreBackend {
return new HashSet<>();
}
@Override
public void addRecord(final MetadataRecord record) {}
@Override
public long countRecords(final String versionId) {
return 0;
}
}

View File

@ -109,4 +109,14 @@ public class HdfsBackend implements MDStoreBackend {
}
}
@Override
public void addRecord(final MetadataRecord record) {
throw new RuntimeException("NOT_IMPLEMENTED");
}
@Override
public long countRecords(final String versionId) {
throw new RuntimeException("NOT_IMPLEMENTED");
}
}

View File

@ -27,4 +27,8 @@ public interface MDStoreBackend {
Set<String> fixInconsistencies(boolean delete) throws MDStoreManagerException;
void addRecord(MetadataRecord record);
long countRecords(String versionId);
}

View File

@ -76,4 +76,14 @@ public class MockBackend implements MDStoreBackend {
return new LinkedHashSet<>(Arrays.asList("1", "2", "3", "4"));
}
@Override
public void addRecord(final MetadataRecord record) {
throw new RuntimeException("NOT_IMPLEMENTED");
}
@Override
public long countRecords(final String versionId) {
throw new RuntimeException("NOT_IMPLEMENTED");
}
}

View File

@ -1,8 +1,12 @@
package eu.dnetlib.manager.wf.nodes.aggregation;
import java.util.stream.Stream;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.data.mdstore.MDStoreService;
import eu.dnetlib.data.mdstore.model.MDStoreType;
import eu.dnetlib.data.mdstore.model.MDStoreWithInfo;
import eu.dnetlib.data.mdstore.model.records.MetadataRecord;
import eu.dnetlib.errors.MDStoreManagerException;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.annotations.WfOutputParam;
@ -20,11 +24,33 @@ public class MdStoreFeederNode extends StreamConsumerNode<MetadataRecord> {
@WfOutputParam
private long total;
@Override
protected void consumeStream(final Stream<MetadataRecord> stream) {
// TODO Auto-generated method stub
total = 0;
@Autowired
private MDStoreService mdStoreService;
private String newVersion;
private MDStoreType mdStoreType;
@Override
protected void init() throws Exception {
final MDStoreWithInfo md = mdStoreService.findMdStore(mdId);
mdStoreType = md.getType();
if (storingType.equals(MDStoreService.REFRESH_MODE)) {
newVersion = mdStoreService.prepareMdStoreVersion(mdId).getId();
} else if (storingType.equals(MDStoreService.INCREMENTAL_MODE)) {
newVersion = md.getCurrentVersion();
} else {
throw new MDStoreManagerException("Invalid mode " + storingType);
}
}
@Override
protected void consumeRecord(final MetadataRecord input) throws Exception {
mdStoreService.addRecord(newVersion, mdStoreType, input);
}
@Override
protected void complete() throws Exception {
total = mdStoreService.commitMdStoreVersion(newVersion, mdStoreType);
}
}

View File

@ -1,13 +1,19 @@
package eu.dnetlib.manager.wf.nodes.stream;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.dnetlib.manager.wf.workflows.util.SimpleCallable;
public class DnetStream<T> {
private static final Log log = LogFactory.getLog(DnetStream.class);
private final Stream<T> stream;
private final SimpleCallable before;
private final SimpleCallable after;
@ -20,28 +26,6 @@ public class DnetStream<T> {
this.abort = abort;
}
public Stream<T> getStream() {
return stream;
}
public void startReading() throws Exception {
if (before != null) {
before.call();
}
}
public void endReading() throws Exception {
if (after != null) {
after.call();
}
}
public void abortReading() throws Exception {
if (abort != null) {
abort.call();
}
}
public DnetStream<T> filter(final Predicate<T> predicate) {
return new DnetStream<>(stream.filter(predicate), before, after, abort);
}
@ -50,4 +34,28 @@ public class DnetStream<T> {
return new DnetStream<>(stream.map(mapper), before, after, abort);
}
public void consume(final Consumer<T> action) {
try {
if (before != null) {
before.call();
}
stream.forEach(action);
if (after != null) {
after.call();
}
} catch (final Throwable e) {
log.error("Error reading stream: " + e);
try {
if (abort != null) {
abort.call();
}
} catch (final Exception e1) {
log.error("Error aborting stream reading: " + e1);
}
throw new RuntimeException(e);
}
}
}

View File

@ -1,39 +1,34 @@
package eu.dnetlib.manager.wf.nodes.stream;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.nodes.AbstractJobNode;
public abstract class StreamConsumerNode<T> extends AbstractJobNode {
private static final Log log = LogFactory.getLog(StreamConsumerNode.class);
@WfInputParam
private DnetStream<T> inputStream;
abstract protected void consumeStream(Stream<T> stream) throws Exception;
abstract protected void init() throws Exception;
abstract protected void consumeRecord(T input) throws Exception;
abstract protected void complete() throws Exception;
@Override
protected final void execute() {
try {
inputStream.startReading();
consumeStream(inputStream.getStream());
inputStream.endReading();
} catch (final Throwable e) {
log.error("Error reading stream: " + e);
try {
inputStream.abortReading();
} catch (final Exception e1) {
log.error("Error aborting stream reading: " + e1);
}
init();
inputStream.consume(x -> {
try {
consumeRecord(x);
} catch (final Exception e) {
throw new RuntimeException(e);
}
});
complete();
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -27,9 +27,7 @@ public abstract class StreamFilterNode<T> extends AbstractJobNode {
throw new RuntimeException(e);
}
});
} catch (
final Exception e) {
} catch (final Exception e) {
throw new RuntimeException(e);
}