Merge branch 'hadoop_aggregator' of code-repo.d4science.org:D-Net/dnet-hadoop into hadoop_aggregator
Conflicts:
	dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java

commit b3f5c2351d (pull/96/head)
@@ -1,5 +1,5 @@
 
-package eu.dnetlib.dhp.collection.worker;
+package eu.dnetlib.dhp.collection;
 
 public class CollectorException extends Exception {
@@ -1,5 +1,5 @@
 
-package eu.dnetlib.dhp.collection.worker;
+package eu.dnetlib.dhp.collection;
 
 import static eu.dnetlib.dhp.common.Constants.REPORT_FILE_NAME;
 import static eu.dnetlib.dhp.utils.DHPUtils.*;
@@ -1,5 +1,5 @@
 
-package eu.dnetlib.dhp.collection.worker;
+package eu.dnetlib.dhp.collection;
 
 /**
  * Bundles the http connection parameters driving the client behaviour.
@@ -1,5 +1,5 @@
 
-package eu.dnetlib.dhp.collection.worker;
+package eu.dnetlib.dhp.collection;
 
 import static eu.dnetlib.dhp.utils.DHPUtils.*;
@@ -1,5 +1,5 @@
 
-package eu.dnetlib.dhp.collection.worker;
+package eu.dnetlib.dhp.collection;
 
 public class UnknownCollectorPluginException extends Exception {
@@ -1,5 +1,5 @@
 
-package eu.dnetlib.dhp.collection.worker;
+package eu.dnetlib.dhp.collection;
 
 import java.util.HashMap;
 import java.util.HashSet;
@@ -0,0 +1,59 @@
+
+package eu.dnetlib.dhp.collection.plugin.mongodb;
+
+import java.util.Optional;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import org.bson.Document;
+
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+
+import eu.dnetlib.dhp.collection.ApiDescriptor;
+import eu.dnetlib.dhp.collection.CollectorException;
+import eu.dnetlib.dhp.collection.CollectorPluginReport;
+import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
+
+public class MongoDbCollectorPlugin implements CollectorPlugin {
+
+	public static final String MONGODB_HOST = "mongodb_host";
+	public static final String MONGODB_PORT = "mongodb_port";
+	public static final String MONGODB_COLLECTION = "mongodb_collection";
+	public static final String MONGODB_DBNAME = "mongodb_dbname";
+
+	@Override
+	public Stream<String> collect(ApiDescriptor api, CollectorPluginReport report) throws CollectorException {
+
+		final String host = Optional
+			.ofNullable(api.getParams().get(MONGODB_HOST))
+			.orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_HOST)));
+
+		final Integer port = Optional
+			.ofNullable(api.getParams().get(MONGODB_PORT))
+			.map(Integer::parseInt)
+			.orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_PORT)));
+
+		final String dbName = Optional
+			.ofNullable(api.getParams().get(MONGODB_DBNAME))
+			.orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_DBNAME)));
+
+		final String collection = Optional
+			.ofNullable(api.getParams().get(MONGODB_COLLECTION))
+			.orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_COLLECTION)));
+
+		final MongoClient mongoClient = new MongoClient(host, port);
+		final MongoDatabase database = mongoClient.getDatabase(dbName);
+		final MongoCollection<Document> mdstore = database.getCollection(collection);
+
+		long size = mdstore.count();
+
+		return StreamSupport
+			.stream(
+				Spliterators.spliterator(mdstore.find().iterator(), size, Spliterator.SIZED), false)
+			.map(doc -> doc.getString("body"));
+	}
+}
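Not part of the diff, but as a usage sketch: the new plugin can plausibly be driven as below. It assumes ApiDescriptor exposes a params setter and CollectorPluginReport a no-arg constructor; the collection name is invented.

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.CollectorException;
import eu.dnetlib.dhp.collection.CollectorPluginReport;
import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbCollectorPlugin;

public class MongoDbCollectorPluginExample {

	public static void main(String[] args) throws CollectorException {
		// connection coordinates travel through the ApiDescriptor params,
		// keyed by the constants the plugin declares
		final Map<String, String> params = new HashMap<>();
		params.put(MongoDbCollectorPlugin.MONGODB_HOST, "localhost");
		params.put(MongoDbCollectorPlugin.MONGODB_PORT, "27017");
		params.put(MongoDbCollectorPlugin.MONGODB_DBNAME, "mdstore");
		params.put(MongoDbCollectorPlugin.MONGODB_COLLECTION, "md-12345"); // hypothetical collection name

		final ApiDescriptor api = new ApiDescriptor();
		api.setParams(params); // assumes a standard bean setter

		// the plugin returns a lazy Stream over the "body" field of each document
		final Stream<String> records = new MongoDbCollectorPlugin()
			.collect(api, new CollectorPluginReport());
		records.limit(10).forEach(System.out::println);
	}
}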
@@ -0,0 +1,54 @@
+
+package eu.dnetlib.dhp.collection.plugin.mongodb;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.Optional;
+import java.util.stream.Stream;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import eu.dnetlib.dhp.collection.ApiDescriptor;
+import eu.dnetlib.dhp.collection.CollectorException;
+import eu.dnetlib.dhp.collection.CollectorPluginReport;
+import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
+import eu.dnetlib.dhp.utils.DHPUtils;
+
+public class MongoDbDumpCollectorPlugin implements CollectorPlugin {
+
+	public static final String PATH_PARAM = "path";
+	public static final String BODY_JSONPATH = "$.body";
+
+	public FileSystem fileSystem;
+
+	public MongoDbDumpCollectorPlugin(FileSystem fileSystem) {
+		this.fileSystem = fileSystem;
+	}
+
+	@Override
+	public Stream<String> collect(ApiDescriptor api, CollectorPluginReport report) throws CollectorException {
+
+		final Path path = Optional
+			.ofNullable(api.getParams().get(PATH_PARAM))
+			.map(Path::new)
+			.orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", PATH_PARAM)));
+
+		try {
+			if (!fileSystem.exists(path)) {
+				throw new CollectorException("path does not exist: " + path.toString());
+			}
+
+			return new BufferedReader(
+				new InputStreamReader(new GZIPInputStream(fileSystem.open(path)), Charset.defaultCharset()))
+					.lines()
+					.map(s -> DHPUtils.getJPathString(BODY_JSONPATH, s));
+
+		} catch (IOException e) {
+			throw new CollectorException(e);
+		}
+	}
+}
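Again outside the diff, a hedged test sketch for the dump plugin: it writes a tiny gzipped dump on the local Hadoop FileSystem, then streams the record bodies back. The dump path and record payloads are invented, and it assumes ApiDescriptor.getParams() returns an initialized, mutable map.

import java.util.zip.GZIPOutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.CollectorPluginReport;
import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbDumpCollectorPlugin;

public class MongoDbDumpCollectorPluginExample {

	public static void main(String[] args) throws Exception {
		final FileSystem fs = FileSystem.getLocal(new Configuration());
		final Path dump = new Path("/tmp/mdstore-dump.gz"); // hypothetical location

		// one JSON record per line, each wrapping the payload in a "body"
		// field, matching the $.body jsonpath the plugin extracts
		try (GZIPOutputStream out = new GZIPOutputStream(fs.create(dump, true))) {
			out.write("{\"body\":\"<record>1</record>\"}\n".getBytes("UTF-8"));
			out.write("{\"body\":\"<record>2</record>\"}\n".getBytes("UTF-8"));
		}

		final ApiDescriptor api = new ApiDescriptor();
		api.getParams().put(MongoDbDumpCollectorPlugin.PATH_PARAM, dump.toString());

		new MongoDbDumpCollectorPlugin(fs)
			.collect(api, new CollectorPluginReport())
			.forEach(System.out::println);
	}
}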
@@ -1,20 +0,0 @@
-
-package eu.dnetlib.dhp.collection.worker;
-
-import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
-import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin;
-
-public class CollectorPluginFactory {
-
-	public static CollectorPlugin getPluginByProtocol(final HttpClientParams clientParams, final String protocol)
-		throws UnknownCollectorPluginException {
-		if (protocol == null)
-			throw new UnknownCollectorPluginException("protocol cannot be null");
-		switch (protocol.toLowerCase().trim()) {
-			case "oai":
-				return new OaiCollectorPlugin(clientParams);
-			default:
-				throw new UnknownCollectorPluginException("Unknown protocol");
-		}
-	}
-}
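The static factory keyed on protocol is deleted by this commit, and its replacement is not shown in these hunks. Purely as a hedged sketch, a factory covering the new mongodb plugins might look like the following; the protocol labels "other" and "mongodump", the extra FileSystem parameter, and the assumption that HttpClientParams moved to eu.dnetlib.dhp.collection along with the classes above are all guesses, not the commit's actual design.

import org.apache.hadoop.fs.FileSystem;

import eu.dnetlib.dhp.collection.HttpClientParams;
import eu.dnetlib.dhp.collection.UnknownCollectorPluginException;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbDumpCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin;

public class CollectorPluginFactorySketch {

	public static CollectorPlugin getPluginByProtocol(
		final HttpClientParams clientParams, final FileSystem fileSystem, final String protocol)
		throws UnknownCollectorPluginException {
		if (protocol == null)
			throw new UnknownCollectorPluginException("protocol cannot be null");
		switch (protocol.toLowerCase().trim()) {
			case "oai":
				return new OaiCollectorPlugin(clientParams);
			case "other": // hypothetical label for the live-MongoDB plugin
				return new MongoDbCollectorPlugin();
			case "mongodump": // hypothetical label for the HDFS dump plugin
				return new MongoDbDumpCollectorPlugin(fileSystem);
			default:
				throw new UnknownCollectorPluginException("Unknown protocol: " + protocol);
		}
	}
}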
@@ -1,130 +0,0 @@
-
-package eu.dnetlib.dhp.collection;
-
-import static org.junit.jupiter.api.Assertions.*;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.spark.SparkConf;
-import org.apache.spark.sql.SparkSession;
-import org.junit.jupiter.api.*;
-import org.junit.jupiter.api.io.TempDir;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import eu.dnetlib.data.mdstore.manager.common.model.MDStoreCurrentVersion;
-import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
-import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
-import eu.dnetlib.dhp.model.mdstore.Provenance;
-import eu.dnetlib.dhp.schema.common.ModelSupport;
-
-public class CollectionJobTest {
-
-	private static SparkSession spark;
-
-	@BeforeAll
-	public static void beforeAll() {
-		SparkConf conf = new SparkConf();
-		conf.setAppName(CollectionJobTest.class.getSimpleName());
-		conf.setMaster("local");
-		spark = SparkSession.builder().config(conf).getOrCreate();
-	}
-
-	@AfterAll
-	public static void afterAll() {
-		spark.stop();
-	}
-
-	@Test
-	public void testJSONSerialization() throws Exception {
-		final String s = IOUtils.toString(getClass().getResourceAsStream("input.json"));
-		System.out.println("s = " + s);
-		final ObjectMapper mapper = new ObjectMapper();
-		MDStoreVersion mi = mapper.readValue(s, MDStoreVersion.class);
-
-		assertNotNull(mi);
-	}
-
-	@Test
-	public void tesCollection(@TempDir Path testDir) throws Exception {
-		final Provenance provenance = new Provenance("pippo", "puppa", "ns_prefix");
-		Assertions.assertNotNull(new ObjectMapper().writeValueAsString(provenance));
-
-		GenerateNativeStoreSparkJob
-			.main(
-				new String[] {
-					"issm", "true",
-					"-w", "wid",
-					"-e", "XML",
-					"-d", "" + System.currentTimeMillis(),
-					"-p", new ObjectMapper().writeValueAsString(provenance),
-					"-x", "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']",
-					"-i", this.getClass().getResource("/eu/dnetlib/dhp/collection/native.seq").toString(),
-					"-o", testDir.toString() + "/store",
-					"-t", "true",
-					"-ru", "",
-					"-rp", "",
-					"-rh", "",
-					"-ro", "",
-					"-rr", ""
-				});
-
-		// TODO introduce useful assertions
-	}
-
-	@Test
-	public void testGenerationMetadataRecord() throws Exception {
-
-		final String xml = IOUtils.toString(this.getClass().getResourceAsStream("./record.xml"));
-
-		final MetadataRecord record = GenerateNativeStoreSparkJob
-			.parseRecord(
-				xml,
-				"./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']",
-				"XML",
-				new Provenance("foo", "bar", "ns_prefix"),
-				System.currentTimeMillis(),
-				null,
-				null);
-
-		assertNotNull(record.getId());
-		assertNotNull(record.getOriginalId());
-	}
-
-	@Test
-	public void TestEquals() throws IOException {
-
-		final String xml = IOUtils.toString(this.getClass().getResourceAsStream("./record.xml"));
-		final MetadataRecord record = GenerateNativeStoreSparkJob
-			.parseRecord(
-				xml,
-				"./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']",
-				"XML",
-				new Provenance("foo", "bar", "ns_prefix"),
-				System.currentTimeMillis(),
-				null,
-				null);
-		final MetadataRecord record1 = GenerateNativeStoreSparkJob
-			.parseRecord(
-				xml,
-				"./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']",
-				"XML",
-				new Provenance("foo", "bar", "ns_prefix"),
-				System.currentTimeMillis(),
-				null,
-				null);
-
-		record.setBody("ciao");
-		record1.setBody("mondo");
-
-		assertNotNull(record);
-		assertNotNull(record1);
-		assertEquals(record, record1);
-	}
-}
@@ -0,0 +1,113 @@
+
+package eu.dnetlib.dhp.collection;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
+
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+@ExtendWith(MockitoExtension.class)
+public class CollectionWorkflowTest {
+
+	private static final Logger log = LoggerFactory.getLogger(CollectionWorkflowTest.class);
+
+	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+	private static Path workingDir;
+
+	private static DistributedFileSystem fileSystem;
+
+	// private static MiniDFSCluster hdfsCluster;
+
+	private static ApiDescriptor api;
+	private static String mdStoreVersion;
+
+	private static final String encoding = "XML";
+	private static final String dateOfCollection = System.currentTimeMillis() + "";
+	private static final String xpath = "//*[local-name()='header']/*[local-name()='identifier']";
+	private static String provenance;
+
+	private static final String msgMgrUrl = "http://localhost:%s/mock/mvc/dhp/message";
+
+	@BeforeAll
+	protected static void beforeAll() throws Exception {
+		provenance = IOUtils
+			.toString(CollectionWorkflowTest.class.getResourceAsStream("/eu/dnetlib/dhp/collection/provenance.json"));
+
+		workingDir = Files.createTempDirectory(CollectionWorkflowTest.class.getSimpleName());
+		log.info("using work dir {}", workingDir);
+
+		/*
+		 * Configuration conf = new Configuration(); conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
+		 * workingDir.toString()); hdfsCluster = new MiniDFSCluster.Builder(conf).build(); fileSystem =
+		 * hdfsCluster.getFileSystem(); api = OBJECT_MAPPER .readValue(
+		 * IOUtils.toString(CollectionWorkflowTest.class.getResourceAsStream("apiDescriptor.json")),
+		 * ApiDescriptor.class); mdStoreVersion = OBJECT_MAPPER
+		 * .writeValueAsString(prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_1.json"));
+		 */
+	}
+
+	@AfterAll
+	protected static void tearDown() {
+		/*
+		 * hdfsCluster.shutdown(); FileUtil.fullyDelete(workingDir.toFile());
+		 */
+	}
+
+	/**
+	<action name="CollectionWorker">
+		<java>
+			<main-class>eu.dnetlib.dhp.collection.worker.CollectorWorkerApplication</main-class>
+			<java-opts>${collection_java_xmx}</java-opts>
+			<arg>--apidescriptor</arg><arg>${apiDescription}</arg>
+			<arg>--namenode</arg><arg>${nameNode}</arg>
+			<arg>--workflowId</arg><arg>${workflowId}</arg>
+			<arg>--dnetMessageManagerURL</arg><arg>${dnetMessageManagerURL}</arg>
+			<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+			<arg>--maxNumberOfRetry</arg><arg>${maxNumberOfRetry}</arg>
+			<arg>--requestDelay</arg><arg>${requestDelay}</arg>
+			<arg>--retryDelay</arg><arg>${retryDelay}</arg>
+			<arg>--connectTimeOut</arg><arg>${connectTimeOut}</arg>
+			<arg>--readTimeOut</arg><arg>${readTimeOut}</arg>
+			<capture-output/>
+		</java>
+		<ok to="CollectorReport"/>
+		<error to="CollectorReport"/>
+	</action>
+	*/
+	// @Test
+	// @Order(1)
+	public void testCollectorWorkerApplication() throws Exception {
+
+		final HttpClientParams httpClientParams = new HttpClientParams();
+
+		// String url = String.format(msgMgrUrl, wireMockServer.port());
+
+		// new CollectorWorkerApplication(fileSystem).run(mdStoreVersion, httpClientParams, api, url, "1234");
+
+	}
+
+	public static MDStoreVersion prepareVersion(String filename) throws IOException {
+		MDStoreVersion mdstore = OBJECT_MAPPER
+			.readValue(IOUtils.toString(CollectionWorkflowTest.class.getResource(filename)), MDStoreVersion.class);
+		mdstore.setHdfsPath(String.format(mdstore.getHdfsPath(), workingDir.toString()));
+		return mdstore;
+	}
+
+}
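The MiniDFSCluster bootstrap in the test above is left commented out. A minimal sketch of what enabling it could look like, grounded in those commented lines; the helper class and method names here are hypothetical, and it assumes the hadoop-hdfs test artifact on the classpath.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class MiniDfsBootstrapSketch {

	private static MiniDFSCluster hdfsCluster;
	private static DistributedFileSystem fileSystem;

	// mirrors the commented-out block in beforeAll()
	public static void start(final java.nio.file.Path workingDir) throws Exception {
		final Configuration conf = new Configuration();
		conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, workingDir.toString());
		hdfsCluster = new MiniDFSCluster.Builder(conf).build();
		fileSystem = hdfsCluster.getFileSystem();
	}

	// mirrors the commented-out block in tearDown()
	public static void stop(final java.nio.file.Path workingDir) {
		hdfsCluster.shutdown();
		FileUtil.fullyDelete(workingDir.toFile());
	}
}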
@@ -0,0 +1,10 @@
+{
+	"id": "api_________::opendoar____::2::0",
+	"baseUrl": "https://www.alexandria.unisg.ch/cgi/oai2",
+	"protocol": "oai",
+	"params": {
+		"set": "driver",
+		"metadata_identifier_path": "//*[local-name()='header']/*[local-name()='identifier']",
+		"format": "oai_dc"
+	}
+}
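For reference, a small sketch (mirroring the commented-out code in CollectionWorkflowTest) of how this fixture might be deserialized; it assumes ApiDescriptor exposes standard bean accessors such as getProtocol().

import org.apache.commons.io.IOUtils;

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.collection.ApiDescriptor;

public class ApiDescriptorExample {

	public static void main(String[] args) throws Exception {
		// load the JSON fixture from the test resources and bind it to the bean
		final ApiDescriptor api = new ObjectMapper()
			.readValue(
				IOUtils.toString(ApiDescriptorExample.class.getResourceAsStream("apiDescriptor.json")),
				ApiDescriptor.class);
		System.out.println(api.getProtocol()); // expected: "oai"
	}
}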