forked from antonis.lempesis/dnet-hadoop
mdstore collector plugin
This commit is contained in:
parent
dc98c39500
commit
b830e33392
|
@ -9,16 +9,16 @@ import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.stream.StreamSupport;
|
import java.util.stream.StreamSupport;
|
||||||
|
|
||||||
import com.mongodb.BasicDBObject;
|
|
||||||
import com.mongodb.QueryBuilder;
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.bson.Document;
|
import org.bson.Document;
|
||||||
|
|
||||||
import com.google.common.collect.Iterables;
|
import com.google.common.collect.Iterables;
|
||||||
|
import com.mongodb.BasicDBObject;
|
||||||
import com.mongodb.MongoClient;
|
import com.mongodb.MongoClient;
|
||||||
import com.mongodb.MongoClientURI;
|
import com.mongodb.MongoClientURI;
|
||||||
|
import com.mongodb.QueryBuilder;
|
||||||
import com.mongodb.client.MongoCollection;
|
import com.mongodb.client.MongoCollection;
|
||||||
import com.mongodb.client.MongoDatabase;
|
import com.mongodb.client.MongoDatabase;
|
||||||
|
|
||||||
|
@ -40,10 +40,11 @@ public class MdstoreClient implements Closeable {
|
||||||
public MongoCollection<Document> mdStore(final String mdId) {
|
public MongoCollection<Document> mdStore(final String mdId) {
|
||||||
BasicDBObject query = (BasicDBObject) QueryBuilder.start("mdId").is(mdId).get();
|
BasicDBObject query = (BasicDBObject) QueryBuilder.start("mdId").is(mdId).get();
|
||||||
|
|
||||||
final String currentId = Optional.ofNullable(getColl(db, COLL_METADATA_MANAGER, true).find(query))
|
final String currentId = Optional
|
||||||
.map(r -> r.first())
|
.ofNullable(getColl(db, COLL_METADATA_MANAGER, true).find(query))
|
||||||
.map(d -> d.getString("currentId"))
|
.map(r -> r.first())
|
||||||
.orElseThrow(() -> new IllegalArgumentException("cannot find current mdstore id for: " + mdId));
|
.map(d -> d.getString("currentId"))
|
||||||
|
.orElseThrow(() -> new IllegalArgumentException("cannot find current mdstore id for: " + mdId));
|
||||||
return getColl(db, currentId, true);
|
return getColl(db, currentId, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,27 +7,29 @@ import java.util.Spliterators;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
import java.util.stream.StreamSupport;
|
import java.util.stream.StreamSupport;
|
||||||
|
|
||||||
|
import org.bson.Document;
|
||||||
|
|
||||||
import com.mongodb.client.MongoCollection;
|
import com.mongodb.client.MongoCollection;
|
||||||
import eu.dnetlib.dhp.common.MdstoreClient;
|
|
||||||
|
|
||||||
import eu.dnetlib.dhp.aggregation.common.AggregatorReport;
|
import eu.dnetlib.dhp.aggregation.common.AggregatorReport;
|
||||||
import eu.dnetlib.dhp.collection.ApiDescriptor;
|
import eu.dnetlib.dhp.collection.ApiDescriptor;
|
||||||
import eu.dnetlib.dhp.collection.CollectorException;
|
import eu.dnetlib.dhp.collection.CollectorException;
|
||||||
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
|
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
|
||||||
import org.bson.Document;
|
import eu.dnetlib.dhp.common.MdstoreClient;
|
||||||
|
|
||||||
public class MDStoreCollectorPlugin implements CollectorPlugin {
|
public class MDStoreCollectorPlugin implements CollectorPlugin {
|
||||||
|
|
||||||
public static final String MONGODB_BASEURL = "mongodb_baseurl";
|
|
||||||
public static final String MONGODB_DBNAME = "mongodb_dbname";
|
public static final String MONGODB_DBNAME = "mongodb_dbname";
|
||||||
public static final String MDSTORE_ID = "mongodb_collection";
|
public static final String MDSTORE_ID = "mdstore_id";
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Stream<String> collect(ApiDescriptor api, AggregatorReport report) throws CollectorException {
|
public Stream<String> collect(ApiDescriptor api, AggregatorReport report) throws CollectorException {
|
||||||
|
|
||||||
final String mongoBaseUrl = Optional
|
final String mongoBaseUrl = Optional
|
||||||
.ofNullable(api.getParams().get(MONGODB_BASEURL))
|
.ofNullable(api.getBaseUrl())
|
||||||
.orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_BASEURL)));
|
.orElseThrow(
|
||||||
|
() -> new CollectorException(
|
||||||
|
"missing mongodb baseUrl, expected in eu.dnetlib.dhp.collection.ApiDescriptor.baseUrl"));
|
||||||
|
|
||||||
final String dbName = Optional
|
final String dbName = Optional
|
||||||
.ofNullable(api.getParams().get(MONGODB_DBNAME))
|
.ofNullable(api.getParams().get(MONGODB_DBNAME))
|
||||||
|
|
|
@ -11,8 +11,8 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
|
|
||||||
import eu.dnetlib.dhp.common.MdstoreClient;
|
import eu.dnetlib.dhp.common.MdstoreClient;
|
||||||
|
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
|
||||||
|
|
||||||
public class MigrateMongoMdstoresApplication extends AbstractMigrationApplication
|
public class MigrateMongoMdstoresApplication extends AbstractMigrationApplication
|
||||||
implements Closeable {
|
implements Closeable {
|
||||||
|
|
Loading…
Reference in New Issue