package eu.dnetlib.data.mdstore.modular.mongodb;

import java.util.List;
import java.util.Map;

import javax.annotation.Nullable;

import com.google.common.base.Predicate;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.*;
import eu.dnetlib.data.mdstore.MDStoreServiceException;
import eu.dnetlib.data.mdstore.modular.MDFormatDescription;
import eu.dnetlib.data.mdstore.modular.RecordParser;
import eu.dnetlib.data.mdstore.modular.mongodb.utils.IndexFieldRecordParser;
import eu.dnetlib.data.mdstore.modular.mongodb.utils.IndexFieldRecordParserException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import static eu.dnetlib.data.mdstore.modular.MDStoreConstants.*;

/**
 * Buffers MongoDB write operations for an MDStore and submits them in unordered
 * bulks of {@code bulkSize} models.
 *
 * <p>Valid records are upserted (keyed by {@code ID}) into the main collection;
 * records that cannot be parsed are, when {@code discardRecords} is enabled,
 * inserted into a separate "discarded" collection. Call {@link #flushBulks()}
 * once ingestion is complete to push any partially filled bulk to disk.
 *
 * <p>NOTE(review): this class is not thread-safe — the bulk operation lists are
 * plain {@code ArrayList}s; confirm callers use one instance per writer thread.
 */
public class MongoBulkWritesManager {

	private static final Log log = LogFactory.getLog(MongoBulkWritesManager.class);

	/** When true, unparsable records are stored in the discarded collection instead of being dropped. */
	private final boolean discardRecords;
	/** True when index field definitions are available, enabling index-field extraction per record. */
	private final boolean indexRecords;
	private final IndexFieldRecordParser indexFieldRecordParser = new IndexFieldRecordParser();
	/** Index field definitions; may be null or empty, in which case no index fields are extracted. */
	private final List<MDFormatDescription> mdref;
	private final RecordParser recordParser;
	private MongoCollection<DBObject> validCollection;
	private final List<WriteModel<DBObject>> validBulkOperationList;
	private final BulkWriteOptions writeOptions;
	private MongoCollection<DBObject> discardedCollection;
	private final List<WriteModel<DBObject>> discardedBulkOperationList;
	/** Number of buffered models that triggers a bulkWrite. */
	private final int bulkSize;

	/**
	 * @param collection          target collection for valid records (write concern is set to ACKNOWLEDGED)
	 * @param discardedCollection target collection for unparsable records
	 * @param mdref               index field definitions; null/empty disables index-field extraction
	 * @param bulkSize            number of buffered operations per bulk write
	 * @param parser              parser extracting the mandatory record properties (id, originalId, timestamp)
	 * @param discardRecords      whether unparsable records are kept in the discarded collection
	 */
	public MongoBulkWritesManager(final MongoCollection<DBObject> collection,
			final MongoCollection<DBObject> discardedCollection,
			final List<MDFormatDescription> mdref,
			final int bulkSize,
			final RecordParser parser,
			final boolean discardRecords) {
		this.validCollection = collection.withWriteConcern(WriteConcern.ACKNOWLEDGED);
		this.validBulkOperationList = Lists.newArrayList();
		this.discardedCollection = discardedCollection.withWriteConcern(WriteConcern.ACKNOWLEDGED);
		this.discardedBulkOperationList = Lists.newArrayList();
		this.bulkSize = bulkSize;
		this.recordParser = parser;
		this.discardRecords = discardRecords;
		this.mdref = mdref;
		this.indexRecords = (this.mdref != null && !this.mdref.isEmpty());
		// Unordered: individual model failures don't abort the rest of the bulk.
		this.writeOptions = new BulkWriteOptions().ordered(false);
	}

	/**
	 * Parses and buffers one record; triggers a bulk write every {@code bulkSize} records.
	 *
	 * @param record the raw record body
	 * @throws MDStoreServiceException if the record cannot be parsed and discarding is disabled,
	 *                                 or if index field extraction fails
	 */
	public void insert(final String record) throws MDStoreServiceException {
		Map<String, String> recordProperties = null;
		try {
			recordProperties = recordParser.parseRecord(record);
		} catch (final Throwable e) {
			if (discardRecords) {
				log.debug("unhandled exception: " + e.getMessage());
				discardRecord(record);
				// BUGFIX: the original fell through here and dereferenced the
				// still-null recordProperties below, throwing an NPE.
				return;
			}
			// BUGFIX: the original swallowed the parse error entirely and then
			// crashed with an NPE; surface it as the declared checked exception,
			// preserving the cause.
			throw new MDStoreServiceException("Error parsing record", e);
		}

		Map<String, List<String>> indexRecordField = null;
		try {
			if (indexRecords) {
				indexRecordField = indexFieldRecordParser.parseRecord(record, mdref);
			}
		} catch (final IndexFieldRecordParserException e) {
			// could not index record fields
			throw new MDStoreServiceException("Are you using the correct type of store / index definition for the records in "
					+ validCollection.getNamespace() + " ?", e);
		}

		log.debug("found props: " + recordProperties);
		if (recordProperties != null && recordProperties.containsKey(ID)) {
			final DBObject obj = buildDBObject(record, recordProperties, indexRecordField);
			if (log.isDebugEnabled()) {
				log.debug("Saving object" + obj);
			}
			// Upsert keyed by record id, so re-ingesting the same record replaces it.
			validBulkOperationList.add(new ReplaceOneModel<DBObject>(new BasicDBObject(ID, obj.get(ID)), obj, new UpdateOptions().upsert(true)));
			if (((validBulkOperationList.size() % bulkSize) == 0) && !validBulkOperationList.isEmpty()) {
				validCollection.bulkWrite(validBulkOperationList, writeOptions);
				validBulkOperationList.clear();
			}
		} else {
			if (discardRecords) {
				log.debug("parsed record seems invalid");
				discardRecord(record);
			}
		}
	}

