classes related to the collection workflow moved into common package; implemented MongoDB collection plugins

pull/96/head
Claudio Atzori 3 years ago
parent bae029f828
commit 29c6f7e255

@ -1,6 +1,9 @@
package eu.dnetlib.dhp.message;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
@ -30,13 +33,15 @@ public class MessageSender {
private final String workflowId;
private ExecutorService executorService = Executors.newCachedThreadPool();
public MessageSender(final String dnetMessageEndpoint, final String workflowId) {
this.workflowId = workflowId;
this.dnetMessageEndpoint = dnetMessageEndpoint;
}
public void sendMessage(final Message message) {
new Thread(() -> _sendMessage(message)).start();
executorService.submit(() -> _sendMessage(message));
}
public void sendMessage(final Long current, final Long total) {
@ -67,7 +72,6 @@ public class MessageSender {
.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MS)
.setSocketTimeout(SOCKET_TIMEOUT_MS)
.build();
;
try (final CloseableHttpClient client = HttpClients
.custom()

@ -106,7 +106,10 @@
<artifactId>commons-compress</artifactId>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
</dependency>
</dependencies>

@ -18,7 +18,7 @@ import org.apache.hadoop.fs.Path;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.worker.HttpConnector2;
import eu.dnetlib.dhp.collection.HttpConnector2;
/**
* Applies the parsing of a csv file and writes the Serialization of it in hdfs

@ -15,7 +15,7 @@ import org.apache.hadoop.fs.Path;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.worker.HttpConnector2;
import eu.dnetlib.dhp.collection.HttpConnector2;
/**
* Applies the parsing of an excel file and writes the Serialization of it in hdfs

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.collection.worker;
package eu.dnetlib.dhp.collection;
public class CollectorException extends Exception {

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.collection.worker;
package eu.dnetlib.dhp.collection;
import static eu.dnetlib.dhp.utils.DHPUtils.*;
@ -17,15 +17,10 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonIgnore;
import eu.dnetlib.dhp.application.ApplicationUtils;
public class CollectorPluginReport extends LinkedHashMap<String, String> implements Closeable {
private static final Logger log = LoggerFactory.getLogger(CollectorPluginReport.class);
@JsonIgnore
private FileSystem fs;
@JsonIgnore
private Path path;
@ -38,9 +33,7 @@ public class CollectorPluginReport extends LinkedHashMap<String, String> impleme
}
public CollectorPluginReport(FileSystem fs, Path path) throws IOException {
this.fs = fs;
this.path = path;
this.fos = fs.create(path);
}

@ -1,23 +1,27 @@
package eu.dnetlib.dhp.collection.worker;
package eu.dnetlib.dhp.collection;
import static eu.dnetlib.dhp.common.Constants.SEQUENCE_FILE_NAME;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.DeflateCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbDumpCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin;
import eu.dnetlib.dhp.message.MessageSender;
public class CollectorWorker {
@ -26,7 +30,7 @@ public class CollectorWorker {
private final ApiDescriptor api;
private final Configuration conf;
private final FileSystem fileSystem;
private final MDStoreVersion mdStoreVersion;
@ -38,13 +42,13 @@ public class CollectorWorker {
public CollectorWorker(
final ApiDescriptor api,
final Configuration conf,
final FileSystem fileSystem,
final MDStoreVersion mdStoreVersion,
final HttpClientParams clientParams,
final MessageSender messageSender,
final CollectorPluginReport report) {
this.api = api;
this.conf = conf;
this.fileSystem = fileSystem;
this.mdStoreVersion = mdStoreVersion;
this.clientParams = clientParams;
this.messageSender = messageSender;
@ -56,16 +60,16 @@ public class CollectorWorker {
final String outputPath = mdStoreVersion.getHdfsPath() + SEQUENCE_FILE_NAME;
log.info("outputPath path is {}", outputPath);
final CollectorPlugin plugin = CollectorPluginFactory.getPluginByProtocol(clientParams, api.getProtocol());
final CollectorPlugin plugin = getCollectorPlugin();
final AtomicInteger counter = new AtomicInteger(0);
try (SequenceFile.Writer writer = SequenceFile
.createWriter(
conf,
fileSystem.getConf(),
SequenceFile.Writer.file(new Path(outputPath)),
SequenceFile.Writer.keyClass(IntWritable.class),
SequenceFile.Writer.valueClass(Text.class),
SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new GzipCodec()))) {
SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
final IntWritable key = new IntWritable(counter.get());
final Text value = new Text();
plugin
@ -94,4 +98,26 @@ public class CollectorWorker {
}
}
private CollectorPlugin getCollectorPlugin() throws UnknownCollectorPluginException {
switch (StringUtils.lowerCase(StringUtils.trim(api.getProtocol()))) {
case "oai":
return new OaiCollectorPlugin(clientParams);
case "other":
final String plugin = Optional
.ofNullable(api.getParams().get("other_plugin_type"))
.orElseThrow(() -> new UnknownCollectorPluginException("other_plugin_type"));
switch (plugin) {
case "mdstore_mongodb_dump":
return new MongoDbDumpCollectorPlugin(fileSystem);
case "mdstore_mongodb":
return new MongoDbCollectorPlugin();
default:
throw new UnknownCollectorPluginException("Unknown plugin type: " + plugin);
}
default:
throw new UnknownCollectorPluginException("Unknown protocol: " + api.getProtocol());
}
}
}

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.collection.worker;
package eu.dnetlib.dhp.collection;
import static eu.dnetlib.dhp.common.Constants.*;
import static eu.dnetlib.dhp.utils.DHPUtils.*;
@ -9,7 +9,6 @@ import java.util.Optional;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@ -17,7 +16,6 @@ import org.slf4j.LoggerFactory;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.message.MessageSender;
/**
@ -32,6 +30,12 @@ public class CollectorWorkerApplication {
private static final Logger log = LoggerFactory.getLogger(CollectorWorkerApplication.class);
private FileSystem fileSystem;
public CollectorWorkerApplication(FileSystem fileSystem) {
this.fileSystem = fileSystem;
}
/**
* @param args
*/
@ -63,6 +67,18 @@ public class CollectorWorkerApplication {
final String workflowId = argumentParser.get("workflowId");
log.info("workflowId is {}", workflowId);
final HttpClientParams clientParams = getClientParams(argumentParser);
final ApiDescriptor api = MAPPER.readValue(apiDescriptor, ApiDescriptor.class);
final FileSystem fileSystem = FileSystem.get(getHadoopConfiguration(hdfsuri));
new CollectorWorkerApplication(fileSystem)
.run(mdStoreVersion, clientParams, api, dnetMessageManagerURL, workflowId);
}
protected void run(String mdStoreVersion, HttpClientParams clientParams, ApiDescriptor api,
String dnetMessageManagerURL, String workflowId) throws IOException {
final MessageSender ms = new MessageSender(dnetMessageManagerURL, workflowId);
final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class);
@ -70,13 +86,9 @@ public class CollectorWorkerApplication {
final String reportPath = currentVersion.getHdfsPath() + REPORT_FILE_NAME;
log.info("report path is {}", reportPath);
final HttpClientParams clientParams = getClientParams(argumentParser);
final ApiDescriptor api = MAPPER.readValue(apiDescriptor, ApiDescriptor.class);
final Configuration conf = getHadoopConfiguration(hdfsuri);
try (CollectorPluginReport report = new CollectorPluginReport(FileSystem.get(conf), new Path(reportPath))) {
final CollectorWorker worker = new CollectorWorker(api, conf, currentVersion, clientParams, ms, report);
try (CollectorPluginReport report = new CollectorPluginReport(fileSystem, new Path(reportPath))) {
final CollectorWorker worker = new CollectorWorker(api, fileSystem, currentVersion, clientParams, ms,
report);
worker.collect();
report.setSuccess(true);
} catch (Throwable e) {

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.collection.worker;
package eu.dnetlib.dhp.collection;
import static eu.dnetlib.dhp.common.Constants.REPORT_FILE_NAME;
import static eu.dnetlib.dhp.utils.DHPUtils.*;

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.collection.worker;
package eu.dnetlib.dhp.collection;
/**
* Bundles the http connection parameters driving the client behaviour.

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.collection.worker;
package eu.dnetlib.dhp.collection;
import static eu.dnetlib.dhp.utils.DHPUtils.*;

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.collection.worker;
package eu.dnetlib.dhp.collection;
import java.util.HashMap;
import java.util.HashSet;

@ -4,8 +4,8 @@ package eu.dnetlib.dhp.collection.plugin;
import java.util.stream.Stream;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.worker.CollectorException;
import eu.dnetlib.dhp.collection.worker.CollectorPluginReport;
import eu.dnetlib.dhp.collection.CollectorException;
import eu.dnetlib.dhp.collection.CollectorPluginReport;
public interface CollectorPlugin {

@ -0,0 +1,59 @@
package eu.dnetlib.dhp.collection.plugin.mongodb;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.bson.Document;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.CollectorException;
import eu.dnetlib.dhp.collection.CollectorPluginReport;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
public class MongoDbCollectorPlugin implements CollectorPlugin {
public static final String MONGODB_HOST = "mongodb_host";
public static final String MONGODB_PORT = "mongodb_port";
public static final String MONGODB_COLLECTION = "mongodb_collection";
public static final String MONGODB_DBNAME = "mongodb_dbname";
@Override
public Stream<String> collect(ApiDescriptor api, CollectorPluginReport report) throws CollectorException {
final String host = Optional
.ofNullable(api.getParams().get(MONGODB_HOST))
.orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_HOST)));
final Integer port = Optional
.ofNullable(api.getParams().get(MONGODB_PORT))
.map(Integer::parseInt)
.orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_PORT)));
final String dbName = Optional
.ofNullable(api.getParams().get(MONGODB_DBNAME))
.orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_DBNAME)));
final String collection = Optional
.ofNullable(api.getParams().get(MONGODB_COLLECTION))
.orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_COLLECTION)));
final MongoClient mongoClient = new MongoClient(host, port);
final MongoDatabase database = mongoClient.getDatabase(dbName);
final MongoCollection<Document> mdstore = database.getCollection(collection);
long size = mdstore.count();
return StreamSupport
.stream(
Spliterators.spliterator(mdstore.find().iterator(), size, Spliterator.SIZED), false)
.map(doc -> doc.getString("body"));
}
}

@ -0,0 +1,54 @@
package eu.dnetlib.dhp.collection.plugin.mongodb;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.Optional;
import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.CollectorException;
import eu.dnetlib.dhp.collection.CollectorPluginReport;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.utils.DHPUtils;
public class MongoDbDumpCollectorPlugin implements CollectorPlugin {
public static final String PATH_PARAM = "path";
public static final String BODY_JSONPATH = "$.body";
public FileSystem fileSystem;
public MongoDbDumpCollectorPlugin(FileSystem fileSystem) {
this.fileSystem = fileSystem;
}
@Override
public Stream<String> collect(ApiDescriptor api, CollectorPluginReport report) throws CollectorException {
final Path path = Optional
.ofNullable(api.getParams().get("path"))
.map(Path::new)
.orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", PATH_PARAM)));
try {
if (!fileSystem.exists(path)) {
throw new CollectorException("path does not exist: " + path.toString());
}
return new BufferedReader(
new InputStreamReader(new GZIPInputStream(fileSystem.open(path)), Charset.defaultCharset()))
.lines()
.map(s -> DHPUtils.getJPathString(BODY_JSONPATH, s));
} catch (IOException e) {
throw new CollectorException(e);
}
}
}

@ -14,10 +14,10 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.CollectorException;
import eu.dnetlib.dhp.collection.CollectorPluginReport;
import eu.dnetlib.dhp.collection.HttpClientParams;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.worker.CollectorException;
import eu.dnetlib.dhp.collection.worker.CollectorPluginReport;
import eu.dnetlib.dhp.collection.worker.HttpClientParams;
public class OaiCollectorPlugin implements CollectorPlugin {

@ -16,10 +16,10 @@ import org.dom4j.io.SAXReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.collection.worker.CollectorException;
import eu.dnetlib.dhp.collection.worker.CollectorPluginReport;
import eu.dnetlib.dhp.collection.worker.HttpConnector2;
import eu.dnetlib.dhp.collection.worker.XmlCleaner;
import eu.dnetlib.dhp.collection.CollectorException;
import eu.dnetlib.dhp.collection.CollectorPluginReport;
import eu.dnetlib.dhp.collection.HttpConnector2;
import eu.dnetlib.dhp.collection.XmlCleaner;
public class OaiIterator implements Iterator<String> {

@ -3,9 +3,9 @@ package eu.dnetlib.dhp.collection.plugin.oai;
import java.util.Iterator;
import eu.dnetlib.dhp.collection.worker.CollectorPluginReport;
import eu.dnetlib.dhp.collection.worker.HttpClientParams;
import eu.dnetlib.dhp.collection.worker.HttpConnector2;
import eu.dnetlib.dhp.collection.CollectorPluginReport;
import eu.dnetlib.dhp.collection.HttpClientParams;
import eu.dnetlib.dhp.collection.HttpConnector2;
public class OaiIteratorFactory {

@ -1,20 +0,0 @@
package eu.dnetlib.dhp.collection.worker;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin;
public class CollectorPluginFactory {
public static CollectorPlugin getPluginByProtocol(final HttpClientParams clientParams, final String protocol)
throws UnknownCollectorPluginException {
if (protocol == null)
throw new UnknownCollectorPluginException("protocol cannot be null");
switch (protocol.toLowerCase().trim()) {
case "oai":
return new OaiCollectorPlugin(clientParams);
default:
throw new UnknownCollectorPluginException("Unknown protocol");
}
}
}

@ -98,7 +98,7 @@
<action name="CollectionWorker">
<java>
<main-class>eu.dnetlib.dhp.collection.worker.CollectorWorkerApplication</main-class>
<main-class>eu.dnetlib.dhp.collection.CollectorWorkerApplication</main-class>
<java-opts>${collection_java_xmx}</java-opts>
<arg>--apidescriptor</arg><arg>${apiDescription}</arg>
<arg>--namenode</arg><arg>${nameNode}</arg>
@ -118,7 +118,7 @@
<action name="CollectorReport">
<java>
<main-class>eu.dnetlib.dhp.collection.worker.CollectorWorkerReporter</main-class>
<main-class>eu.dnetlib.dhp.collection.CollectorWorkerReporter</main-class>
<java-opts>${collection_java_xmx}</java-opts>
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
<arg>--namenode</arg><arg>${nameNode}</arg>

@ -13,8 +13,8 @@ import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import eu.dnetlib.dhp.actionmanager.project.utils.EXCELParser;
import eu.dnetlib.dhp.collection.worker.CollectorException;
import eu.dnetlib.dhp.collection.worker.HttpConnector2;
import eu.dnetlib.dhp.collection.CollectorException;
import eu.dnetlib.dhp.collection.HttpConnector2;
@Disabled
public class EXCELParserTest {

@ -1,130 +0,0 @@
package eu.dnetlib.dhp.collection;
import static org.junit.jupiter.api.Assertions.*;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.io.TempDir;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreCurrentVersion;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.model.mdstore.Provenance;
import eu.dnetlib.dhp.schema.common.ModelSupport;
public class CollectionJobTest {
private static SparkSession spark;
@BeforeAll
public static void beforeAll() {
SparkConf conf = new SparkConf();
conf.setAppName(CollectionJobTest.class.getSimpleName());
conf.setMaster("local");
spark = SparkSession.builder().config(conf).getOrCreate();
}
@AfterAll
public static void afterAll() {
spark.stop();
}
@Test
public void testJSONSerialization() throws Exception {
final String s = IOUtils.toString(getClass().getResourceAsStream("input.json"));
System.out.println("s = " + s);
final ObjectMapper mapper = new ObjectMapper();
MDStoreVersion mi = mapper.readValue(s, MDStoreVersion.class);
assertNotNull(mi);
}
@Test
public void tesCollection(@TempDir Path testDir) throws Exception {
final Provenance provenance = new Provenance("pippo", "puppa", "ns_prefix");
Assertions.assertNotNull(new ObjectMapper().writeValueAsString(provenance));
GenerateNativeStoreSparkJob
.main(
new String[] {
"issm", "true",
"-w", "wid",
"-e", "XML",
"-d", "" + System.currentTimeMillis(),
"-p", new ObjectMapper().writeValueAsString(provenance),
"-x", "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']",
"-i", this.getClass().getResource("/eu/dnetlib/dhp/collection/native.seq").toString(),
"-o", testDir.toString() + "/store",
"-t", "true",
"-ru", "",
"-rp", "",
"-rh", "",
"-ro", "",
"-rr", ""
});
// TODO introduce useful assertions
}
@Test
public void testGenerationMetadataRecord() throws Exception {
final String xml = IOUtils.toString(this.getClass().getResourceAsStream("./record.xml"));
final MetadataRecord record = GenerateNativeStoreSparkJob
.parseRecord(
xml,
"./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']",
"XML",
new Provenance("foo", "bar", "ns_prefix"),
System.currentTimeMillis(),
null,
null);
assertNotNull(record.getId());
assertNotNull(record.getOriginalId());
}
@Test
public void TestEquals() throws IOException {
final String xml = IOUtils.toString(this.getClass().getResourceAsStream("./record.xml"));
final MetadataRecord record = GenerateNativeStoreSparkJob
.parseRecord(
xml,
"./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']",
"XML",
new Provenance("foo", "bar", "ns_prefix"),
System.currentTimeMillis(),
null,
null);
final MetadataRecord record1 = GenerateNativeStoreSparkJob
.parseRecord(
xml,
"./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']",
"XML",
new Provenance("foo", "bar", "ns_prefix"),
System.currentTimeMillis(),
null,
null);
record.setBody("ciao");
record1.setBody("mondo");
assertNotNull(record);
assertNotNull(record1);
assertEquals(record, record1);
}
}

@ -0,0 +1,113 @@
package eu.dnetlib.dhp.collection;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@ExtendWith(MockitoExtension.class)
public class CollectionWorkflowTest {
private static final Logger log = LoggerFactory.getLogger(CollectionWorkflowTest.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static Path workingDir;
private static DistributedFileSystem fileSystem;
// private static MiniDFSCluster hdfsCluster;
private static ApiDescriptor api;
private static String mdStoreVersion;
private static final String encoding = "XML";
private static final String dateOfCollection = System.currentTimeMillis() + "";
private static final String xpath = "//*[local-name()='header']/*[local-name()='identifier']";
private static String provenance;
private static final String msgMgrUrl = "http://localhost:%s/mock/mvc/dhp/message";
@BeforeAll
protected static void beforeAll() throws Exception {
provenance = IOUtils
.toString(CollectionWorkflowTest.class.getResourceAsStream("/eu/dnetlib/dhp/collection/provenance.json"));
workingDir = Files.createTempDirectory(CollectionWorkflowTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
/*
* Configuration conf = new Configuration(); conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
* workingDir.toString()); hdfsCluster = new MiniDFSCluster.Builder(conf).build(); fileSystem =
* hdfsCluster.getFileSystem(); api = OBJECT_MAPPER .readValue(
* IOUtils.toString(CollectionWorkflowTest.class.getResourceAsStream("apiDescriptor.json")),
* ApiDescriptor.class); mdStoreVersion = OBJECT_MAPPER
* .writeValueAsString(prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_1.json"));
*/
}
@AfterAll
protected static void tearDown() {
/*
* hdfsCluster.shutdown(); FileUtil.fullyDelete(workingDir.toFile());
*/
}
/**
<action name="CollectionWorker">
<java>
<main-class>eu.dnetlib.dhp.collection.worker.CollectorWorkerApplication</main-class>
<java-opts>${collection_java_xmx}</java-opts>
<arg>--apidescriptor</arg><arg>${apiDescription}</arg>
<arg>--namenode</arg><arg>${nameNode}</arg>
<arg>--workflowId</arg><arg>${workflowId}</arg>
<arg>--dnetMessageManagerURL</arg><arg>${dnetMessageManagerURL}</arg>
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
<arg>--maxNumberOfRetry</arg><arg>${maxNumberOfRetry}</arg>
<arg>--requestDelay</arg><arg>${requestDelay}</arg>
<arg>--retryDelay</arg><arg>${retryDelay}</arg>
<arg>--connectTimeOut</arg><arg>${connectTimeOut}</arg>
<arg>--readTimeOut</arg><arg>${readTimeOut}</arg>
<capture-output/>
</java>
<ok to="CollectorReport"/>
<error to="CollectorReport"/>
</action>
*/
// @Test
// @Order(1)
public void testCollectorWorkerApplication() throws Exception {
final HttpClientParams httpClientParams = new HttpClientParams();
// String url = String.format(msgMgrUrl, wireMockServer.port());
// new CollectorWorkerApplication(fileSystem).run(mdStoreVersion, httpClientParams, api, url, "1234");
}
public static MDStoreVersion prepareVersion(String filename) throws IOException {
MDStoreVersion mdstore = OBJECT_MAPPER
.readValue(IOUtils.toString(CollectionWorkflowTest.class.getResource(filename)), MDStoreVersion.class);
mdstore.setHdfsPath(String.format(mdstore.getHdfsPath(), workingDir.toString()));
return mdstore;
}
}

@ -1,8 +1,9 @@
package eu.dnetlib.dhp.aggregation;
package eu.dnetlib.dhp.collection;
import static eu.dnetlib.dhp.common.Constants.MDSTORE_DATA_PATH;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.io.File;
import java.io.FileOutputStream;
@ -36,14 +37,14 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob;
import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.model.mdstore.Provenance;
import eu.dnetlib.dhp.transformation.TransformSparkJobNode;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@ExtendWith(MockitoExtension.class)
public class AggregationJobTest extends AbstractVocabularyTest {
public class GenerateNativeStoreSparkJobTest extends AbstractVocabularyTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@ -58,18 +59,20 @@ public class AggregationJobTest extends AbstractVocabularyTest {
private static final String xpath = "//*[local-name()='header']/*[local-name()='identifier']";
private static String provenance;
private static final Logger log = LoggerFactory.getLogger(AggregationJobTest.class);
private static final Logger log = LoggerFactory.getLogger(GenerateNativeStoreSparkJobTest.class);
@BeforeAll
public static void beforeAll() throws IOException {
provenance = IOUtils
.toString(AggregationJobTest.class.getResourceAsStream("/eu/dnetlib/dhp/collection/provenance.json"));
workingDir = Files.createTempDirectory(AggregationJobTest.class.getSimpleName());
.toString(
GenerateNativeStoreSparkJobTest.class
.getResourceAsStream("/eu/dnetlib/dhp/collection/provenance.json"));
workingDir = Files.createTempDirectory(GenerateNativeStoreSparkJobTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(AggregationJobTest.class.getSimpleName());
conf.setAppName(GenerateNativeStoreSparkJobTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
@ -81,7 +84,7 @@ public class AggregationJobTest extends AbstractVocabularyTest {
encoder = Encoders.bean(MetadataRecord.class);
spark = SparkSession
.builder()
.appName(AggregationJobTest.class.getSimpleName())
.appName(GenerateNativeStoreSparkJobTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@ -202,6 +205,67 @@ public class AggregationJobTest extends AbstractVocabularyTest {
}
@Test
public void testJSONSerialization() throws Exception {
final String s = IOUtils.toString(getClass().getResourceAsStream("mdStoreVersion_1.json"));
System.out.println("s = " + s);
final ObjectMapper mapper = new ObjectMapper();
MDStoreVersion mi = mapper.readValue(s, MDStoreVersion.class);
assertNotNull(mi);
}
@Test
public void testGenerationMetadataRecord() throws Exception {
final String xml = IOUtils.toString(this.getClass().getResourceAsStream("./record.xml"));
final MetadataRecord record = GenerateNativeStoreSparkJob
.parseRecord(
xml,
"./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']",
"XML",
new Provenance("foo", "bar", "ns_prefix"),
System.currentTimeMillis(),
null,
null);
assertNotNull(record.getId());
assertNotNull(record.getOriginalId());
}
@Test
public void testEquals() throws IOException {
final String xml = IOUtils.toString(this.getClass().getResourceAsStream("./record.xml"));
final MetadataRecord record = GenerateNativeStoreSparkJob
.parseRecord(
xml,
"./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']",
"XML",
new Provenance("foo", "bar", "ns_prefix"),
System.currentTimeMillis(),
null,
null);
final MetadataRecord record1 = GenerateNativeStoreSparkJob
.parseRecord(
xml,
"./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']",
"XML",
new Provenance("foo", "bar", "ns_prefix"),
System.currentTimeMillis(),
null,
null);
record.setBody("ciao");
record1.setBody("mondo");
assertNotNull(record);
assertNotNull(record1);
assertEquals(record, record1);
}
protected void verify(MDStoreVersion mdStoreVersion) throws IOException {
Assertions.assertTrue(new File(mdStoreVersion.getHdfsPath()).exists());
@ -226,7 +290,7 @@ public class AggregationJobTest extends AbstractVocabularyTest {
Assertions.assertEquals(seqFileSize, uniqueIds, "the size must be equal");
}
private MDStoreVersion prepareVersion(String filename) throws IOException {
public MDStoreVersion prepareVersion(String filename) throws IOException {
MDStoreVersion mdstore = OBJECT_MAPPER
.readValue(IOUtils.toString(getClass().getResource(filename)), MDStoreVersion.class);
mdstore.setHdfsPath(String.format(mdstore.getHdfsPath(), workingDir.toString()));

@ -9,20 +9,10 @@ import org.junit.jupiter.api.Test;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.worker.CollectorPluginFactory;
import eu.dnetlib.dhp.collection.worker.HttpClientParams;
@Disabled
public class CollectorWorkerApplicationTests {
@Test
public void testFindPlugin() throws Exception {
final CollectorPluginFactory collectorPluginEnumerator = new CollectorPluginFactory();
final HttpClientParams clientParams = new HttpClientParams();
assertNotNull(collectorPluginEnumerator.getPluginByProtocol(clientParams, "oai"));
assertNotNull(collectorPluginEnumerator.getPluginByProtocol(clientParams, "OAI"));
}
@Test
public void testCollectionOAI() throws Exception {
final ApiDescriptor api = new ApiDescriptor();

@ -8,7 +8,7 @@ import java.io.IOException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import eu.dnetlib.dhp.collection.worker.CollectorPluginReport;
import eu.dnetlib.dhp.collection.CollectorPluginReport;
public class CollectorPluginReportTest {

@ -27,7 +27,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest;
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
import eu.dnetlib.dhp.collection.CollectionJobTest;
import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJobTest;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
@ -40,7 +40,7 @@ public class TransformationJobTest extends AbstractVocabularyTest {
@BeforeAll
public static void beforeAll() throws IOException, ISLookUpException {
SparkConf conf = new SparkConf();
conf.setAppName(CollectionJobTest.class.getSimpleName());
conf.setAppName(GenerateNativeStoreSparkJobTest.class.getSimpleName());
conf.setMaster("local");
spark = SparkSession.builder().config(conf).getOrCreate();
}

@ -0,0 +1,10 @@
{
"id":"api_________::opendoar____::2::0",
"baseUrl":"https://www.alexandria.unisg.ch/cgi/oai2",
"protocol":"oai",
"params": {
"set":"driver",
"metadata_identifier_path":"//*[local-name()\u003d\u0027header\u0027]/*[local-name()\u003d\u0027identifier\u0027]",
"format":"oai_dc"
}
}

@ -79,6 +79,8 @@
<FIELD indexable="true" multivalued="true" name="resultauthor_nt" result="false" stat="false" type="string_ci" xpath="//*[local-name()='entity']/*[local-name()='result']/creator"/>
<FIELD indexable="true" multivalued="true" name="authorid" result="false" stat="false" type="string_ci" xpath="//*[local-name()='entity']/*[local-name()='result']/creator/@*[local-name() != 'rank' and local-name() != 'name' and local-name() != 'surname']"/>
<FIELD indexable="true" multivalued="true" name="authoridtype" result="false" stat="false" type="string_ci" xpath="//*[local-name()='entity']/*[local-name()='result']/creator/@*[local-name() != 'rank' and local-name() != 'name' and local-name() != 'surname']/local-name()"/>
<FIELD indexable="true" multivalued="true" name="authoridtypevalue" result="false" stat="false" type="string_ci" xpath="//*[local-name()='entity']/*[local-name()='result']/creator" value="string-join((./@*[local-name() != 'rank' and local-name() != 'name' and local-name() != 'surname'], ./@*[local-name() != 'rank' and local-name() != 'name' and local-name() != 'surname']/local-name()), '||' )"/>
<FIELD indexable="true" name="resulthostingdatasource" result="false" stat="false" tokenizable="false" value="distinct-values(concat(./@id, '||', ./@name))" xpath="//*[local-name()='entity']/*[local-name()='result']/children/instance/*[local-name()='hostedby']"/>
<FIELD indexable="true" name="resulthostingdatasourceid" result="false" stat="false" tokenizable="false" xpath="distinct-values(//*[local-name()='entity']/*[local-name()='result']/children/instance/*[local-name()='hostedby']/@id)"/>
<FIELD indexable="true" name="resulthostingdatasourcename" result="false" stat="false" tokenizable="false" xpath="distinct-values(//*[local-name()='entity']/*[local-name()='result']/children/instance/*[local-name()='hostedby']/@name)"/>

@ -39,6 +39,8 @@
<creator rank="4">Saykally, Jessica N.</creator>
<creator rank="5">Keeley, Kristen L.</creator>
<creator rank="6">Haris Hatic</creator>
<creator rank="7" name="Miriam" surname="Baglioni" orcid_pending="0000-0002-2273-9004">Baglioni, Miriam</creator>
<creator rank="8" name="Michele" surname="De Bonis" orcid="0000-0002-2273-9004">De Bonis, Michele</creator>
<dateofacceptance>2017-06-01</dateofacceptance>
<description>Withania somnifera has been used in traditional medicine for a variety
of neural disorders. Recently, chronic neurodegenerative conditions have been
@ -115,7 +117,7 @@
<source>Cell Transplantation</source>
<resulttype classid="publication" classname="publication"
schemeid="dnet:result_typologies" schemename="dnet:result_typologies"/>
<resourcetype/>
<resourcetype classid="Book" classname="Book" schemeid="dnet:dataCite_resource" schemename="dnet:dataCite_resource" />
<journal issn="0963-6897" eissn="1555-3892" ep="1201" iss="7" sp="1193" vol="26"
>Cell Transplantation</journal>
<context id="NIH" label="National Institutes of Health" type="funding">

Loading…
Cancel
Save