collector plugins

This commit is contained in:
Michele Artini 2023-10-12 14:56:24 +02:00
parent b17843f82d
commit 04d08abbc5
13 changed files with 156 additions and 109 deletions

View File

@ -12,6 +12,8 @@ import eu.dnetlib.common.clients.DnetServiceClientFactory;
import eu.dnetlib.common.clients.DsmClient;
import eu.dnetlib.common.mapping.cleaner.Cleaner;
import eu.dnetlib.common.mapping.cleaner.CleanerFactory;
import eu.dnetlib.domain.dsm.Api;
import eu.dnetlib.domain.dsm.Datasource;
import eu.dnetlib.domain.mdstore.records.MetadataRecord;
import eu.dnetlib.domain.mdstore.records.MetadataRecordImpl;
import eu.dnetlib.wfs.annotations.WfInputParam;
@ -24,10 +26,10 @@ import jakarta.transaction.Transactional;
public class MdCleanerJobNode extends AbstractJobNode {
@WfInputParam
private String dsId;
private Datasource ds;
@WfInputParam
private String apiId;
private Api api;
@WfInputParam
private String inputMdId;
@ -53,7 +55,6 @@ public class MdCleanerJobNode extends AbstractJobNode {
@Override
@Transactional
protected void execute() throws Exception {
final DsmClient dsm = clientFactory.getClient(DsmClient.class);
final Predicate<Document> filter = XpathFilterFactory.createFilter(filterXpath);
@ -79,7 +80,7 @@ public class MdCleanerJobNode extends AbstractJobNode {
})
.forEach(record -> mdstoreService.addRecord(outputMdId, record));
dsm.updateUpdateApiAggregationInfo(apiId, outputMdId, mdstoreService.getSize(outputMdId));
clientFactory.getClient(DsmClient.class).updateUpdateApiAggregationInfo(api.getId(), outputMdId, mdstoreService.getSize(outputMdId));
}

View File

@ -26,10 +26,10 @@ import jakarta.transaction.Transactional;
public class MdCollectIncrementalJobNode extends AbstractJobNode {
@WfInputParam
private String dsId;
private Datasource ds;
@WfInputParam
private String apiId;
private Api api;
@WfInputParam
private String mdId;
@ -55,10 +55,6 @@ public class MdCollectIncrementalJobNode extends AbstractJobNode {
@Override
@Transactional
protected void execute() throws Exception {
final DsmClient dsm = clientFactory.getClient(DsmClient.class);
final Datasource ds = dsm.findDs(dsId);
final Api api = dsm.findApi(apiId);
final LocalDateTime fromDate = overrideFromDate != null ? overrideFromDate : findLastCollDate(api);
final LocalDateTime untilDate = overrideUntilDate != null ? overrideUntilDate : null;
@ -78,7 +74,7 @@ public class MdCollectIncrementalJobNode extends AbstractJobNode {
.map(mdBuilder)
.forEach(record -> mdstoreService.addRecord(mdId, record));
dsm.updateUpdateApiCollectionInfo(api.getId(), mdId, mdstoreService.getSize(mdId));
clientFactory.getClient(DsmClient.class).updateUpdateApiCollectionInfo(api.getId(), mdId, mdstoreService.getSize(mdId));
}
private LocalDateTime findLastCollDate(final Api api) {

View File

@ -25,10 +25,10 @@ import jakarta.transaction.Transactional;
public class MdCollectRefreshJobNode extends AbstractJobNode {
@WfInputParam
private String dsId;
private Datasource ds;
@WfInputParam
private String apiId;
private Api api;
@WfInputParam
private String mdId;
@ -48,10 +48,6 @@ public class MdCollectRefreshJobNode extends AbstractJobNode {
@Override
@Transactional
protected void execute() throws Exception {
final DsmClient dsm = clientFactory.getClient(DsmClient.class);
final Datasource ds = dsm.findDs(dsId);
final Api api = dsm.findApi(apiId);
final Predicate<Document> filter = XpathFilterFactory.createFilter(filterXpath);
final Function<Document, MetadataRecord> mdBuilder = MdBuilderFactory.createMdBuilder(ds, api);
@ -70,7 +66,7 @@ public class MdCollectRefreshJobNode extends AbstractJobNode {
.map(mdBuilder)
.forEach(record -> mdstoreService.addRecord(mdId, record));
dsm.updateUpdateApiCollectionInfo(api.getId(), mdId, mdstoreService.getSize(mdId));
clientFactory.getClient(DsmClient.class).updateUpdateApiCollectionInfo(api.getId(), mdId, mdstoreService.getSize(mdId));
}
}

View File

@ -4,6 +4,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.common.clients.DnetServiceClientFactory;
import eu.dnetlib.common.clients.OaiManagerClient;
import eu.dnetlib.domain.dsm.Api;
import eu.dnetlib.domain.dsm.Datasource;
import eu.dnetlib.domain.oai.OaiConfiguration;
import eu.dnetlib.wfs.annotations.WfInputParam;
import eu.dnetlib.wfs.annotations.WfNode;
@ -15,10 +17,10 @@ import jakarta.transaction.Transactional;
public class MdExportOaiJobNode extends AbstractJobNode {
@WfInputParam
private String dsId;
private Datasource ds;
@WfInputParam
private String apiId;
private Api api;
@WfInputParam
private String inputMdId;

View File

@ -4,6 +4,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import eu.dnetlib.common.clients.DnetServiceClientFactory;
import eu.dnetlib.common.clients.IndexManagerClient;
import eu.dnetlib.domain.dsm.Api;
import eu.dnetlib.domain.dsm.Datasource;
import eu.dnetlib.domain.index.IndexConfiguration;
import eu.dnetlib.domain.mdstore.records.MetadataRecord;
import eu.dnetlib.wfs.annotations.WfInputParam;
@ -16,10 +18,10 @@ import eu.dnetlib.wfs.store.DbMdStoreService;
public class MdIndexJobNode extends AbstractJobNode {
@WfInputParam
private String dsId;
private Datasource ds;
@WfInputParam
private String apiId;
private Api api;
@WfInputParam
private String inputMdId;

View File

@ -14,6 +14,8 @@ import eu.dnetlib.common.clients.DnetServiceClientFactory;
import eu.dnetlib.common.clients.DsmClient;
import eu.dnetlib.common.mapping.RecordTransformer;
import eu.dnetlib.common.mapping.xslt.XsltTransformFactory;
import eu.dnetlib.domain.dsm.Api;
import eu.dnetlib.domain.dsm.Datasource;
import eu.dnetlib.domain.mdstore.records.MetadataRecord;
import eu.dnetlib.domain.mdstore.records.MetadataRecordImpl;
import eu.dnetlib.wfs.annotations.WfInputParam;
@ -26,10 +28,10 @@ import jakarta.transaction.Transactional;
public class MdTransformJobNode extends AbstractJobNode {
@WfInputParam
private String dsId;
private Datasource ds;
@WfInputParam
private String apiId;
private Api api;
@WfInputParam
private String inputMdId;
@ -55,7 +57,6 @@ public class MdTransformJobNode extends AbstractJobNode {
@Override
@Transactional
protected void execute() throws Exception {
final DsmClient dsm = clientFactory.getClient(DsmClient.class);
final Predicate<Document> filter = XpathFilterFactory.createFilter(filterXpath);
@ -83,7 +84,7 @@ public class MdTransformJobNode extends AbstractJobNode {
})
.forEach(record -> mdstoreService.addRecord(outputMdId, record));
dsm.updateUpdateApiAggregationInfo(apiId, outputMdId, mdstoreService.getSize(outputMdId));
clientFactory.getClient(DsmClient.class).updateUpdateApiAggregationInfo(api.getId(), outputMdId, mdstoreService.getSize(outputMdId));
}

View File

@ -1,5 +1,5 @@
package eu.dnetlib.wfs.nodes.index;
public class AbstractIndexerJobNode {
public class CreateIndexConfigurationJobNode {
// TODO
}

View File

@ -0,0 +1,18 @@
package eu.dnetlib.wfs.annotations;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.stereotype.Component;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface CollectorPlugin {
String value();
}

View File

@ -1,23 +1,52 @@
package eu.dnetlib.wfs.collector;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import eu.dnetlib.domain.dsm.Api;
import eu.dnetlib.domain.dsm.ApiParam;
import eu.dnetlib.errors.DnetRuntimeException;
import eu.dnetlib.wfs.annotations.CollectorPlugin;
@Service
public class CollectorService {
@Autowired
private List<DnetCollectorPlugin> plugins;
public Stream<String> collect(final Api api) {
// TODO Auto-generated method stub
return null;
try {
return findPlugin(api.getProtocol()).collect(api.getBaseurl(), toMap(api.getApiParams()), null, null);
} catch (final Exception e) {
throw new DnetRuntimeException("Error collecting api", e);
}
}
public Stream<String> collect(final Api api, final LocalDateTime from, final LocalDateTime until) {
// TODO Auto-generated method stub
return null;
try {
return findPlugin(api.getProtocol()).collect(api.getBaseurl(), toMap(api.getApiParams()), from, until);
} catch (final Exception e) {
throw new DnetRuntimeException("Error collecting api", e);
}
}
private DnetCollectorPlugin findPlugin(final String protocol) {
return plugins.stream()
.filter(p -> p.getClass().isAnnotationPresent(CollectorPlugin.class))
.filter(p -> p.getClass().getAnnotation(CollectorPlugin.class).value().equalsIgnoreCase(protocol))
.findFirst()
.orElseThrow();
}
private Map<String, String> toMap(final Set<ApiParam> apiParams) {
return apiParams.stream().collect(Collectors.toMap(ApiParam::getParam, ApiParam::getValue));
}
}

View File

@ -0,0 +1,11 @@
package eu.dnetlib.wfs.collector;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.stream.Stream;
public interface DnetCollectorPlugin {
Stream<String> collect(String baseUrl, Map<String, String> apiParams, LocalDateTime from, LocalDateTime until) throws Exception;
}

View File

@ -1,78 +0,0 @@
package eu.dnetlib.wfs.collector.filesystem;
import java.io.File;
import java.io.FileInputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.dnetlib.domain.dsm.ApiDesc;
import eu.dnetlib.errors.DnetException;
import eu.dnetlib.utils.DnetStreamSupport;
import eu.dnetlib.utils.XmlCleaner;
/**
* The Class FilesystemIterable.
*
* @author Sandro, Michele, Andrea
*/
public class FileSystemIterable implements Iterable<String> {
/**
* The Constant log.
*/
private static final Log log = LogFactory.getLog(FileSystemIterable.class);
/**
* The base dir.
*/
private File baseDir;
/**
* The extensions.
*/
private String extension;
/**
* Instantiates a new filesystem iterable.
*
* @param api
* the api
* @throws DnetException
* the collector service exception
*/
public FileSystemIterable(final ApiDesc api) throws DnetException {
try {
final String baseUrl = api.getBaseUrl();
final URL basePath = new URL(baseUrl);
this.baseDir = new File(basePath.getPath());
if (!baseDir.exists()) { throw new DnetException(String.format("The base ULR %s, does not exist", basePath.getPath())); }
this.extension = api.getParams().get("extensions");
} catch (final MalformedURLException e) {
throw new DnetException("Filesystem collector failed! ", e);
}
}
@Override
public Iterator<String> iterator() {
final FileSystemIterator fsi = new FileSystemIterator(baseDir.getAbsolutePath(), extension);
return DnetStreamSupport.stream(fsi)
.map(this::loadFile)
.iterator();
}
private String loadFile(final String inputFileName) {
try (FileInputStream fileInputStream = new FileInputStream(inputFileName)) {
final String s = IOUtils.toString(fileInputStream, StandardCharsets.UTF_8.toString());
return XmlCleaner.cleanAllEntities(s.startsWith("\uFEFF") ? s.substring(1) : s);
} catch (final Exception e) {
log.error("Unable to read " + inputFileName);
return "";
}
}
}

View File

@ -0,0 +1,49 @@
package eu.dnetlib.wfs.collector.filesystem;
import java.io.File;
import java.io.FileInputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.dnetlib.errors.DnetException;
import eu.dnetlib.utils.DnetStreamSupport;
import eu.dnetlib.utils.XmlCleaner;
import eu.dnetlib.wfs.annotations.CollectorPlugin;
import eu.dnetlib.wfs.collector.DnetCollectorPlugin;
@CollectorPlugin("filesystem")
public class FilesystemCollectorPlugin implements DnetCollectorPlugin {
private static final Log log = LogFactory.getLog(FilesystemCollectorPlugin.class);
@Override
public Stream<String> collect(final String baseUrl, final Map<String, String> apiParams, final LocalDateTime from, final LocalDateTime until)
throws Exception {
final URL basePath = new URL(baseUrl);
final File baseDir = new File(basePath.getPath());
if (!baseDir.exists()) { throw new DnetException(String.format("The base ULR %s, does not exist", basePath.getPath())); }
final FileSystemIterator fsi = new FileSystemIterator(baseDir.getAbsolutePath(), apiParams.get("extension"));
return DnetStreamSupport.stream(fsi).map(this::loadFile);
}
private String loadFile(final String inputFileName) {
try (FileInputStream fileInputStream = new FileInputStream(inputFileName)) {
final String s = IOUtils.toString(fileInputStream, StandardCharsets.UTF_8.toString());
return XmlCleaner.cleanAllEntities(s.startsWith("\uFEFF") ? s.substring(1) : s);
} catch (final Exception e) {
log.error("Unable to read " + inputFileName);
return "";
}
}
}

View File

@ -0,0 +1,20 @@
package eu.dnetlib.wfs.collector.oai;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.stream.Stream;
import eu.dnetlib.wfs.annotations.CollectorPlugin;
import eu.dnetlib.wfs.collector.DnetCollectorPlugin;
@CollectorPlugin("oai")
public class OaiCollectorPlugin implements DnetCollectorPlugin {
@Override
public Stream<String> collect(final String baseUrl, final Map<String, String> apiParams, final LocalDateTime from, final LocalDateTime until)
throws Exception {
// TODO Auto-generated method stub
return null;
}
}