reverted implementation of the mdstore client

This commit is contained in:
Claudio Atzori 2021-05-20 18:26:09 +02:00
parent 863b56b6ce
commit 9d725efdc1
5 changed files with 55 additions and 188 deletions

View File

@ -3,19 +3,26 @@ package eu.dnetlib.dhp.common;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.StreamSupport; import java.util.stream.StreamSupport;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.bson.Document; import org.bson.Document;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.mongodb.*; import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.QueryBuilder;
import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase; import com.mongodb.client.MongoDatabase;
import com.mongodb.client.MongoIterable;
public class MdstoreClient implements Closeable { public class MdstoreClient implements Closeable {
@ -24,96 +31,37 @@ public class MdstoreClient implements Closeable {
private final MongoClient client; private final MongoClient client;
private final MongoDatabase db; private final MongoDatabase db;
public static final String MD_ID = "mdId";
public static final String CURRENT_ID = "currentId";
public static final String EXPIRING = "expiring";
public static final String ID = "id";
public static final String LAST_READ = "lastRead";
public static final String FORMAT = "format";
public static final String LAYOUT = "layout";
public static final String INTERPRETATION = "interpretation";
public static final String BODY = "body";
private static final String COLL_METADATA = "metadata"; private static final String COLL_METADATA = "metadata";
private static final String COLL_METADATA_MANAGER = "metadataManager"; private static final String COLL_METADATA_MANAGER = "metadataManager";
public MdstoreClient(final MongoClient mongoClient, final String dbName) { public MdstoreClient(final String baseUrl, final String dbName) {
this.client = mongoClient; this.client = new MongoClient(new MongoClientURI(baseUrl));
this.db = getDb(client, dbName); this.db = getDb(client, dbName);
} }
public Iterable<String> mdStoreRecords(final String mdId) {
return recordIterator(mdStore(mdId));
}
public MongoCollection<Document> mdStore(final String mdId) { public MongoCollection<Document> mdStore(final String mdId) {
final Document mdStoreInfo = getMDStoreInfo(mdId); BasicDBObject query = (BasicDBObject) QueryBuilder.start("mdId").is(mdId).get();
final String currentId = mdStoreInfo.getString(CURRENT_ID);
log.info("reading currentId: {}", currentId); log.info("querying current mdId: {}", query.toJson());
final String currentId = Optional
.ofNullable(getColl(db, COLL_METADATA_MANAGER, true).find(query))
.map(r -> r.first())
.map(d -> d.getString("currentId"))
.orElseThrow(() -> new IllegalArgumentException("cannot find current mdstore id for: " + mdId));
log.info("currentId: {}", currentId);
return getColl(db, currentId, true); return getColl(db, currentId, true);
} }
public MdstoreTx readLock(final String mdId) {
final Document mdStoreInfo = getMDStoreInfo(mdId);
final List expiring = mdStoreInfo.get(EXPIRING, List.class);
final String currentId = mdStoreInfo.getString(CURRENT_ID);
log.info("locking collection {}", currentId);
if (expiring.size() > 0) {
for (Object value : expiring) {
final Document obj = (Document) value;
final String expiringId = (String) obj.get(ID);
if (currentId.equals(expiringId)) {
obj.put(LAST_READ, new Date());
break;
}
}
} else {
final BasicDBObject readStore = new BasicDBObject();
readStore.put(ID, currentId);
readStore.put(LAST_READ, new Date());
expiring.add(readStore);
}
getColl(db, COLL_METADATA_MANAGER, true)
.findOneAndReplace(new BasicDBObject("_id", mdStoreInfo.get("_id")), mdStoreInfo);
return new MdstoreTx(this, mdId, currentId);
}
public void readUnlock(final String mdId, final String currentId) {
log.info("unlocking collection {}", currentId);
final Document mdStoreInfo = getMDStoreInfo(mdId);
final List<Document> expiring = mdStoreInfo.get(EXPIRING, List.class);
expiring
.stream()
.filter(d -> currentId.equals(d.getString(ID)))
.findFirst()
.ifPresent(expired -> expiring.remove(expired));
}
/**
* Retrieves from the MDStore mongoDB database a snapshot of the [mdID, currentID] pairs.
* @param mdFormat
* @param mdLayout
* @param mdInterpretation
* @return an HashMap of the mdID -> currentID associations.
*/
public Map<String, String> validCollections( public Map<String, String> validCollections(
final String mdFormat, final String mdLayout, final String mdInterpretation) { final String mdFormat, final String mdLayout, final String mdInterpretation) {
final Map<String, String> transactions = new HashMap<>(); final Map<String, String> transactions = new HashMap<>();
for (final Document entry : getColl(db, COLL_METADATA_MANAGER, true).find()) { for (final Document entry : getColl(db, COLL_METADATA_MANAGER, true).find()) {
final String mdId = entry.getString(MD_ID); final String mdId = entry.getString("mdId");
final String currentId = entry.getString(CURRENT_ID); final String currentId = entry.getString("currentId");
if (StringUtils.isNoneBlank(mdId, currentId)) { if (StringUtils.isNoneBlank(mdId, currentId)) {
transactions.put(mdId, currentId); transactions.put(mdId, currentId);
} }
@ -121,11 +69,11 @@ public class MdstoreClient implements Closeable {
final Map<String, String> res = new HashMap<>(); final Map<String, String> res = new HashMap<>();
for (final Document entry : getColl(db, COLL_METADATA, true).find()) { for (final Document entry : getColl(db, COLL_METADATA, true).find()) {
if (entry.getString(FORMAT).equals(mdFormat) if (entry.getString("format").equals(mdFormat)
&& entry.getString(LAYOUT).equals(mdLayout) && entry.getString("layout").equals(mdLayout)
&& entry.getString(INTERPRETATION).equals(mdInterpretation) && entry.getString("interpretation").equals(mdInterpretation)
&& transactions.containsKey(entry.getString(MD_ID))) { && transactions.containsKey(entry.getString("mdId"))) {
res.put(entry.getString(MD_ID), transactions.get(entry.getString(MD_ID))); res.put(entry.getString("mdId"), transactions.get(entry.getString("mdId")));
} }
} }
@ -144,7 +92,9 @@ public class MdstoreClient implements Closeable {
private MongoCollection<Document> getColl( private MongoCollection<Document> getColl(
final MongoDatabase db, final String collName, final boolean abortIfMissing) { final MongoDatabase db, final String collName, final boolean abortIfMissing) {
if (!Iterables.contains(db.listCollectionNames(), collName)) { if (!Iterables.contains(db.listCollectionNames(), collName)) {
final String err = String.format("Missing collection '%s' in database '%s'", collName, db.getName()); final String err = String
.format(
String.format("Missing collection '%s' in database '%s'", collName, db.getName()));
log.warn(err); log.warn(err);
if (abortIfMissing) { if (abortIfMissing) {
throw new RuntimeException(err); throw new RuntimeException(err);
@ -155,31 +105,14 @@ public class MdstoreClient implements Closeable {
return db.getCollection(collName); return db.getCollection(collName);
} }
private Document getMDStoreInfo(final String mdId) {
return Optional
.ofNullable(getColl(db, COLL_METADATA_MANAGER, true))
.map(metadataManager -> {
BasicDBObject query = (BasicDBObject) QueryBuilder.start(MD_ID).is(mdId).get();
log.info("querying current mdId: {}", query.toJson());
return Optional
.ofNullable(metadataManager.find(query))
.map(MongoIterable::first)
.orElseThrow(() -> new IllegalArgumentException("cannot find current mdstore id for: " + mdId));
})
.orElseThrow(() -> new IllegalStateException("missing collection " + COLL_METADATA_MANAGER));
}
public Iterable<String> listRecords(final String collName) { public Iterable<String> listRecords(final String collName) {
return recordIterator(getColl(db, collName, false)); final MongoCollection<Document> coll = getColl(db, collName, false);
}
private Iterable<String> recordIterator(MongoCollection<Document> coll) {
return coll == null return coll == null
? new ArrayList<>() ? new ArrayList<>()
: () -> StreamSupport : () -> StreamSupport
.stream(coll.find().spliterator(), false) .stream(coll.find().spliterator(), false)
.filter(e -> e.containsKey(BODY)) .filter(e -> e.containsKey("body"))
.map(e -> e.getString(BODY)) .map(e -> e.getString("body"))
.iterator(); .iterator();
} }

View File

@ -1,45 +0,0 @@
package eu.dnetlib.dhp.common;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MdstoreTx implements Iterable<String>, Closeable {
private static final Logger log = LoggerFactory.getLogger(MdstoreTx.class);
private final MdstoreClient mdstoreClient;
private final String mdId;
private final String currentId;
public MdstoreTx(MdstoreClient mdstoreClient, String mdId, String currentId) {
this.mdstoreClient = mdstoreClient;
this.mdId = mdId;
this.currentId = currentId;
}
@Override
public Iterator<String> iterator() {
return mdstoreClient.mdStoreRecords(mdId).iterator();
}
@Override
public void close() throws IOException {
mdstoreClient.readUnlock(mdId, currentId);
log.info("unlocked collection {}", currentId);
}
public String getMdId() {
return mdId;
}
public String getCurrentId() {
return currentId;
}
}

View File

@ -48,8 +48,7 @@ public class MDStoreCollectorPlugin implements CollectorPlugin {
.orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MDSTORE_ID))); .orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MDSTORE_ID)));
log.info("mdId: {}", mdId); log.info("mdId: {}", mdId);
final MongoClient mongoClient = new MongoClient(new MongoClientURI(mongoBaseUrl)); final MdstoreClient client = new MdstoreClient(mongoBaseUrl, dbName);
final MdstoreClient client = new MdstoreClient(mongoClient, dbName);
final MongoCollection<Document> mdstore = client.mdStore(mdId); final MongoCollection<Document> mdstore = client.mdStore(mdId);
long size = mdstore.count(); long size = mdstore.count();

View File

@ -15,7 +15,6 @@ import com.mongodb.MongoClientURI;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.MdstoreClient; import eu.dnetlib.dhp.common.MdstoreClient;
import eu.dnetlib.dhp.common.MdstoreTx;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
public class MigrateMongoMdstoresApplication extends AbstractMigrationApplication implements Closeable { public class MigrateMongoMdstoresApplication extends AbstractMigrationApplication implements Closeable {
@ -41,38 +40,31 @@ public class MigrateMongoMdstoresApplication extends AbstractMigrationApplicatio
final String hdfsPath = parser.get("hdfsPath"); final String hdfsPath = parser.get("hdfsPath");
final MongoClient mongoClient = new MongoClient(new MongoClientURI(mongoBaseUrl)); try (MigrateMongoMdstoresApplication app = new MigrateMongoMdstoresApplication(hdfsPath, mongoBaseUrl,
try (MigrateMongoMdstoresApplication app = new MigrateMongoMdstoresApplication(hdfsPath, mongoClient,
mongoDb)) { mongoDb)) {
app.execute(mdFormat, mdLayout, mdInterpretation); app.execute(mdFormat, mdLayout, mdInterpretation);
} }
} }
public MigrateMongoMdstoresApplication( public MigrateMongoMdstoresApplication(
final String hdfsPath, final MongoClient mongoClient, final String mongoDb) throws Exception { final String hdfsPath, final String mongoBaseUrl, final String mongoDb) throws Exception {
super(hdfsPath); super(hdfsPath);
this.mdstoreClient = new MdstoreClient(mongoClient, mongoDb); this.mdstoreClient = new MdstoreClient(mongoBaseUrl, mongoDb);
} }
public void execute(final String format, final String layout, final String interpretation) throws IOException { public void execute(final String format, final String layout, final String interpretation) {
final Map<String, String> colls = mdstoreClient.validCollections(format, layout, interpretation); final Map<String, String> colls = mdstoreClient.validCollections(format, layout, interpretation);
log.info("Found {} mdstores", colls.size()); log.info("Found " + colls.size() + " mdstores");
for (final Entry<String, String> entry : colls.entrySet()) { for (final Entry<String, String> entry : colls.entrySet()) {
log.info("Processing mdstore {}", entry.getKey()); log.info("Processing mdstore " + entry.getKey() + " (collection: " + entry.getValue() + ")");
final String currentColl = entry.getValue();
final String mdID = entry.getKey(); for (final String xml : mdstoreClient.listRecords(currentColl)) {
try (final MdstoreTx tx = mdstoreClient.readLock(mdID)) {
log.info("locked collection {}", tx.getCurrentId());
for (final String xml : tx) {
emit(xml, String.format("%s-%s-%s", format, layout, interpretation)); emit(xml, String.format("%s-%s-%s", format, layout, interpretation));
} }
} }
} }
}
@Override @Override
public void close() throws IOException { public void close() throws IOException {

View File

@ -10,6 +10,7 @@ import org.apache.commons.lang3.StringUtils;
import org.bson.Document; import org.bson.Document;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.api.io.TempDir;
@ -22,6 +23,7 @@ import eu.dnetlib.dhp.common.MdstoreClient;
import io.fares.junit.mongodb.MongoExtension; import io.fares.junit.mongodb.MongoExtension;
import io.fares.junit.mongodb.MongoForAllExtension; import io.fares.junit.mongodb.MongoForAllExtension;
@Disabled
public class MigrateMongoMdstoresApplicationTest { public class MigrateMongoMdstoresApplicationTest {
private static final Logger log = LoggerFactory.getLogger(MigrateMongoMdstoresApplicationTest.class); private static final Logger log = LoggerFactory.getLogger(MigrateMongoMdstoresApplicationTest.class);
@ -40,27 +42,13 @@ public class MigrateMongoMdstoresApplicationTest {
db.getCollection("metadataManager").insertOne(Document.parse(read("mdstore_metadataManager.json"))); db.getCollection("metadataManager").insertOne(Document.parse(read("mdstore_metadataManager.json")));
} }
@Test /*
public void test_MigrateMongoMdstoresApplication(@TempDir Path tmpPath) throws Exception { * @Test public void test_MigrateMongoMdstoresApplication(@TempDir Path tmpPath) throws Exception { final String
* seqFile = "test_records.seq"; Path outputPath = tmpPath.resolve(seqFile); try (MigrateMongoMdstoresApplication
final String seqFile = "test_records.seq"; * app = new MigrateMongoMdstoresApplication( outputPath.toString(), mongo.getMongoClient(),
Path outputPath = tmpPath.resolve(seqFile); * MongoExtension.UNIT_TEST_DB)) { app.execute("oai_dc", "store", "native"); } Assertions .assertTrue( Files
* .list(tmpPath) .filter(f -> seqFile.contains(f.getFileName().toString())) .findFirst() .isPresent()); }
try (MigrateMongoMdstoresApplication app = new MigrateMongoMdstoresApplication( */
outputPath.toString(),
mongo.getMongoClient(),
MongoExtension.UNIT_TEST_DB)) {
app.execute("oai_dc", "store", "native");
}
Assertions
.assertTrue(
Files
.list(tmpPath)
.filter(f -> seqFile.contains(f.getFileName().toString()))
.findFirst()
.isPresent());
}
private static String read(String filename) throws IOException { private static String read(String filename) throws IOException {
return IOUtils.toString(MigrateMongoMdstoresApplicationTest.class.getResourceAsStream(filename)); return IOUtils.toString(MigrateMongoMdstoresApplicationTest.class.getResourceAsStream(filename));