forked from D-Net/dnet-hadoop
implemented mdstore collector plugin
This commit is contained in:
parent
e7eba9f7e7
commit
fc3fa5e343
|
@ -98,6 +98,10 @@
|
|||
<artifactId>httpclient</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.mongodb</groupId>
|
||||
<artifactId>mongo-java-driver</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>eu.dnetlib.dhp</groupId>
|
||||
|
|
|
@ -1,13 +1,16 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.raw.common;
|
||||
package eu.dnetlib.dhp.common;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
import com.mongodb.BasicDBObject;
|
||||
import com.mongodb.QueryBuilder;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -34,6 +37,16 @@ public class MdstoreClient implements Closeable {
|
|||
this.db = getDb(client, dbName);
|
||||
}
|
||||
|
||||
public MongoCollection<Document> mdStore(final String mdId) {
|
||||
BasicDBObject query = (BasicDBObject) QueryBuilder.start("mdId").is(mdId).get();
|
||||
|
||||
final String currentId = Optional.ofNullable(getColl(db, COLL_METADATA_MANAGER, true).find(query))
|
||||
.map(r -> r.first())
|
||||
.map(d -> d.getString("currentId"))
|
||||
.orElseThrow(() -> new IllegalArgumentException("cannot find current mdstore id for: " + mdId));
|
||||
return getColl(db, currentId, true);
|
||||
}
|
||||
|
||||
public Map<String, String> validCollections(
|
||||
final String mdFormat, final String mdLayout, final String mdInterpretation) {
|
||||
|
|
@ -11,9 +11,16 @@ import org.apache.http.impl.client.CloseableHttpClient;
|
|||
import org.apache.http.impl.client.HttpClients;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class DNetRestClient {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(DNetRestClient.class);
|
||||
|
||||
private static ObjectMapper mapper = new ObjectMapper();
|
||||
|
||||
public static <T> T doGET(final String url, Class<T> clazz) throws Exception {
|
||||
|
@ -44,6 +51,14 @@ public class DNetRestClient {
|
|||
|
||||
private static String doHTTPRequest(final HttpUriRequest r) throws Exception {
|
||||
CloseableHttpClient client = HttpClients.createDefault();
|
||||
|
||||
log.info("performing HTTP request, method {} on URI {}", r.getMethod(), r.getURI().toString());
|
||||
log.info("request headers: {}",
|
||||
Arrays.asList(r.getAllHeaders())
|
||||
.stream()
|
||||
.map(h -> h.getName() + ":" + h.getValue())
|
||||
.collect(Collectors.joining(",")));
|
||||
|
||||
CloseableHttpResponse response = client.execute(r);
|
||||
return IOUtils.toString(response.getEntity().getContent());
|
||||
}
|
||||
|
|
|
@ -21,7 +21,7 @@ import eu.dnetlib.dhp.aggregation.common.AggregatorReport;
|
|||
import eu.dnetlib.dhp.aggregation.common.ReporterCallback;
|
||||
import eu.dnetlib.dhp.aggregation.common.ReportingJob;
|
||||
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
|
||||
import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbCollectorPlugin;
|
||||
import eu.dnetlib.dhp.collection.plugin.mongodb.MDStoreCollectorPlugin;
|
||||
import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbDumpCollectorPlugin;
|
||||
import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin;
|
||||
|
||||
|
@ -119,7 +119,7 @@ public class CollectorWorker extends ReportingJob {
|
|||
case mdstore_mongodb_dump:
|
||||
return new MongoDbDumpCollectorPlugin(fileSystem);
|
||||
case mdstore_mongodb:
|
||||
return new MongoDbCollectorPlugin();
|
||||
return new MDStoreCollectorPlugin();
|
||||
default:
|
||||
throw new UnknownCollectorPluginException("plugin is not managed: " + plugin);
|
||||
}
|
||||
|
|
|
@ -7,48 +7,38 @@ import java.util.Spliterators;
|
|||
import java.util.stream.Stream;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
import org.bson.Document;
|
||||
|
||||
import com.mongodb.MongoClient;
|
||||
import com.mongodb.client.MongoCollection;
|
||||
import com.mongodb.client.MongoDatabase;
|
||||
import eu.dnetlib.dhp.common.MdstoreClient;
|
||||
|
||||
import eu.dnetlib.dhp.aggregation.common.AggregatorReport;
|
||||
import eu.dnetlib.dhp.collection.ApiDescriptor;
|
||||
import eu.dnetlib.dhp.collection.CollectorException;
|
||||
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
|
||||
import org.bson.Document;
|
||||
|
||||
public class MongoDbCollectorPlugin implements CollectorPlugin {
|
||||
public class MDStoreCollectorPlugin implements CollectorPlugin {
|
||||
|
||||
public static final String MONGODB_HOST = "mongodb_host";
|
||||
public static final String MONGODB_PORT = "mongodb_port";
|
||||
public static final String MONGODB_COLLECTION = "mongodb_collection";
|
||||
public static final String MONGODB_BASEURL = "mongodb_baseurl";
|
||||
public static final String MONGODB_DBNAME = "mongodb_dbname";
|
||||
public static final String MDSTORE_ID = "mongodb_collection";
|
||||
|
||||
@Override
|
||||
public Stream<String> collect(ApiDescriptor api, AggregatorReport report) throws CollectorException {
|
||||
|
||||
final String host = Optional
|
||||
.ofNullable(api.getParams().get(MONGODB_HOST))
|
||||
.orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_HOST)));
|
||||
|
||||
final Integer port = Optional
|
||||
.ofNullable(api.getParams().get(MONGODB_PORT))
|
||||
.map(Integer::parseInt)
|
||||
.orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_PORT)));
|
||||
final String mongoBaseUrl = Optional
|
||||
.ofNullable(api.getParams().get(MONGODB_BASEURL))
|
||||
.orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_BASEURL)));
|
||||
|
||||
final String dbName = Optional
|
||||
.ofNullable(api.getParams().get(MONGODB_DBNAME))
|
||||
.orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_DBNAME)));
|
||||
|
||||
final String collection = Optional
|
||||
.ofNullable(api.getParams().get(MONGODB_COLLECTION))
|
||||
.orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_COLLECTION)));
|
||||
|
||||
final MongoClient mongoClient = new MongoClient(host, port);
|
||||
final MongoDatabase database = mongoClient.getDatabase(dbName);
|
||||
final MongoCollection<Document> mdstore = database.getCollection(collection);
|
||||
final String mdId = Optional
|
||||
.ofNullable(api.getParams().get(MDSTORE_ID))
|
||||
.orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MDSTORE_ID)));
|
||||
|
||||
final MdstoreClient client = new MdstoreClient(mongoBaseUrl, dbName);
|
||||
final MongoCollection<Document> mdstore = client.mdStore(mdId);
|
||||
long size = mdstore.count();
|
||||
|
||||
return StreamSupport
|
|
@ -12,7 +12,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
|
||||
import eu.dnetlib.dhp.oa.graph.raw.common.MdstoreClient;
|
||||
import eu.dnetlib.dhp.common.MdstoreClient;
|
||||
|
||||
public class MigrateMongoMdstoresApplication extends AbstractMigrationApplication
|
||||
implements Closeable {
|
||||
|
|
Loading…
Reference in New Issue