collectors

This commit is contained in:
Michele Artini 2023-04-27 12:50:34 +02:00
parent 52cde91251
commit b8e905dc64
10 changed files with 156 additions and 20 deletions

View File

@ -0,0 +1,32 @@
package eu.dnetlib.data.collect;
import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Stream;
import org.springframework.stereotype.Service;
import eu.dnetlib.data.mdstore.model.records.MetadataRecord;
import eu.dnetlib.dsm.model.Api;
@Service
public class CollectorService {
private List<DnetCollector> collectors;
public Stream<MetadataRecord> collect(final Api api) {
return collectors.stream()
.filter(coll -> coll.protocol().equalsIgnoreCase(api.getProtocol()))
.findFirst()
.map(coll -> coll.collect(api.getBaseurl(), api.getApiParams()))
.orElseThrow();
}
public Stream<MetadataRecord> collect(final Api api, final LocalDateTime from, final LocalDateTime until) {
return collectors.stream()
.filter(coll -> coll.protocol().equalsIgnoreCase(api.getProtocol()))
.findFirst()
.map(coll -> coll.collect(api.getBaseurl(), api.getApiParams(), from, until))
.orElseThrow();
}
}

View File

@ -0,0 +1,17 @@
package eu.dnetlib.data.collect;
import java.time.LocalDateTime;
import java.util.Set;
import java.util.stream.Stream;
import eu.dnetlib.data.mdstore.model.records.MetadataRecord;
import eu.dnetlib.dsm.model.ApiParam;
public interface DnetCollector {
Stream<MetadataRecord> collect(String baseUrl, Set<ApiParam> params);
Stream<MetadataRecord> collect(String baseUrl, Set<ApiParam> params, LocalDateTime from, LocalDateTime until);
String protocol();
}

View File

@ -0,0 +1,29 @@
package eu.dnetlib.data.collect;
import java.time.LocalDateTime;
import java.util.Set;
import java.util.stream.Stream;
import eu.dnetlib.data.mdstore.model.records.MetadataRecord;
import eu.dnetlib.dsm.model.ApiParam;
public class OaiCollector implements DnetCollector {
@Override
public String protocol() {
return "OAI";
}
@Override
public Stream<MetadataRecord> collect(final String baseUrl, final Set<ApiParam> params) {
// TODO Auto-generated method stub
return null;
}
@Override
public Stream<MetadataRecord> collect(final String baseUrl, final Set<ApiParam> params, final LocalDateTime from, final LocalDateTime until) {
// TODO Auto-generated method stub
return null;
}
}

View File

@ -1,9 +1,13 @@
package eu.dnetlib.manager.wf.nodes.aggregation;
import java.time.LocalDateTime;
import java.util.stream.Stream;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.data.collect.CollectorService;
import eu.dnetlib.data.mdstore.model.records.MetadataRecord;
import eu.dnetlib.dsm.DsmService;
import eu.dnetlib.dsm.model.Api;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.nodes.stream.StreamSupplierNode;
@ -17,16 +21,21 @@ public class CollectNode extends StreamSupplierNode<MetadataRecord> {
@WfInputParam
private String apiId;
@WfInputParam
private LocalDateTime from;
@Autowired
private DsmService dsmService;
@WfInputParam
private LocalDateTime until;
@Autowired
private CollectorService collectorService;
@Override
protected Stream<MetadataRecord> prepareStream() {
// TODO Auto-generated method stub
return null;
final Api api = dsmService.getApis(dsId)
.stream()
.filter(a -> apiId.equals(a.getId()))
.findFirst()
.orElseThrow();
return collectorService.collect(api);
}
}

View File

@ -1,8 +1,14 @@
package eu.dnetlib.manager.wf.nodes.aggregation;
import java.time.LocalDateTime;
import java.util.stream.Stream;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.data.collect.CollectorService;
import eu.dnetlib.data.mdstore.model.records.MetadataRecord;
import eu.dnetlib.dsm.DsmService;
import eu.dnetlib.dsm.model.Api;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
import eu.dnetlib.manager.wf.nodes.stream.StreamSupplierNode;
@ -16,10 +22,28 @@ public class DateRangeCollectNode extends StreamSupplierNode<MetadataRecord> {
@WfInputParam
private String apiId;
@WfInputParam
private LocalDateTime from;
@WfInputParam
private LocalDateTime until;
@Autowired
private DsmService dsmService;
@Autowired
private CollectorService collectorService;
@Override
protected Stream<MetadataRecord> prepareStream() {
// TODO Auto-generated method stub
return null;
final Api api = dsmService.getApis(dsId)
.stream()
.filter(a -> apiId.equals(a.getId()))
.findFirst()
.orElseThrow();
return collectorService.collect(api, from, until);
}
}

View File

@ -2,6 +2,10 @@ package eu.dnetlib.manager.wf.nodes.aggregation;
import java.util.stream.Stream;
import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.data.mdstore.MDStoreService;
import eu.dnetlib.data.mdstore.model.MDStoreVersion;
import eu.dnetlib.data.mdstore.model.records.MetadataRecord;
import eu.dnetlib.manager.wf.annotations.WfInputParam;
import eu.dnetlib.manager.wf.annotations.WfNode;
@ -13,10 +17,13 @@ public class MdStoreFetcherNode extends StreamSupplierNode<MetadataRecord> {
@WfInputParam
private String mdId;
@Autowired
private MDStoreService mdStoreService;
@Override
protected Stream<MetadataRecord> prepareStream() {
// TODO Auto-generated method stub
return null;
protected Stream<MetadataRecord> prepareStream() throws Exception {
final MDStoreVersion version = mdStoreService.startReading(mdId);
return mdStoreService.streamVersionRecords(version.getId());
}
}

View File

@ -10,11 +10,16 @@ public abstract class StreamConsumerNode<T> extends AbstractJobNode {
@WfInputParam
private Stream<T> inputStream;
abstract protected void consumeStream(Stream<T> stream);
abstract protected void consumeStream(Stream<T> stream) throws Exception;
@Override
protected final void execute() {
consumeStream(inputStream);
try {
consumeStream(inputStream);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -14,11 +14,16 @@ public abstract class StreamFilterNode<T> extends AbstractJobNode {
@WfOutputParam
private Stream<T> outputStream;
abstract protected Stream<T> filterStream(Stream<T> input);
abstract protected Stream<T> filterStream(Stream<T> input) throws Exception;
@Override
protected final void execute() {
outputStream = filterStream(inputStream);
try {
outputStream = filterStream(inputStream);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -14,11 +14,15 @@ public abstract class StreamMapperNode<T, K> extends AbstractJobNode {
@WfOutputParam
private Stream<K> outputStream;
abstract protected Stream<K> mapStream(Stream<T> input);
abstract protected Stream<K> mapStream(Stream<T> input) throws Exception;
@Override
protected void execute() {
outputStream = mapStream(inputStream);
try {
outputStream = mapStream(inputStream);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -10,11 +10,15 @@ public abstract class StreamSupplierNode<T> extends AbstractJobNode {
@WfOutputParam
private Stream<T> outputStream;
abstract protected Stream<T> prepareStream();
abstract protected Stream<T> prepareStream() throws Exception;
@Override
protected void execute() {
outputStream = prepareStream();
try {
outputStream = prepareStream();
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
}