From fc3fa5e3436db1c1144afa9703e2d5e3a4e4a9c2 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 24 Feb 2021 15:07:24 +0100 Subject: [PATCH] implemented mdstore collector plugin --- dhp-common/pom.xml | 4 +++ .../eu/dnetlib/dhp}/common/MdstoreClient.java | 15 +++++++- .../dhp/common/rest/DNetRestClient.java | 15 ++++++++ .../dhp/collection/CollectorWorker.java | 4 +-- ...lugin.java => MDStoreCollectorPlugin.java} | 36 +++++++------------ .../raw/MigrateMongoMdstoresApplication.java | 2 +- 6 files changed, 49 insertions(+), 27 deletions(-) rename {dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw => dhp-common/src/main/java/eu/dnetlib/dhp}/common/MdstoreClient.java (84%) rename dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/{MongoDbCollectorPlugin.java => MDStoreCollectorPlugin.java} (54%) diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml index e2db8b451..7c8be8d3e 100644 --- a/dhp-common/pom.xml +++ b/dhp-common/pom.xml @@ -98,6 +98,10 @@ httpclient + + org.mongodb + mongo-java-driver + eu.dnetlib.dhp diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/MdstoreClient.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreClient.java similarity index 84% rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/MdstoreClient.java rename to dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreClient.java index a2177935a..236e4d8b0 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/MdstoreClient.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreClient.java @@ -1,13 +1,16 @@ -package eu.dnetlib.dhp.oa.graph.raw.common; +package eu.dnetlib.dhp.common; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.stream.StreamSupport; +import com.mongodb.BasicDBObject; +import com.mongodb.QueryBuilder; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,6 +37,16 @@ public class MdstoreClient implements Closeable { this.db = getDb(client, dbName); } + public MongoCollection mdStore(final String mdId) { + BasicDBObject query = (BasicDBObject) QueryBuilder.start("mdId").is(mdId).get(); + + final String currentId = Optional.ofNullable(getColl(db, COLL_METADATA_MANAGER, true).find(query)) + .map(r -> r.first()) + .map(d -> d.getString("currentId")) + .orElseThrow(() -> new IllegalArgumentException("cannot find current mdstore id for: " + mdId)); + return getColl(db, currentId, true); + } + public Map validCollections( final String mdFormat, final String mdLayout, final String mdInterpretation) { diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/rest/DNetRestClient.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/rest/DNetRestClient.java index 014f18606..27713d9b5 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/rest/DNetRestClient.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/rest/DNetRestClient.java @@ -11,9 +11,16 @@ import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.stream.Collectors; public class DNetRestClient { + private static final Logger log = LoggerFactory.getLogger(DNetRestClient.class); + private static ObjectMapper mapper = new ObjectMapper(); public static T doGET(final String url, Class clazz) throws Exception { @@ -44,6 +51,14 @@ public class DNetRestClient { private static String doHTTPRequest(final HttpUriRequest r) throws Exception { CloseableHttpClient client = HttpClients.createDefault(); + + log.info("performing HTTP request, method {} on URI {}", r.getMethod(), r.getURI().toString()); + log.info("request headers: {}", + Arrays.asList(r.getAllHeaders()) + .stream() + .map(h -> h.getName() + ":" + h.getValue()) + .collect(Collectors.joining(","))); + CloseableHttpResponse response = client.execute(r); return IOUtils.toString(response.getEntity().getContent()); } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java index a397c4f9d..ef29cb5b1 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java @@ -21,7 +21,7 @@ import eu.dnetlib.dhp.aggregation.common.AggregatorReport; import eu.dnetlib.dhp.aggregation.common.ReporterCallback; import eu.dnetlib.dhp.aggregation.common.ReportingJob; import eu.dnetlib.dhp.collection.plugin.CollectorPlugin; -import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbCollectorPlugin; +import eu.dnetlib.dhp.collection.plugin.mongodb.MDStoreCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbDumpCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin; @@ -119,7 +119,7 @@ public class CollectorWorker extends ReportingJob { case mdstore_mongodb_dump: return new MongoDbDumpCollectorPlugin(fileSystem); case mdstore_mongodb: - return new MongoDbCollectorPlugin(); + return new MDStoreCollectorPlugin(); default: throw new UnknownCollectorPluginException("plugin is not managed: " + plugin); } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MDStoreCollectorPlugin.java similarity index 54% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbCollectorPlugin.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MDStoreCollectorPlugin.java index 89b92ffa1..33b9111dd 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbCollectorPlugin.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MDStoreCollectorPlugin.java @@ -7,48 +7,38 @@ import java.util.Spliterators; import java.util.stream.Stream; import java.util.stream.StreamSupport; -import org.bson.Document; - -import com.mongodb.MongoClient; import com.mongodb.client.MongoCollection; -import com.mongodb.client.MongoDatabase; +import eu.dnetlib.dhp.common.MdstoreClient; import eu.dnetlib.dhp.aggregation.common.AggregatorReport; import eu.dnetlib.dhp.collection.ApiDescriptor; import eu.dnetlib.dhp.collection.CollectorException; import eu.dnetlib.dhp.collection.plugin.CollectorPlugin; +import org.bson.Document; -public class MongoDbCollectorPlugin implements CollectorPlugin { +public class MDStoreCollectorPlugin implements CollectorPlugin { - public static final String MONGODB_HOST = "mongodb_host"; - public static final String MONGODB_PORT = "mongodb_port"; - public static final String MONGODB_COLLECTION = "mongodb_collection"; + public static final String MONGODB_BASEURL = "mongodb_baseurl"; public static final String MONGODB_DBNAME = "mongodb_dbname"; + public static final String MDSTORE_ID = "mongodb_collection"; @Override public Stream collect(ApiDescriptor api, AggregatorReport report) throws CollectorException { - final String host = Optional - .ofNullable(api.getParams().get(MONGODB_HOST)) - .orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_HOST))); - - final Integer port = Optional - .ofNullable(api.getParams().get(MONGODB_PORT)) - .map(Integer::parseInt) - .orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_PORT))); + final String mongoBaseUrl = Optional + .ofNullable(api.getParams().get(MONGODB_BASEURL)) + .orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_BASEURL))); final String dbName = Optional .ofNullable(api.getParams().get(MONGODB_DBNAME)) .orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_DBNAME))); - final String collection = Optional - .ofNullable(api.getParams().get(MONGODB_COLLECTION)) - .orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_COLLECTION))); - - final MongoClient mongoClient = new MongoClient(host, port); - final MongoDatabase database = mongoClient.getDatabase(dbName); - final MongoCollection mdstore = database.getCollection(collection); + final String mdId = Optional + .ofNullable(api.getParams().get(MDSTORE_ID)) + .orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MDSTORE_ID))); + final MdstoreClient client = new MdstoreClient(mongoBaseUrl, dbName); + final MongoCollection mdstore = client.mdStore(mdId); long size = mdstore.count(); return StreamSupport diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java index e7703bf72..9e7e051de 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java @@ -12,7 +12,7 @@ import org.apache.commons.logging.LogFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; -import eu.dnetlib.dhp.oa.graph.raw.common.MdstoreClient; +import eu.dnetlib.dhp.common.MdstoreClient; public class MigrateMongoMdstoresApplication extends AbstractMigrationApplication implements Closeable {