From 923d19ea8e0f1421706e78c74856b34f1ff85097 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Tue, 4 May 2021 18:06:21 +0200 Subject: [PATCH] mdstore read lock/unlock when bulk copying records from mongodb to hdfs --- .../eu/dnetlib/dhp/common/MdstoreClient.java | 139 +++++++++++++----- .../java/eu/dnetlib/dhp/common/MdstoreTx.java | 45 ++++++ .../mongodb/MDStoreCollectorPlugin.java | 5 +- dhp-workflows/dhp-graph-mapper/pom.xml | 5 + .../raw/MigrateMongoMdstoresApplication.java | 38 +++-- .../MigrateMongoMdstoresApplicationTest.java | 75 ++++++++++ .../dhp/oa/graph/raw/mdstore_metadata.json | 8 + .../oa/graph/raw/mdstore_metadataManager.json | 1 + .../dhp/oa/graph/raw/mdstore_record.json | 6 + pom.xml | 5 + 10 files changed, 276 insertions(+), 51 deletions(-) create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreTx.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplicationTest.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/mdstore_metadata.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/mdstore_metadataManager.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/mdstore_record.json diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreClient.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreClient.java index 0bc782ccb..31525ae36 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreClient.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreClient.java @@ -3,26 +3,19 @@ package eu.dnetlib.dhp.common; import java.io.Closeable; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; +import java.util.*; import java.util.stream.StreamSupport; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.bson.Document; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Iterables; -import com.mongodb.BasicDBObject; -import com.mongodb.MongoClient; -import com.mongodb.MongoClientURI; -import com.mongodb.QueryBuilder; +import com.mongodb.*; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; +import com.mongodb.client.MongoIterable; public class MdstoreClient implements Closeable { @@ -31,37 +24,96 @@ public class MdstoreClient implements Closeable { private final MongoClient client; private final MongoDatabase db; + public static final String MD_ID = "mdId"; + public static final String CURRENT_ID = "currentId"; + public static final String EXPIRING = "expiring"; + public static final String ID = "id"; + public static final String LAST_READ = "lastRead"; + + public static final String FORMAT = "format"; + public static final String LAYOUT = "layout"; + public static final String INTERPRETATION = "interpretation"; + + public static final String BODY = "body"; + private static final String COLL_METADATA = "metadata"; private static final String COLL_METADATA_MANAGER = "metadataManager"; - public MdstoreClient(final String baseUrl, final String dbName) { - this.client = new MongoClient(new MongoClientURI(baseUrl)); + public MdstoreClient(final MongoClient mongoClient, final String dbName) { + this.client = mongoClient; this.db = getDb(client, dbName); } + public Iterable mdStoreRecords(final String mdId) { + return recordIterator(mdStore(mdId)); + } + public MongoCollection mdStore(final String mdId) { - BasicDBObject query = (BasicDBObject) QueryBuilder.start("mdId").is(mdId).get(); - - log.info("querying current mdId: {}", query.toJson()); - - final String currentId = Optional - .ofNullable(getColl(db, COLL_METADATA_MANAGER, true).find(query)) - .map(r -> r.first()) - .map(d -> d.getString("currentId")) - .orElseThrow(() -> new IllegalArgumentException("cannot find current mdstore id for: " + mdId)); - - log.info("currentId: {}", currentId); + final Document mdStoreInfo = getMDStoreInfo(mdId); + final String currentId = mdStoreInfo.getString(CURRENT_ID); + log.info("reading currentId: {}", currentId); return getColl(db, currentId, true); } + public MdstoreTx readLock(final String mdId) { + + final Document mdStoreInfo = getMDStoreInfo(mdId); + final List expiring = mdStoreInfo.get(EXPIRING, List.class); + final String currentId = mdStoreInfo.getString(CURRENT_ID); + + log.info("locking collection {}", currentId); + + if (expiring.size() > 0) { + for (Object value : expiring) { + final Document obj = (Document) value; + final String expiringId = (String) obj.get(ID); + if (currentId.equals(expiringId)) { + obj.put(LAST_READ, new Date()); + break; + } + } + } else { + final BasicDBObject readStore = new BasicDBObject(); + readStore.put(ID, currentId); + readStore.put(LAST_READ, new Date()); + expiring.add(readStore); + } + + getColl(db, COLL_METADATA_MANAGER, true) + .findOneAndReplace(new BasicDBObject("_id", mdStoreInfo.get("_id")), mdStoreInfo); + + return new MdstoreTx(this, mdId, currentId); + } + + public void readUnlock(final String mdId, final String currentId) { + + log.info("unlocking collection {}", currentId); + + final Document mdStoreInfo = getMDStoreInfo(mdId); + final List expiring = mdStoreInfo.get(EXPIRING, List.class); + + expiring + .stream() + .filter(d -> currentId.equals(d.getString(ID))) + .findFirst() + .ifPresent(expired -> expiring.remove(expired)); + } + + /** + * Retrieves from the MDStore mongoDB database a snapshot of the [mdID, currentID] pairs. + * @param mdFormat + * @param mdLayout + * @param mdInterpretation + * @return an HashMap of the mdID -> currentID associations. + */ public Map validCollections( final String mdFormat, final String mdLayout, final String mdInterpretation) { final Map transactions = new HashMap<>(); for (final Document entry : getColl(db, COLL_METADATA_MANAGER, true).find()) { - final String mdId = entry.getString("mdId"); - final String currentId = entry.getString("currentId"); + final String mdId = entry.getString(MD_ID); + final String currentId = entry.getString(CURRENT_ID); if (StringUtils.isNoneBlank(mdId, currentId)) { transactions.put(mdId, currentId); } @@ -69,11 +121,11 @@ public class MdstoreClient implements Closeable { final Map res = new HashMap<>(); for (final Document entry : getColl(db, COLL_METADATA, true).find()) { - if (entry.getString("format").equals(mdFormat) - && entry.getString("layout").equals(mdLayout) - && entry.getString("interpretation").equals(mdInterpretation) - && transactions.containsKey(entry.getString("mdId"))) { - res.put(entry.getString("mdId"), transactions.get(entry.getString("mdId"))); + if (entry.getString(FORMAT).equals(mdFormat) + && entry.getString(LAYOUT).equals(mdLayout) + && entry.getString(INTERPRETATION).equals(mdInterpretation) + && transactions.containsKey(entry.getString(MD_ID))) { + res.put(entry.getString(MD_ID), transactions.get(entry.getString(MD_ID))); } } @@ -92,9 +144,7 @@ public class MdstoreClient implements Closeable { private MongoCollection getColl( final MongoDatabase db, final String collName, final boolean abortIfMissing) { if (!Iterables.contains(db.listCollectionNames(), collName)) { - final String err = String - .format( - String.format("Missing collection '%s' in database '%s'", collName, db.getName())); + final String err = String.format("Missing collection '%s' in database '%s'", collName, db.getName()); log.warn(err); if (abortIfMissing) { throw new RuntimeException(err); @@ -105,14 +155,31 @@ public class MdstoreClient implements Closeable { return db.getCollection(collName); } + private Document getMDStoreInfo(final String mdId) { + return Optional + .ofNullable(getColl(db, COLL_METADATA_MANAGER, true)) + .map(metadataManager -> { + BasicDBObject query = (BasicDBObject) QueryBuilder.start(MD_ID).is(mdId).get(); + log.info("querying current mdId: {}", query.toJson()); + return Optional + .ofNullable(metadataManager.find(query)) + .map(MongoIterable::first) + .orElseThrow(() -> new IllegalArgumentException("cannot find current mdstore id for: " + mdId)); + }) + .orElseThrow(() -> new IllegalStateException("missing collection " + COLL_METADATA_MANAGER)); + } + public Iterable listRecords(final String collName) { - final MongoCollection coll = getColl(db, collName, false); + return recordIterator(getColl(db, collName, false)); + } + + private Iterable recordIterator(MongoCollection coll) { return coll == null ? new ArrayList<>() : () -> StreamSupport .stream(coll.find().spliterator(), false) - .filter(e -> e.containsKey("body")) - .map(e -> e.getString("body")) + .filter(e -> e.containsKey(BODY)) + .map(e -> e.getString(BODY)) .iterator(); } diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreTx.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreTx.java new file mode 100644 index 000000000..c86257cbb --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MdstoreTx.java @@ -0,0 +1,45 @@ + +package eu.dnetlib.dhp.common; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Iterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MdstoreTx implements Iterable, Closeable { + + private static final Logger log = LoggerFactory.getLogger(MdstoreTx.class); + + private final MdstoreClient mdstoreClient; + + private final String mdId; + + private final String currentId; + + public MdstoreTx(MdstoreClient mdstoreClient, String mdId, String currentId) { + this.mdstoreClient = mdstoreClient; + this.mdId = mdId; + this.currentId = currentId; + } + + @Override + public Iterator iterator() { + return mdstoreClient.mdStoreRecords(mdId).iterator(); + } + + @Override + public void close() throws IOException { + mdstoreClient.readUnlock(mdId, currentId); + log.info("unlocked collection {}", currentId); + } + + public String getMdId() { + return mdId; + } + + public String getCurrentId() { + return currentId; + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MDStoreCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MDStoreCollectorPlugin.java index 549c59720..e08fcb453 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MDStoreCollectorPlugin.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MDStoreCollectorPlugin.java @@ -11,6 +11,8 @@ import org.bson.Document; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.mongodb.MongoClient; +import com.mongodb.MongoClientURI; import com.mongodb.client.MongoCollection; import eu.dnetlib.dhp.aggregation.common.AggregatorReport; @@ -46,7 +48,8 @@ public class MDStoreCollectorPlugin implements CollectorPlugin { .orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MDSTORE_ID))); log.info("mdId: {}", mdId); - final MdstoreClient client = new MdstoreClient(mongoBaseUrl, dbName); + final MongoClient mongoClient = new MongoClient(new MongoClientURI(mongoBaseUrl)); + final MdstoreClient client = new MdstoreClient(mongoClient, dbName); final MongoCollection mdstore = client.mdStore(mdId); long size = mdstore.count(); diff --git a/dhp-workflows/dhp-graph-mapper/pom.xml b/dhp-workflows/dhp-graph-mapper/pom.xml index 5e1c670c3..135516e3e 100644 --- a/dhp-workflows/dhp-graph-mapper/pom.xml +++ b/dhp-workflows/dhp-graph-mapper/pom.xml @@ -91,6 +91,11 @@ org.mongodb mongo-java-driver + + io.fares.junit.mongodb + mongodb-junit-test + test + dom4j dom4j diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java index 50042b569..a06bbc6fd 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java @@ -7,17 +7,20 @@ import java.util.Map; import java.util.Map.Entry; import org.apache.commons.io.IOUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.mongodb.MongoClient; +import com.mongodb.MongoClientURI; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.MdstoreClient; +import eu.dnetlib.dhp.common.MdstoreTx; import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; -public class MigrateMongoMdstoresApplication extends AbstractMigrationApplication - implements Closeable { +public class MigrateMongoMdstoresApplication extends AbstractMigrationApplication implements Closeable { - private static final Log log = LogFactory.getLog(MigrateMongoMdstoresApplication.class); + private static final Logger log = LoggerFactory.getLogger(MigrateMongoMdstoresApplication.class); private final MdstoreClient mdstoreClient; @@ -38,28 +41,35 @@ public class MigrateMongoMdstoresApplication extends AbstractMigrationApplicatio final String hdfsPath = parser.get("hdfsPath"); - try (MigrateMongoMdstoresApplication app = new MigrateMongoMdstoresApplication(hdfsPath, mongoBaseUrl, + final MongoClient mongoClient = new MongoClient(new MongoClientURI(mongoBaseUrl)); + + try (MigrateMongoMdstoresApplication app = new MigrateMongoMdstoresApplication(hdfsPath, mongoClient, mongoDb)) { app.execute(mdFormat, mdLayout, mdInterpretation); } } public MigrateMongoMdstoresApplication( - final String hdfsPath, final String mongoBaseUrl, final String mongoDb) throws Exception { + final String hdfsPath, final MongoClient mongoClient, final String mongoDb) throws Exception { super(hdfsPath); - this.mdstoreClient = new MdstoreClient(mongoBaseUrl, mongoDb); + this.mdstoreClient = new MdstoreClient(mongoClient, mongoDb); } - public void execute(final String format, final String layout, final String interpretation) { + public void execute(final String format, final String layout, final String interpretation) throws IOException { final Map colls = mdstoreClient.validCollections(format, layout, interpretation); - log.info("Found " + colls.size() + " mdstores"); + log.info("Found {} mdstores", colls.size()); for (final Entry entry : colls.entrySet()) { - log.info("Processing mdstore " + entry.getKey() + " (collection: " + entry.getValue() + ")"); - final String currentColl = entry.getValue(); + log.info("Processing mdstore {}", entry.getKey()); - for (final String xml : mdstoreClient.listRecords(currentColl)) { - emit(xml, String.format("%s-%s-%s", format, layout, interpretation)); + final String mdID = entry.getKey(); + try (final MdstoreTx tx = mdstoreClient.readLock(mdID)) { + + log.info("locked collection {}", tx.getCurrentId()); + + for (final String xml : tx) { + emit(xml, String.format("%s-%s-%s", format, layout, interpretation)); + } } } } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplicationTest.java new file mode 100644 index 000000000..46ad6ce2b --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplicationTest.java @@ -0,0 +1,75 @@ + +package eu.dnetlib.dhp.oa.graph.raw; + +import com.mongodb.client.MongoDatabase; +import eu.dnetlib.dhp.common.MdstoreClient; +import io.fares.junit.mongodb.MongoExtension; +import io.fares.junit.mongodb.MongoForAllExtension; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.bson.Document; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +public class MigrateMongoMdstoresApplicationTest { + + private static final Logger log = LoggerFactory.getLogger(MigrateMongoMdstoresApplicationTest.class); + + public static final String COLL_NAME = "9eed8a4d-bb41-47c3-987f-9d06aee0dec0::1453898911558"; + + @RegisterExtension + public static MongoForAllExtension mongo = MongoForAllExtension.defaultMongo(); + + @BeforeAll + public static void setUp() throws IOException { + MongoDatabase db = mongo.getMongoClient().getDatabase(MongoExtension.UNIT_TEST_DB); + + db.getCollection(COLL_NAME).insertOne(Document.parse(read("mdstore_record.json"))); + db.getCollection("metadata").insertOne(Document.parse(read("mdstore_metadata.json"))); + db.getCollection("metadataManager").insertOne(Document.parse(read("mdstore_metadataManager.json"))); + } + + + @Test + public void test_MdstoreClient() throws IOException { + try(MdstoreClient client = new MdstoreClient(mongo.getMongoClient(), MongoExtension.UNIT_TEST_DB)) { + for (String xml : client.listRecords(COLL_NAME)) { + Assertions.assertTrue(StringUtils.isNotBlank(xml)); + } + } + } + + @Test + public void test_MigrateMongoMdstoresApplication(@TempDir Path tmpPath) throws Exception { + + final String seqFile = "test_records.seq"; + Path outputPath = tmpPath.resolve(seqFile); + + try (MigrateMongoMdstoresApplication app = new MigrateMongoMdstoresApplication( + outputPath.toString(), + mongo.getMongoClient(), + MongoExtension.UNIT_TEST_DB)) { + app.execute("oai_dc", "store", "native"); + } + + Assertions.assertTrue( + Files.list(tmpPath) + .filter(f -> seqFile.contains(f.getFileName().toString())) + .findFirst() + .isPresent()); + } + + private static String read(String filename) throws IOException { + return IOUtils.toString(MigrateMongoMdstoresApplicationTest.class.getResourceAsStream(filename)); + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/mdstore_metadata.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/mdstore_metadata.json new file mode 100644 index 000000000..1d81f7b58 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/mdstore_metadata.json @@ -0,0 +1,8 @@ +{ + "_id" : ObjectId("53cf633a8274eb9536b614de"), + "mdId" : "9eed8a4d-bb41-47c3-987f-9d06aee0dec0_TURTdG9yZURTUmVzb3VyY2VzL01EU3RvcmVEU1Jlc291cmNlVHlwZQ==", + "format" : "oai_dc", + "layout" : "store", + "interpretation" : "native", + "size" : 1 +} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/mdstore_metadataManager.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/mdstore_metadataManager.json new file mode 100644 index 000000000..50c8bf7f2 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/mdstore_metadataManager.json @@ -0,0 +1 @@ +{ "_id" : ObjectId("53cf63678274eb9536b61940"), "mdId" : "9eed8a4d-bb41-47c3-987f-9d06aee0dec0_TURTdG9yZURTUmVzb3VyY2VzL01EU3RvcmVEU1Jlc291cmNlVHlwZQ==", "currentId" : "9eed8a4d-bb41-47c3-987f-9d06aee0dec0::1453898911558", "expiring" : [ { "id" : "9eed8a4d-bb41-47c3-987f-9d06aee0dec0::1453898911558", "lastRead" : ISODate("2021-04-30T03:34:29.699Z") } ], "transactions" : [ ] } \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/mdstore_record.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/mdstore_record.json new file mode 100644 index 000000000..42f6ed1d9 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/mdstore_record.json @@ -0,0 +1,6 @@ +{ + "id" : "od________76::c6e4a36099aba4b0c390a365251428c9", + "originalId" : null, + "body" : "od________76::c6e4a36099aba4b0c390a365251428c9oai:DiVA.org:du-100072016-01-27T12:48:31.609Zdc42663d-5257-4c6f-bf09-53cf47e36fed_UmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZXMvUmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZVR5cGU=od________76oai:DiVA.org:du-100072012-04-20T14:05:25ZHumanitiesTheologydustudentThesis", + "timestamp" : NumberLong("1453898911613") +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index e80e16c37..e846b2256 100644 --- a/pom.xml +++ b/pom.xml @@ -412,6 +412,11 @@ mongo-java-driver ${mongodb.driver.version} + + io.fares.junit.mongodb + mongodb-junit-test + 1.1.0 + org.postgresql postgresql