dnet-core/dnet-data-services/src/main/java/eu/dnetlib/data/objectstore/filesystem/FileSystemObjectStoreResult...

351 lines
7.7 KiB
Java

package eu.dnetlib.data.objectstore.filesystem;
import java.util.List;
import com.google.common.collect.Lists;
import com.mongodb.DBObject;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Sorts;
import eu.dnetlib.enabling.resultset.ResultSet;
import eu.dnetlib.enabling.resultset.ResultSetAware;
import eu.dnetlib.enabling.resultset.ResultSetListener;
import eu.dnetlib.miscutils.collections.MappedCollection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.bson.conversions.Bson;
/**
* The listener interface for receiving fileSystemObjectStoreResultSet events.
* The class that is interested in processing a fileSystemObjectStoreResultSet
* event implements this interface, and the object created
* with that class is registered with a component using the
* component's <code>addFileSystemObjectStoreResultSetListener<code> method. When
* the fileSystemObjectStoreResultSet event occurs, that object's appropriate
* method is invoked.
*
* @author sandro
*/
public class FileSystemObjectStoreResultSetListener implements ResultSetListener, ResultSetAware {
/** The Constant log. */
private static final Log log = LogFactory.getLog(FileSystemObjectStoreResultSetListener.class); // NOPMD by marko on 11/24/08 5:02 PM
/** The from date. */
private Long fromDate;
/** The until date. */
private Long untilDate;
/** The records. */
private List<String> records;
/** The object store id. */
private String objectStoreID;
/** The mongo collection. */
private MongoCollection<DBObject> mongoCollection;
/** The base uri. */
private String baseURI;
/**
* The base path
*/
private String basePath;
/** The current size. */
private int currentSize = -1;
/** The current cursor. */
private MongoCursor<DBObject> currentCursor;
/** The cursor position. */
private long cursorPosition;
/**
* {@inheritDoc}
* @see eu.dnetlib.enabling.resultset.TypedResultSetListener#getResult(int, int)
*/
@Override
public List<String> getResult(final int from, final int to) {
if (log.isDebugEnabled()) {
log.debug(String.format("ObjectStoreId :%s, from: %d, to: %d", objectStoreID, from, to));
}
if (records != null) {
List<String> ids = Lists.newArrayList();
for (int i = from-1; i < Math.min(records.size(),to); i++) {
ids.add(records.get(i));
}
Bson q = Filters.in("id", ids);
FindIterable<DBObject> res = getMongoCollection().find(q);
return MappedCollection.listMap(res, ObjectStoreFileUtility.asJSON(getBaseURI(), getObjectStoreID(), getBasePath()));
} else if ((fromDate != null) && (untilDate != null)) {
if ((currentCursor == null) || (cursorPosition > from)) {
createCurrentCursor();
}
while (cursorPosition < from) {
currentCursor.next();
cursorPosition++;
}
List<DBObject> result = Lists.newArrayList();
for (int i = from; i <= to; i++) {
if (currentCursor.hasNext()) {
result.add(currentCursor.next());
cursorPosition++;
}
}
return MappedCollection.listMap(result, ObjectStoreFileUtility.asJSON(getBaseURI(), getObjectStoreID(), getBasePath()));
}
throw new IllegalArgumentException("Missing parameters on Delivery must provide either from, to, or ObjectStoreIDs");
}
/**
* Creates the current cursor.
*/
private void createCurrentCursor() {
Bson timestampQuery = Filters.and(Filters.gt("timestamp", fromDate.doubleValue()), Filters.lt("timestamp", untilDate.doubleValue()));
if (currentCursor != null) {
currentCursor.close();
}
currentCursor = getMongoCollection().find(timestampQuery).sort(Sorts.orderBy(Filters.eq("_id", 1))).iterator();
cursorPosition = 1;
}
/**
* {@inheritDoc}
* @see eu.dnetlib.enabling.resultset.TypedResultSetListener#getSize()
*/
@Override
public int getSize() {
if (currentSize == -1) {
currentSize = calculateSize();
}
return Math.max(0, currentSize - 1);
}
/**
* Calculate size.
*
* @return the int
*/
private int calculateSize() {
if (records != null) {
Bson query = Filters.in("id", records);
return (int) getMongoCollection().count(query);
} else if ((fromDate != null) && (untilDate != null)) {
Bson timestampQuery = Filters.and(Filters.gt("timestamp", fromDate.doubleValue()), Filters.lt("timestamp", untilDate.doubleValue()));
return (int) getMongoCollection().count(timestampQuery);
}
return 0;
}
/**
* {@inheritDoc}
* @see eu.dnetlib.enabling.resultset.ResultSetAware#setResultSet(eu.dnetlib.enabling.resultset.ResultSet)
*/
@Override
public void setResultSet(final ResultSet resultSet) {
resultSet.close();
}
/**
* Gets the from date.
*
* @return the fromDate
*/
public Long getFromDate() {
return fromDate;
}
/**
* Sets the from date.
*
* @param fromDate the fromDate to set
*/
public FileSystemObjectStoreResultSetListener setFromDate(final Long fromDate) {
this.fromDate = fromDate;
return this;
}
/**
* Gets the until date.
*
* @return the untilDate
*/
public Long getUntilDate() {
return untilDate;
}
/**
* Sets the until date.
*
* @param untilDate the untilDate to set
*/
public FileSystemObjectStoreResultSetListener setUntilDate(final Long untilDate) {
this.untilDate = untilDate;
return this;
}
/**
* Gets the records.
*
* @return the records
*/
public List<String> getRecords() {
return records;
}
/**
* Sets the records.
*
* @param records the records to set
*/
public void setRecords(final List<String> records) {
this.records = records;
}
/**
* Gets the object store id.
*
* @return the objectStoreID
*/
public String getObjectStoreID() {
return objectStoreID;
}
/**
* Sets the object store id.
*
* @param objectStoreID the objectStoreID to set
*/
public void setObjectStoreID(final String objectStoreID) {
this.objectStoreID = objectStoreID;
}
/**
* Gets the base uri.
*
* @return the baseURI
*/
public String getBaseURI() {
return baseURI;
}
/**
* Sets the base uri.
*
* @param baseURI the baseURI to set
*/
public void setBaseURI(final String baseURI) {
this.baseURI = baseURI;
}
/**
* Gets the current size.
*
* @return the currentSize
*/
public int getCurrentSize() {
return currentSize;
}
/**
* Sets the current size.
*
* @param currentSize the currentSize to set
*/
public void setCurrentSize(final int currentSize) {
this.currentSize = currentSize;
}
/**
* Gets the current cursor.
*
* @return the currentCursor
*/
public MongoCursor<DBObject> getCurrentCursor() {
return currentCursor;
}
/**
* Sets the current cursor.
*
* @param currentCursor the currentCursor to set
*/
public void setCurrentCursor(final MongoCursor<DBObject> currentCursor) {
this.currentCursor = currentCursor;
}
/**
* Gets the cursor position.
*
* @return the cursorPosition
*/
public long getCursorPosition() {
return cursorPosition;
}
/**
* Sets the cursor position.
*
* @param cursorPosition the cursorPosition to set
*/
public void setCursorPosition(final long cursorPosition) {
this.cursorPosition = cursorPosition;
}
/**
* Gets the mongo collection.
*
* @return the mongo collection
*/
public MongoCollection<DBObject> getMongoCollection() {
return mongoCollection;
}
/**
* Sets the mongo collection.
*
* @param mongoCollection the new mongo collection
*/
public void setMongoCollection(final MongoCollection<DBObject> mongoCollection) {
this.mongoCollection = mongoCollection;
}
public String getBasePath() {
return basePath;
}
public void setBasePath(final String basePath) {
this.basePath = basePath;
}
}