	/** Buffers the raw record body into the discarded collection, flushing every {@code bulkSize} records. */
	private void discardRecord(final String record) {
		discardedBulkOperationList.add(new InsertOneModel<DBObject>(new BasicDBObject(BODY, record)));
		if (((discardedBulkOperationList.size() % bulkSize) == 0) && !discardedBulkOperationList.isEmpty()) {
			discardedCollection.bulkWrite(discardedBulkOperationList, writeOptions);
			discardedBulkOperationList.clear();
		}
	}

	/**
	 * Writes any partially filled bulks using the JOURNALED write concern, so that when this
	 * method returns everything has been flushed to disk
	 * (https://docs.mongodb.org/manual/faq/developers/#when-does-mongodb-write-updates-to-disk).
	 * The explicit fsync command can't be run anymore:
	 * 'Command failed with error 13: 'fsync may only be run against the admin database.'
	 */
	public void flushBulks() {
		if (!validBulkOperationList.isEmpty()) {
			validCollection = getCollectionWithWriteConcern(validCollection, WriteConcern.JOURNALED);
			validCollection.bulkWrite(validBulkOperationList, writeOptions);
			// BUGFIX: clear the buffer so a second flush cannot re-send the same models.
			validBulkOperationList.clear();
		}
		if (!discardedBulkOperationList.isEmpty()) {
			discardedCollection = getCollectionWithWriteConcern(discardedCollection, WriteConcern.JOURNALED);
			discardedCollection.bulkWrite(discardedBulkOperationList, writeOptions);
			// BUGFIX: same as above — avoid duplicate inserts on repeated flushes.
			discardedBulkOperationList.clear();
		}
		// Setting write concern back to ACKNOWLEDGED to avoid executing all future writes in journaled mode.
		validCollection = getCollectionWithWriteConcern(validCollection, WriteConcern.ACKNOWLEDGED);
		discardedCollection = getCollectionWithWriteConcern(discardedCollection, WriteConcern.ACKNOWLEDGED);
	}

	/**
	 * Builds the Mongo document for a valid record: the mandatory fields (id, originalId,
	 * body, timestamp) plus any extracted index fields that do not collide with them.
	 *
	 * @param record               the raw record body
	 * @param recordProperties     parsed mandatory properties; must contain ID and TIMESTAMP
	 * @param indexFieldProperties extracted index fields, or null when indexing is disabled
	 * @return the document to upsert
	 */
	protected DBObject buildDBObject(final String record,
			final Map<String, String> recordProperties,
			final Map<String, List<String>> indexFieldProperties) {
		final DBObject obj = new BasicDBObject();
		obj.put(ID, recordProperties.get(ID));
		obj.put(ORIGINALID, recordProperties.get(ORIGINALID));
		obj.put(BODY, record);
		obj.put(TIMESTAMP, Long.valueOf(recordProperties.get(TIMESTAMP)));
		if (indexFieldProperties != null) {
			obj.putAll(Maps.filterKeys(indexFieldProperties, new Predicate<String>() {

				// Ensure we do not override the mandatory fields above with some unexpected value.
				@Override
				public boolean apply(@Nullable final String s) {
					return s != null
							&& !s.equalsIgnoreCase(ID)
							&& !s.equalsIgnoreCase(ORIGINALID)
							&& !s.equalsIgnoreCase(BODY)
							&& !s.equalsIgnoreCase(TIMESTAMP);
				}
			}));
		}
		return obj;
	}

	/** Returns a view of the given collection bound to the given write concern. */
	private MongoCollection<DBObject> getCollectionWithWriteConcern(final MongoCollection<DBObject> collection, final WriteConcern writeConcern) {
		return collection.withWriteConcern(writeConcern);
	}
}