diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml
index e2db8b451..7c8be8d3e 100644
--- a/dhp-common/pom.xml
+++ b/dhp-common/pom.xml
@@ -98,6 +98,10 @@
httpclient
+
+ org.mongodb
+ mongo-java-driver
+
eu.dnetlib.dhp
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/MdstoreClient.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreClient.java
similarity index 84%
rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/MdstoreClient.java
rename to dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreClient.java
index a2177935a..236e4d8b0 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/MdstoreClient.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreClient.java
@@ -1,13 +1,16 @@
-package eu.dnetlib.dhp.oa.graph.raw.common;
+package eu.dnetlib.dhp.common;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.StreamSupport;
+import com.mongodb.BasicDBObject;
+import com.mongodb.QueryBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,6 +37,16 @@ public class MdstoreClient implements Closeable {
this.db = getDb(client, dbName);
}
+ public MongoCollection mdStore(final String mdId) {
+ BasicDBObject query = (BasicDBObject) QueryBuilder.start("mdId").is(mdId).get();
+
+ final String currentId = Optional.ofNullable(getColl(db, COLL_METADATA_MANAGER, true).find(query))
+ .map(r -> r.first())
+ .map(d -> d.getString("currentId"))
+ .orElseThrow(() -> new IllegalArgumentException("cannot find current mdstore id for: " + mdId));
+ return getColl(db, currentId, true);
+ }
+
public Map validCollections(
final String mdFormat, final String mdLayout, final String mdInterpretation) {
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/rest/DNetRestClient.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/rest/DNetRestClient.java
index 014f18606..27713d9b5 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/rest/DNetRestClient.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/rest/DNetRestClient.java
@@ -11,9 +11,16 @@ import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
public class DNetRestClient {
+ private static final Logger log = LoggerFactory.getLogger(DNetRestClient.class);
+
private static ObjectMapper mapper = new ObjectMapper();
public static T doGET(final String url, Class clazz) throws Exception {
@@ -44,6 +51,14 @@ public class DNetRestClient {
private static String doHTTPRequest(final HttpUriRequest r) throws Exception {
CloseableHttpClient client = HttpClients.createDefault();
+
+ log.info("performing HTTP request, method {} on URI {}", r.getMethod(), r.getURI().toString());
+ log.info("request headers: {}",
+ Arrays.asList(r.getAllHeaders())
+ .stream()
+ .map(h -> h.getName() + ":" + h.getValue())
+ .collect(Collectors.joining(",")));
+
CloseableHttpResponse response = client.execute(r);
return IOUtils.toString(response.getEntity().getContent());
}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java
index a397c4f9d..ef29cb5b1 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java
@@ -21,7 +21,7 @@ import eu.dnetlib.dhp.aggregation.common.AggregatorReport;
import eu.dnetlib.dhp.aggregation.common.ReporterCallback;
import eu.dnetlib.dhp.aggregation.common.ReportingJob;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
-import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbCollectorPlugin;
+import eu.dnetlib.dhp.collection.plugin.mongodb.MDStoreCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbDumpCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin;
@@ -119,7 +119,7 @@ public class CollectorWorker extends ReportingJob {
case mdstore_mongodb_dump:
return new MongoDbDumpCollectorPlugin(fileSystem);
case mdstore_mongodb:
- return new MongoDbCollectorPlugin();
+ return new MDStoreCollectorPlugin();
default:
throw new UnknownCollectorPluginException("plugin is not managed: " + plugin);
}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MDStoreCollectorPlugin.java
similarity index 54%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbCollectorPlugin.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MDStoreCollectorPlugin.java
index 89b92ffa1..33b9111dd 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbCollectorPlugin.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MDStoreCollectorPlugin.java
@@ -7,48 +7,38 @@ import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
-import org.bson.Document;
-
-import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoDatabase;
+import eu.dnetlib.dhp.common.MdstoreClient;
import eu.dnetlib.dhp.aggregation.common.AggregatorReport;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.CollectorException;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
+import org.bson.Document;
-public class MongoDbCollectorPlugin implements CollectorPlugin {
+public class MDStoreCollectorPlugin implements CollectorPlugin {
- public static final String MONGODB_HOST = "mongodb_host";
- public static final String MONGODB_PORT = "mongodb_port";
- public static final String MONGODB_COLLECTION = "mongodb_collection";
+ public static final String MONGODB_BASEURL = "mongodb_baseurl";
public static final String MONGODB_DBNAME = "mongodb_dbname";
+ public static final String MDSTORE_ID = "mongodb_collection";
@Override
public Stream collect(ApiDescriptor api, AggregatorReport report) throws CollectorException {
- final String host = Optional
- .ofNullable(api.getParams().get(MONGODB_HOST))
- .orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_HOST)));
-
- final Integer port = Optional
- .ofNullable(api.getParams().get(MONGODB_PORT))
- .map(Integer::parseInt)
- .orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_PORT)));
+ final String mongoBaseUrl = Optional
+ .ofNullable(api.getParams().get(MONGODB_BASEURL))
+ .orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_BASEURL)));
final String dbName = Optional
.ofNullable(api.getParams().get(MONGODB_DBNAME))
.orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_DBNAME)));
- final String collection = Optional
- .ofNullable(api.getParams().get(MONGODB_COLLECTION))
- .orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_COLLECTION)));
-
- final MongoClient mongoClient = new MongoClient(host, port);
- final MongoDatabase database = mongoClient.getDatabase(dbName);
- final MongoCollection mdstore = database.getCollection(collection);
+ final String mdId = Optional
+ .ofNullable(api.getParams().get(MDSTORE_ID))
+ .orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MDSTORE_ID)));
+ final MdstoreClient client = new MdstoreClient(mongoBaseUrl, dbName);
+ final MongoCollection mdstore = client.mdStore(mdId);
long size = mdstore.count();
return StreamSupport
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java
index e7703bf72..9e7e051de 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java
@@ -12,7 +12,7 @@ import org.apache.commons.logging.LogFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
-import eu.dnetlib.dhp.oa.graph.raw.common.MdstoreClient;
+import eu.dnetlib.dhp.common.MdstoreClient;
public class MigrateMongoMdstoresApplication extends AbstractMigrationApplication
implements Closeable {