diff --git a/frontends/dnet-is-application/src/app/wf-confs/wf-conf-single.html b/frontends/dnet-is-application/src/app/wf-confs/wf-conf-single.html
index 1877954c..71ea0db3 100644
--- a/frontends/dnet-is-application/src/app/wf-confs/wf-conf-single.html
+++ b/frontends/dnet-is-application/src/app/wf-confs/wf-conf-single.html
@@ -1,72 +1,74 @@
-
Workflow Configuration does not exist
diff --git a/libs/dnet-data-services/pom.xml b/libs/dnet-data-services/pom.xml
index 4b5978fc..f6049954 100644
--- a/libs/dnet-data-services/pom.xml
+++ b/libs/dnet-data-services/pom.xml
@@ -63,6 +63,7 @@
spring-boot-starter-test
test
+
diff --git a/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/MDStoreService.java b/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/MDStoreService.java
index 41adb299..59820cb6 100644
--- a/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/MDStoreService.java
+++ b/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/MDStoreService.java
@@ -10,6 +10,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.transaction.Transactional;
@@ -241,6 +242,12 @@ public class MDStoreService {
return selectBackend(md.getType()).listEntries(v, limit);
}
+    /**
+     * Streams the records of an mdstore version.
+     *
+     * The version and its owning mdstore are looked up in their repositories, then the
+     * backend selected by the mdstore type produces the stream.
+     *
+     * @param versionId
+     *            identifier of the version to read
+     * @return the stream returned by the selected backend
+     * @throws MDStoreManagerException
+     *             if the version or its mdstore does not exist, or if the backend fails
+     */
+    public Stream<MetadataRecord> streamVersionRecords(final String versionId) throws MDStoreManagerException {
+        final MDStoreVersion v = mdstoreVersionRepository.findById(versionId).orElseThrow(() -> new MDStoreManagerException("Version not found"));
+        final MDStore md = mdstoreRepository.findById(v.getMdstore()).orElseThrow(() -> new MDStoreManagerException("MDStore not found"));
+        return selectBackend(md.getType()).streamEntries(v);
+    }
+
public MDStore newMDStore(
final String format,
final String layout,
diff --git a/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/MDStoreStreamReader.java b/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/MDStoreStreamReader.java
new file mode 100644
index 00000000..b56adf38
--- /dev/null
+++ b/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/MDStoreStreamReader.java
@@ -0,0 +1,90 @@
+package eu.dnetlib.data.mdstore;
+
+import java.util.Iterator;
+import java.util.Spliterators;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import eu.dnetlib.data.mdstore.model.MDStoreWithInfo;
+import eu.dnetlib.data.mdstore.model.MetadataRecord;
+import eu.dnetlib.errors.MDStoreManagerException;
+
+@Service
+public class MDStoreStreamReader {
+
+ @Autowired
+ private MDStoreService mdStoreService;
+
+ private enum Status {
+ PREPARED,
+ READING,
+ COMPLETED,
+ FAILED
+ }
+
+ // TODO the failure could be throw consuming the stream, so it is necessary to perform a refactoring of this method
+ public Stream prepareMDStoreStream(final String mdstoreId) throws MDStoreManagerException {
+
+ final MDStoreWithInfo mdstore = mdStoreService.findMdStore(mdstoreId);
+ final Iterator innerIterator = mdStoreService.streamVersionRecords(mdstore.getCurrentVersion()).iterator();
+
+ return StreamSupport.stream(Spliterators.spliteratorUnknownSize(new Iterator<>() {
+
+ private Status status = Status.PREPARED;
+
+ @Override
+ public boolean hasNext() {
+ if (innerIterator.hasNext()) {
+ return true;
+ } else {
+ try {
+ complete();
+ return false;
+ } catch (final MDStoreManagerException e) {
+ throw new RuntimeException("Error reading mdstore", e);
+ }
+ }
+ }
+
+ @Override
+ public MetadataRecord next() {
+ try {
+ verifyStart();
+ return innerIterator.next();
+ } catch (final Throwable e) {
+ try {
+ fail();
+ throw new RuntimeException("Error reading mdstore", e);
+ } catch (final MDStoreManagerException e1) {
+ throw new RuntimeException("Error reading mdstore", e);
+ }
+ }
+ }
+
+ private synchronized void verifyStart() throws MDStoreManagerException {
+ if (status == Status.PREPARED) {
+ status = Status.READING;
+ mdStoreService.startReading(mdstoreId);
+ }
+ }
+
+ private synchronized void complete() throws MDStoreManagerException {
+ if (status == Status.PREPARED || status == Status.READING) {
+ status = Status.COMPLETED;
+ mdStoreService.endReading(mdstoreId);
+ }
+ }
+
+ private synchronized void fail() throws MDStoreManagerException {
+ if (status == Status.PREPARED || status == Status.READING) {
+ status = Status.FAILED;
+ mdStoreService.endReading(mdstoreId);
+ }
+ }
+ }, 0), false);
+ }
+
+}
diff --git a/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/DefaultBackend.java b/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/DefaultBackend.java
index 1ce1122b..0edf87e3 100644
--- a/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/DefaultBackend.java
+++ b/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/DefaultBackend.java
@@ -5,6 +5,7 @@ import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
+import java.util.stream.Stream;
import org.springframework.stereotype.Service;
@@ -33,6 +34,11 @@ public class DefaultBackend implements MDStoreBackend {
return new ArrayList<>();
}
+    /**
+     * Default implementation: this backend exposes no records, so the stream is empty.
+     */
+    @Override
+    public Stream<MetadataRecord> streamEntries(final MDStoreVersion version) throws MDStoreManagerException {
+        return Stream.empty();
+    }
+
@Override
public Set listInternalFiles(final MDStoreVersion version) throws MDStoreManagerException {
return new LinkedHashSet<>();
diff --git a/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/HdfsBackend.java b/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/HdfsBackend.java
index 58fb1b0d..21cca761 100644
--- a/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/HdfsBackend.java
+++ b/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/HdfsBackend.java
@@ -3,6 +3,7 @@ package eu.dnetlib.data.mdstore.backends;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -99,4 +100,14 @@ public class HdfsBackend implements MDStoreBackend {
}
}
+    /**
+     * Streams the records of a version from the parquet files found under the
+     * {@code store} sub-directory of the version's {@code hdfs_path} parameter.
+     *
+     * @param version
+     *            the version to read
+     * @return the stream produced by the HDFS client
+     * @throws MDStoreManagerException
+     *             if {@code hdfs_path} is absent, null or blank, or if reading fails
+     */
+    @Override
+    public Stream<MetadataRecord> streamEntries(final MDStoreVersion version) throws MDStoreManagerException {
+        // A key that is present but mapped to null must be treated like a missing path
+        // instead of failing with a NullPointerException.
+        final Object rawPath = version.getParams().get("hdfs_path");
+        final String path = rawPath != null ? rawPath.toString() : "";
+
+        if (StringUtils.isBlank(path)) { throw new MDStoreManagerException("hdfs path is missing"); }
+
+        return hdfsClient.streamParquetFiles(path + "/store", MetadataRecord.class);
+    }
+
}
diff --git a/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/MDStoreBackend.java b/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/MDStoreBackend.java
index 4293a698..6dd8d718 100644
--- a/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/MDStoreBackend.java
+++ b/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/MDStoreBackend.java
@@ -2,6 +2,7 @@ package eu.dnetlib.data.mdstore.backends;
import java.util.List;
import java.util.Set;
+import java.util.stream.Stream;
import eu.dnetlib.data.mdstore.model.MDStore;
import eu.dnetlib.data.mdstore.model.MDStoreVersion;
@@ -20,6 +21,8 @@ public interface MDStoreBackend {
List listEntries(MDStoreVersion version, long limit) throws MDStoreManagerException;
+    /**
+     * Streams the records stored in the given version.
+     *
+     * @param version
+     *            the version to read
+     * @return a stream of the records of the version
+     * @throws MDStoreManagerException
+     *             if the records cannot be accessed
+     */
+    Stream<MetadataRecord> streamEntries(MDStoreVersion version) throws MDStoreManagerException;
+
Set listInternalFiles(MDStoreVersion version) throws MDStoreManagerException;
Set fixInconsistencies(boolean delete) throws MDStoreManagerException;
diff --git a/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/MockBackend.java b/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/MockBackend.java
index 5b1a1d84..e2a04bd9 100644
--- a/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/MockBackend.java
+++ b/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/backends/MockBackend.java
@@ -6,6 +6,7 @@ import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
+import java.util.stream.Stream;
import org.springframework.stereotype.Service;
@@ -59,6 +60,11 @@ public class MockBackend implements MDStoreBackend {
return list;
}
+    /**
+     * Streams the mock records by delegating to {@code listEntries} with a limit of 1000.
+     */
+    @Override
+    public Stream<MetadataRecord> streamEntries(final MDStoreVersion version) throws MDStoreManagerException {
+        return listEntries(version, 1000).stream();
+    }
+
@Override
public Set listInternalFiles(final MDStoreVersion version) throws MDStoreManagerException {
return new LinkedHashSet<>(Arrays.asList("file1", "file2", "file3", "file4"));
diff --git a/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/hadoop/HdfsClient.java b/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/hadoop/HdfsClient.java
index fc3acf9b..84a05a8f 100644
--- a/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/hadoop/HdfsClient.java
+++ b/libs/dnet-data-services/src/main/java/eu/dnetlib/data/mdstore/hadoop/HdfsClient.java
@@ -9,6 +9,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
+import java.util.stream.Stream;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.logging.Log;
@@ -144,6 +145,52 @@ public class HdfsClient {
return list;
}
+    /**
+     * Reads every parquet file found under the given path and returns the records as a
+     * stream of {@code clazz} instances.
+     *
+     * Each record is first turned into a map (field name to the string form of its
+     * value, or the empty string when the value is null) and then converted to
+     * {@code clazz} with Jackson. The field names are taken from the schema of the first
+     * record read and reused for all the following records and files.
+     *
+     * NOTE: all the records are currently loaded in memory before the stream is
+     * returned (see the TODO below).
+     *
+     * @param path
+     *            the directory containing the parquet files
+     * @param clazz
+     *            the type each record is converted to
+     * @return a stream over the converted records
+     * @throws MDStoreManagerException
+     *             if a parquet file cannot be read or converted
+     */
+    public <T> Stream<T> streamParquetFiles(final String path, final Class<T> clazz) throws MDStoreManagerException {
+
+        // TODO Re-implement the method without list
+        final List<T> list = new ArrayList<>();
+
+        final Configuration conf = conf();
+
+        final Set<String> fields = new LinkedHashSet<>();
+
+        // A single mapper is enough for all the files.
+        final ObjectMapper mapper = new ObjectMapper();
+
+        for (final String f : listContent(path, HdfsClient::isParquetFile)) {
+
+            log.info("Opening parquet file: " + f);
+
+            try (final ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord> builder(new Path(f)).withConf(conf).build()) {
+                log.debug("File parquet OPENED");
+
+                GenericRecord rec = null;
+                while ((rec = reader.read()) != null) {
+                    if (fields.isEmpty()) {
+                        rec.getSchema().getFields().forEach(field -> fields.add(field.name()));
+                        log.debug("Found schema: " + fields);
+                    }
+                    final Map<String, String> map = new LinkedHashMap<>();
+                    for (final String field : fields) {
+                        final Object v = rec.get(field);
+                        map.put(field, v != null ? v.toString() : "");
+                    }
+
+                    list.add(mapper.convertValue(map, clazz));
+
+                    log.debug("added record");
+                }
+            } catch (final FileNotFoundException e) {
+                // Report the file that is actually missing, not the HDFS base path.
+                log.warn("Missing path: " + f);
+            } catch (final Throwable e) {
+                log.error("Error reading parquet file: " + f, e);
+                throw new MDStoreManagerException("Error reading parquet file: " + f, e);
+            }
+        }
+
+        return list.stream();
+
+    }
+
/*
*
* private String printGroup(final Group g) { final StringWriter sw = new StringWriter();
diff --git a/libs/dnet-is-common/src/main/java/eu/dnetlib/data/mdstore/model/MDStoreType.java b/libs/dnet-is-common/src/main/java/eu/dnetlib/data/mdstore/model/MDStoreType.java
index ecc7e2c3..5a339cf5 100644
--- a/libs/dnet-is-common/src/main/java/eu/dnetlib/data/mdstore/model/MDStoreType.java
+++ b/libs/dnet-is-common/src/main/java/eu/dnetlib/data/mdstore/model/MDStoreType.java
@@ -2,5 +2,6 @@ package eu.dnetlib.data.mdstore.model;
public enum MDStoreType {
HDFS,
- MOCK
+ MOCK,
+ SQL_DB
}