storagehub/src/main/java/org/gcube/data/access/storagehub/handlers/vres/VREQueryRetriever.java

274 lines
9.2 KiB
Java

package org.gcube.data.access.storagehub.handlers.vres;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import javax.jcr.Credentials;
import javax.jcr.Node;
import javax.jcr.NodeIterator;
import javax.jcr.Property;
import javax.jcr.Repository;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.observation.Event;
import javax.jcr.observation.EventJournal;
import org.gcube.common.storagehub.model.Excludes;
import org.gcube.common.storagehub.model.exceptions.BackendGenericError;
import org.gcube.common.storagehub.model.items.AbstractFileItem;
import org.gcube.common.storagehub.model.items.FolderItem;
import org.gcube.common.storagehub.model.items.Item;
import org.gcube.data.access.storagehub.handlers.items.Node2ItemConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class VREQueryRetriever implements Callable<List<Item>> {
private static final Logger logger = LoggerFactory.getLogger(VREQueryRetriever.class);
private static final int CACHE_DIMENSION = 50;
private Repository repository;
private Credentials credentials;
private Item vreFolder;
Map<String, Long> cachedMap = new HashMap<>(CACHE_DIMENSION);
long lastTimestamp =0;
long doTime= 0;
boolean underRedo = false;
Node2ItemConverter node2Item;
private static int TIME_MEASURE = Calendar.MINUTE;
private static int VALUE_TIME_BEFORE = 30;
public VREQueryRetriever(Repository repository, Credentials credentials, Node2ItemConverter node2Item, Item vreFolder) {
super();
this.repository = repository;
this.credentials = credentials;
this.vreFolder = vreFolder;
this.node2Item = node2Item;
}
public List<Item> call() {
Session ses = null;
try {
ses = repository.login(credentials);
Calendar now = Calendar.getInstance();
now.add(TIME_MEASURE, -1*Math.abs(VALUE_TIME_BEFORE));
if (doTime> now.getTimeInMillis() || underRedo) {
logger.debug("executing recents task for {} (cahced result)",vreFolder.getTitle());
return correctValues(ses, cachedMap);
}else {
logger.debug("executing recents task for {} (redoing it)",vreFolder.getTitle());
List<Item> toReturn = redo(ses);
doTime = System.currentTimeMillis();
return toReturn;
}
}catch (Exception e) {
logger.error("error preparing recents for folder {}", vreFolder.getTitle(),e);
return Collections.emptyList();
}finally{
if (ses!=null)
ses.logout();
logger.debug("recents task finished");
}
}
public synchronized List<Item> redo(Session ses) {
underRedo = true;
try {
Map<String, Long> tempCachedMap = new HashMap<>(cachedMap);
if (cachedMap.isEmpty())
init(ses, tempCachedMap);
else {
logger.debug("redoing recents for {}",vreFolder.getTitle());
update(ses, tempCachedMap);
}
cachedMap = tempCachedMap;
return correctValues(ses, tempCachedMap);
}finally{
underRedo = false;
}
}
private List<Item> correctValues(Session ses, Map<String, Long> tempCachedMap){
logger.debug("preparing returning values for {}",vreFolder.getTitle());
long start = System.currentTimeMillis();
List<Map.Entry<String, Long>> list = new LinkedList<>(tempCachedMap.entrySet());
list.sort((c1, c2) -> c1.getValue().compareTo(c2.getValue())*-1);
if (list.size()>CACHE_DIMENSION)
for (int index = CACHE_DIMENSION-1; index< list.size() ; index++)
tempCachedMap.remove(list.get(index).getKey());
List<String> cachedIds = list.stream().map(m -> m.getKey()).collect(Collectors.toList());
if (cachedIds.size()>10)
cachedIds = cachedIds.subList(0, 10);
List<Item> result = new ArrayList<>(10);
for (String id: cachedIds) {
try {
Item item = node2Item.getItem(id, ses, Excludes.EXCLUDE_ACCOUNTING);
if (item!=null)
result.add(item);
else logger.warn("item with id {} is null",id);
} catch (BackendGenericError | RepositoryException e) { }
}
logger.debug("returning values prepared in {} for {}",System.currentTimeMillis()-start,vreFolder.getTitle());
return result;
}
private void insertItemInTheRightPlace(Item item, Map<String, Long> tempCachedMap) {
long lastModifiedTime = item.getLastModificationTime().getTimeInMillis();
if (tempCachedMap.size()<CACHE_DIMENSION || lastModifiedTime>Collections.min(tempCachedMap.values()))
tempCachedMap.put(item.getId(), lastModifiedTime);
}
private void init(Session ses,Map<String, Long> tempCachedMap){
try {
long start = System.currentTimeMillis();
lastTimestamp = System.currentTimeMillis();
Node vreFolderNode = ses.getNodeByIdentifier(vreFolder.getId());
logger.debug("starting visiting children for {}",vreFolder.getTitle());
visitChildren(vreFolderNode, tempCachedMap);
logger.debug("initializing recents for {} took {}",vreFolder.getTitle(),System.currentTimeMillis()-start);
} catch (Exception e) {
logger.error("error querying vre {}",vreFolder.getTitle(),e);
throw new RuntimeException(e);
}
}
private void update(Session ses, Map<String, Long> tempCachedMap) {
try {
long timestampToUse = lastTimestamp;
lastTimestamp = System.currentTimeMillis();
long start = System.currentTimeMillis();
final String[] types = { "nthl:workspaceLeafItem", "nthl:workspaceItem"};
EventJournal journalChanged = ses.getWorkspace().getObservationManager().getEventJournal(Event.PROPERTY_CHANGED^Event.NODE_REMOVED^Event.NODE_MOVED^Event.NODE_ADDED, vreFolder.getPath(), true, null, types);
journalChanged.skipTo(timestampToUse);
logger.debug("getting the journal took {}",System.currentTimeMillis()-start);
int events = 0;
//TODO: manage hidden nodes
while (journalChanged.hasNext()) {
events++;
Event event = journalChanged.nextEvent();
try {
switch(event.getType()) {
case Event.NODE_ADDED:
if (ses.nodeExists(event.getPath())) {
Node nodeAdded = ses.getNode(event.getPath());
if (nodeAdded.isNodeType("nthl:workspaceLeafItem")) {
logger.debug("node added event received with name {}", nodeAdded.getName());
Item item = node2Item.getItem(nodeAdded, Excludes.ALL);
if (tempCachedMap.get(event.getIdentifier())!=null)
tempCachedMap.remove(event.getIdentifier());
if (item.isHidden())
break;
insertItemInTheRightPlace(item,tempCachedMap);
}
}
break;
case Event.PROPERTY_CHANGED:
if (ses.propertyExists(event.getPath())) {
Property property = ses.getProperty(event.getPath());
if (property.getName().equalsIgnoreCase("jcr:lastModified")) {
logger.debug("event property changed on {} with value {} and parent {}",property.getName(), property.getValue().getString(), property.getParent().getPath());
String identifier = property.getParent().getIdentifier();
tempCachedMap.remove(identifier);
Item item = node2Item.getItem(property.getParent(), Excludes.ALL);
if (item.isHidden())
break;
insertItemInTheRightPlace(item, tempCachedMap);
}
}
break;
case Event.NODE_REMOVED:
logger.trace("node removed event received with type {}", event.getIdentifier());
if (tempCachedMap.get(event.getIdentifier())!=null &&
tempCachedMap.get(event.getIdentifier())<event.getDate())
tempCachedMap.remove(event.getIdentifier());
break;
case Event.NODE_MOVED:
Node nodeMoved = ses.getNode(event.getPath());
logger.trace("node moved event received with type {}", nodeMoved.getPrimaryNodeType());
if (nodeMoved.isNodeType("nthl:workspaceLeafItem")) {
logger.debug("event node moved on {} with path {}",nodeMoved.getName(), nodeMoved.getPath());
String identifier = nodeMoved.getIdentifier();
String nodePath = ses.getNode(identifier).getPath();
if (tempCachedMap.get(event.getIdentifier())!=null &&
!nodePath.startsWith(vreFolder.getPath()))
tempCachedMap.remove(event.getIdentifier());
}
break;
default:
break;
}
}catch (Exception e) {
logger.warn("error handling event {}",event.getType(),e);
}
}
logger.trace("retrieving event took {} with {} events",System.currentTimeMillis()-start, events);
} catch (Exception e) {
logger.error("error getting events for vre {}",vreFolder.getTitle(),e);
throw new RuntimeException(e);
}
}
private void visitChildren(Node node, Map<String, Long> tempCachedMap) throws Exception{
NodeIterator nodeIt = node.getNodes();
while(nodeIt.hasNext()) {
Node child = nodeIt.nextNode();
Item item = node2Item.getItem(child, Excludes.ALL);
if (item==null || item.isHidden()) continue;
if (item instanceof FolderItem)
visitChildren(child,tempCachedMap);
else if(item instanceof AbstractFileItem)
insertItemInTheRightPlace(item,tempCachedMap);
}
}
/* @Override
public void onEvent(EventIterator events) {
logger.trace("on event called");
while (events.hasNext()) {
Event event = events.nextEvent();
try {
logger.trace("new event received of type {} on node {}",event.getType(),event.getIdentifier());
} catch (RepositoryException e) {
logger.error("error reading event",e);
}
}
}*/
}