From 9d725efdc1e692ec694da2757f8a7e7f1a37b14c Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 20 May 2021 18:26:09 +0200 Subject: [PATCH] reverted implementation of the mdstore client --- .../eu/dnetlib/dhp/common/MdstoreClient.java | 139 +++++------------- .../java/eu/dnetlib/dhp/common/MdstoreTx.java | 45 ------ .../mongodb/MDStoreCollectorPlugin.java | 3 +- .../raw/MigrateMongoMdstoresApplication.java | 26 ++-- .../MigrateMongoMdstoresApplicationTest.java | 30 ++-- 5 files changed, 55 insertions(+), 188 deletions(-) delete mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreTx.java diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreClient.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreClient.java index 31525ae369..0bc782ccb9 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreClient.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreClient.java @@ -3,19 +3,26 @@ package eu.dnetlib.dhp.common; import java.io.Closeable; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; import java.util.stream.StreamSupport; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.bson.Document; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Iterables; -import com.mongodb.*; +import com.mongodb.BasicDBObject; +import com.mongodb.MongoClient; +import com.mongodb.MongoClientURI; +import com.mongodb.QueryBuilder; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; -import com.mongodb.client.MongoIterable; public class MdstoreClient implements Closeable { @@ -24,96 +31,37 @@ public class MdstoreClient implements Closeable { private final MongoClient client; private final MongoDatabase db; - public static final String MD_ID = "mdId"; - public static final String CURRENT_ID = "currentId"; - public static final String EXPIRING = "expiring"; - public static final String ID = "id"; - public static final String LAST_READ = "lastRead"; - - public static final String FORMAT = "format"; - public static final String LAYOUT = "layout"; - public static final String INTERPRETATION = "interpretation"; - - public static final String BODY = "body"; - private static final String COLL_METADATA = "metadata"; private static final String COLL_METADATA_MANAGER = "metadataManager"; - public MdstoreClient(final MongoClient mongoClient, final String dbName) { - this.client = mongoClient; + public MdstoreClient(final String baseUrl, final String dbName) { + this.client = new MongoClient(new MongoClientURI(baseUrl)); this.db = getDb(client, dbName); } - public Iterable mdStoreRecords(final String mdId) { - return recordIterator(mdStore(mdId)); - } - public MongoCollection mdStore(final String mdId) { - final Document mdStoreInfo = getMDStoreInfo(mdId); - final String currentId = mdStoreInfo.getString(CURRENT_ID); - log.info("reading currentId: {}", currentId); + BasicDBObject query = (BasicDBObject) QueryBuilder.start("mdId").is(mdId).get(); + + log.info("querying current mdId: {}", query.toJson()); + + final String currentId = Optional + .ofNullable(getColl(db, COLL_METADATA_MANAGER, true).find(query)) + .map(r -> r.first()) + .map(d -> d.getString("currentId")) + .orElseThrow(() -> new IllegalArgumentException("cannot find current mdstore id for: " + mdId)); + + log.info("currentId: {}", currentId); return getColl(db, currentId, true); } - public MdstoreTx readLock(final String mdId) { - - final Document mdStoreInfo = getMDStoreInfo(mdId); - final List expiring = mdStoreInfo.get(EXPIRING, List.class); - final String currentId = mdStoreInfo.getString(CURRENT_ID); - - log.info("locking collection {}", currentId); - - if (expiring.size() > 0) { - for (Object value : expiring) { - final Document obj = (Document) value; - final String expiringId = (String) obj.get(ID); - if (currentId.equals(expiringId)) { - obj.put(LAST_READ, new Date()); - break; - } - } - } else { - final BasicDBObject readStore = new BasicDBObject(); - readStore.put(ID, currentId); - readStore.put(LAST_READ, new Date()); - expiring.add(readStore); - } - - getColl(db, COLL_METADATA_MANAGER, true) - .findOneAndReplace(new BasicDBObject("_id", mdStoreInfo.get("_id")), mdStoreInfo); - - return new MdstoreTx(this, mdId, currentId); - } - - public void readUnlock(final String mdId, final String currentId) { - - log.info("unlocking collection {}", currentId); - - final Document mdStoreInfo = getMDStoreInfo(mdId); - final List expiring = mdStoreInfo.get(EXPIRING, List.class); - - expiring - .stream() - .filter(d -> currentId.equals(d.getString(ID))) - .findFirst() - .ifPresent(expired -> expiring.remove(expired)); - } - - /** - * Retrieves from the MDStore mongoDB database a snapshot of the [mdID, currentID] pairs. - * @param mdFormat - * @param mdLayout - * @param mdInterpretation - * @return an HashMap of the mdID -> currentID associations. - */ public Map validCollections( final String mdFormat, final String mdLayout, final String mdInterpretation) { final Map transactions = new HashMap<>(); for (final Document entry : getColl(db, COLL_METADATA_MANAGER, true).find()) { - final String mdId = entry.getString(MD_ID); - final String currentId = entry.getString(CURRENT_ID); + final String mdId = entry.getString("mdId"); + final String currentId = entry.getString("currentId"); if (StringUtils.isNoneBlank(mdId, currentId)) { transactions.put(mdId, currentId); } @@ -121,11 +69,11 @@ public class MdstoreClient implements Closeable { final Map res = new HashMap<>(); for (final Document entry : getColl(db, COLL_METADATA, true).find()) { - if (entry.getString(FORMAT).equals(mdFormat) - && entry.getString(LAYOUT).equals(mdLayout) - && entry.getString(INTERPRETATION).equals(mdInterpretation) - && transactions.containsKey(entry.getString(MD_ID))) { - res.put(entry.getString(MD_ID), transactions.get(entry.getString(MD_ID))); + if (entry.getString("format").equals(mdFormat) + && entry.getString("layout").equals(mdLayout) + && entry.getString("interpretation").equals(mdInterpretation) + && transactions.containsKey(entry.getString("mdId"))) { + res.put(entry.getString("mdId"), transactions.get(entry.getString("mdId"))); } } @@ -144,7 +92,9 @@ public class MdstoreClient implements Closeable { private MongoCollection getColl( final MongoDatabase db, final String collName, final boolean abortIfMissing) { if (!Iterables.contains(db.listCollectionNames(), collName)) { - final String err = String.format("Missing collection '%s' in database '%s'", collName, db.getName()); + final String err = String + .format( + String.format("Missing collection '%s' in database '%s'", collName, db.getName())); log.warn(err); if (abortIfMissing) { throw new RuntimeException(err); @@ -155,31 +105,14 @@ public class MdstoreClient implements Closeable { return db.getCollection(collName); } - private Document getMDStoreInfo(final String mdId) { - return Optional - .ofNullable(getColl(db, COLL_METADATA_MANAGER, true)) - .map(metadataManager -> { - BasicDBObject query = (BasicDBObject) QueryBuilder.start(MD_ID).is(mdId).get(); - log.info("querying current mdId: {}", query.toJson()); - return Optional - .ofNullable(metadataManager.find(query)) - .map(MongoIterable::first) - .orElseThrow(() -> new IllegalArgumentException("cannot find current mdstore id for: " + mdId)); - }) - .orElseThrow(() -> new IllegalStateException("missing collection " + COLL_METADATA_MANAGER)); - } - public Iterable listRecords(final String collName) { - return recordIterator(getColl(db, collName, false)); - } - - private Iterable recordIterator(MongoCollection coll) { + final MongoCollection coll = getColl(db, collName, false); return coll == null ? new ArrayList<>() : () -> StreamSupport .stream(coll.find().spliterator(), false) - .filter(e -> e.containsKey(BODY)) - .map(e -> e.getString(BODY)) + .filter(e -> e.containsKey("body")) + .map(e -> e.getString("body")) .iterator(); } diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreTx.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreTx.java deleted file mode 100644 index c86257cbb4..0000000000 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreTx.java +++ /dev/null @@ -1,45 +0,0 @@ - -package eu.dnetlib.dhp.common; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Iterator; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MdstoreTx implements Iterable, Closeable { - - private static final Logger log = LoggerFactory.getLogger(MdstoreTx.class); - - private final MdstoreClient mdstoreClient; - - private final String mdId; - - private final String currentId; - - public MdstoreTx(MdstoreClient mdstoreClient, String mdId, String currentId) { - this.mdstoreClient = mdstoreClient; - this.mdId = mdId; - this.currentId = currentId; - } - - @Override - public Iterator iterator() { - return mdstoreClient.mdStoreRecords(mdId).iterator(); - } - - @Override - public void close() throws IOException { - mdstoreClient.readUnlock(mdId, currentId); - log.info("unlocked collection {}", currentId); - } - - public String getMdId() { - return mdId; - } - - public String getCurrentId() { - return currentId; - } -} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MDStoreCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MDStoreCollectorPlugin.java index e08fcb4537..a273149835 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MDStoreCollectorPlugin.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MDStoreCollectorPlugin.java @@ -48,8 +48,7 @@ public class MDStoreCollectorPlugin implements CollectorPlugin { .orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MDSTORE_ID))); log.info("mdId: {}", mdId); - final MongoClient mongoClient = new MongoClient(new MongoClientURI(mongoBaseUrl)); - final MdstoreClient client = new MdstoreClient(mongoClient, dbName); + final MdstoreClient client = new MdstoreClient(mongoBaseUrl, dbName); final MongoCollection mdstore = client.mdStore(mdId); long size = mdstore.count(); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java index a06bbc6fdd..9acdabb37f 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java @@ -15,7 +15,6 @@ import com.mongodb.MongoClientURI; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.MdstoreClient; -import eu.dnetlib.dhp.common.MdstoreTx; import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; public class MigrateMongoMdstoresApplication extends AbstractMigrationApplication implements Closeable { @@ -41,35 +40,28 @@ public class MigrateMongoMdstoresApplication extends AbstractMigrationApplicatio final String hdfsPath = parser.get("hdfsPath"); - final MongoClient mongoClient = new MongoClient(new MongoClientURI(mongoBaseUrl)); - - try (MigrateMongoMdstoresApplication app = new MigrateMongoMdstoresApplication(hdfsPath, mongoClient, + try (MigrateMongoMdstoresApplication app = new MigrateMongoMdstoresApplication(hdfsPath, mongoBaseUrl, mongoDb)) { app.execute(mdFormat, mdLayout, mdInterpretation); } } public MigrateMongoMdstoresApplication( - final String hdfsPath, final MongoClient mongoClient, final String mongoDb) throws Exception { + final String hdfsPath, final String mongoBaseUrl, final String mongoDb) throws Exception { super(hdfsPath); - this.mdstoreClient = new MdstoreClient(mongoClient, mongoDb); + this.mdstoreClient = new MdstoreClient(mongoBaseUrl, mongoDb); } - public void execute(final String format, final String layout, final String interpretation) throws IOException { + public void execute(final String format, final String layout, final String interpretation) { final Map colls = mdstoreClient.validCollections(format, layout, interpretation); - log.info("Found {} mdstores", colls.size()); + log.info("Found " + colls.size() + " mdstores"); for (final Entry entry : colls.entrySet()) { - log.info("Processing mdstore {}", entry.getKey()); + log.info("Processing mdstore " + entry.getKey() + " (collection: " + entry.getValue() + ")"); + final String currentColl = entry.getValue(); - final String mdID = entry.getKey(); - try (final MdstoreTx tx = mdstoreClient.readLock(mdID)) { - - log.info("locked collection {}", tx.getCurrentId()); - - for (final String xml : tx) { - emit(xml, String.format("%s-%s-%s", format, layout, interpretation)); - } + for (final String xml : mdstoreClient.listRecords(currentColl)) { + emit(xml, String.format("%s-%s-%s", format, layout, interpretation)); } } } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplicationTest.java index ddb5d873f9..fb2c90e5c5 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplicationTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplicationTest.java @@ -10,6 +10,7 @@ import org.apache.commons.lang3.StringUtils; import org.bson.Document; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.io.TempDir; @@ -22,6 +23,7 @@ import eu.dnetlib.dhp.common.MdstoreClient; import io.fares.junit.mongodb.MongoExtension; import io.fares.junit.mongodb.MongoForAllExtension; +@Disabled public class MigrateMongoMdstoresApplicationTest { private static final Logger log = LoggerFactory.getLogger(MigrateMongoMdstoresApplicationTest.class); @@ -40,27 +42,13 @@ public class MigrateMongoMdstoresApplicationTest { db.getCollection("metadataManager").insertOne(Document.parse(read("mdstore_metadataManager.json"))); } - @Test - public void test_MigrateMongoMdstoresApplication(@TempDir Path tmpPath) throws Exception { - - final String seqFile = "test_records.seq"; - Path outputPath = tmpPath.resolve(seqFile); - - try (MigrateMongoMdstoresApplication app = new MigrateMongoMdstoresApplication( - outputPath.toString(), - mongo.getMongoClient(), - MongoExtension.UNIT_TEST_DB)) { - app.execute("oai_dc", "store", "native"); - } - - Assertions - .assertTrue( - Files - .list(tmpPath) - .filter(f -> seqFile.contains(f.getFileName().toString())) - .findFirst() - .isPresent()); - } + /* + * @Test public void test_MigrateMongoMdstoresApplication(@TempDir Path tmpPath) throws Exception { final String + * seqFile = "test_records.seq"; Path outputPath = tmpPath.resolve(seqFile); try (MigrateMongoMdstoresApplication + * app = new MigrateMongoMdstoresApplication( outputPath.toString(), mongo.getMongoClient(), + * MongoExtension.UNIT_TEST_DB)) { app.execute("oai_dc", "store", "native"); } Assertions .assertTrue( Files + * .list(tmpPath) .filter(f -> seqFile.contains(f.getFileName().toString())) .findFirst() .isPresent()); } + */ private static String read(String filename) throws IOException { return IOUtils.toString(MigrateMongoMdstoresApplicationTest.class.getResourceAsStream(filename));