@ -1,7 +1,6 @@
package org.gcube.data.access.storagehub.handlers.vres ;
import java.util.ArrayList ;
import java.util.Arrays ;
import java.util.Calendar ;
import java.util.Collections ;
import java.util.HashMap ;
@ -11,7 +10,6 @@ import java.util.Map;
import java.util.concurrent.Callable ;
import java.util.stream.Collectors ;
import javax.inject.Inject ;
import javax.jcr.Credentials ;
import javax.jcr.Node ;
import javax.jcr.NodeIterator ;
@ -23,7 +21,6 @@ import javax.jcr.observation.Event;
import javax.jcr.observation.EventJournal ;
import org.gcube.common.storagehub.model.Excludes ;
import org.gcube.common.storagehub.model.NodeConstants ;
import org.gcube.common.storagehub.model.exceptions.BackendGenericError ;
import org.gcube.common.storagehub.model.items.AbstractFileItem ;
import org.gcube.common.storagehub.model.items.FolderItem ;
@ -41,14 +38,17 @@ public class VREQueryRetriever implements Callable<List<Item>> {
private Repository repository ;
private Credentials credentials ;
private Item vreFolder ;
Map < String , Long > cached List = new HashMap < > ( CACHE_DIMENSION ) ;
Map < String , Long > cached Map = new HashMap < > ( CACHE_DIMENSION ) ;
long higherTimestamp = Long . MAX_VALUE ;
long lastTimestamp = 0 ;
long doTime = 0 ;
boolean underRedo = false ;
Node2ItemConverter node2Item ;
public VREQueryRetriever ( Repository repository , Credentials credentials , Node2ItemConverter node2Item , Item vreFolder ) {
super ( ) ;
this . repository = repository ;
@ -58,101 +58,22 @@ public class VREQueryRetriever implements Callable<List<Item>> {
}
public List < Item > call ( ) {
logger . debug ( "executing recents task for {}" , vreFolder . getTitle ( ) ) ;
Session ses = null ;
try {
ses = repository . login ( credentials ) ;
//if (cachedList.isEmpty()) {
init ( ses ) ;
/ * } else {
logger . debug ( "redoing recents for {}" , vreFolder . getTitle ( ) ) ;
try {
long timestampToUse = lastTimestamp ;
lastTimestamp = System . currentTimeMillis ( ) ;
long start = System . currentTimeMillis ( ) ;
ses = repository . login ( credentials ) ;
final String [ ] types = { "nthl:workspaceLeafItem" , "nthl:workspaceItem" } ;
EventJournal journalChanged = ses . getWorkspace ( ) . getObservationManager ( ) . getEventJournal ( Event . PROPERTY_CHANGED ^ Event . NODE_REMOVED ^ Event . NODE_MOVED ^ Event . NODE_ADDED , vreFolder . getPath ( ) , true , null , types ) ;
journalChanged . skipTo ( timestampToUse ) ;
logger . debug ( "getting the journal took {}" , System . currentTimeMillis ( ) - start ) ;
int events = 0 ;
while ( journalChanged . hasNext ( ) ) {
events + + ;
Event event = journalChanged . nextEvent ( ) ;
switch ( event . getType ( ) ) {
case Event . NODE_ADDED :
if ( ses . nodeExists ( event . getPath ( ) ) ) {
Node nodeAdded = ses . getNode ( event . getPath ( ) ) ;
if ( nodeAdded . isNodeType ( "nthl:workspaceLeafItem" ) ) {
logger . trace ( "node added event received with name {}" , nodeAdded . getName ( ) ) ;
Item item = node2Item . getItem ( nodeAdded , Arrays . asList ( NodeConstants . ACCOUNTING_NAME ) ) ;
if ( cachedList . get ( event . getIdentifier ( ) ) ! = null )
cachedList . remove ( event . getIdentifier ( ) ) ;
insertItemInTheRightPlace ( item ) ;
}
}
break ;
case Event . PROPERTY_CHANGED :
if ( ses . propertyExists ( event . getPath ( ) ) ) {
Property property = ses . getProperty ( event . getPath ( ) ) ;
if ( property . getName ( ) . equalsIgnoreCase ( "jcr:lastModified" ) ) {
logger . trace ( "event property changed on {} with value {} and parent {}" , property . getName ( ) , property . getValue ( ) . getString ( ) , property . getParent ( ) . getPath ( ) ) ;
String identifier = property . getParent ( ) . getIdentifier ( ) ;
cachedList . remove ( identifier ) ;
Item item = node2Item . getItem ( property . getParent ( ) , Excludes . EXCLUDE_ACCOUNTING ) ;
insertItemInTheRightPlace ( item ) ;
}
}
break ;
case Event . NODE_REMOVED :
logger . trace ( "node removed event received with type {}" , event . getIdentifier ( ) ) ;
if ( cachedList . get ( event . getIdentifier ( ) ) ! = null & &
cachedList . get ( event . getIdentifier ( ) ) < event . getDate ( ) )
cachedList . remove ( event . getIdentifier ( ) ) ;
break ;
case Event . NODE_MOVED :
Node nodeMoved = ses . getNode ( event . getPath ( ) ) ;
logger . trace ( "node moved event received with type {}" , nodeMoved . getPrimaryNodeType ( ) ) ;
if ( nodeMoved . isNodeType ( "nthl:workspaceLeafItem" ) ) {
logger . trace ( "event node moved on {} with path {}" , nodeMoved . getName ( ) , nodeMoved . getPath ( ) ) ;
String identifier = nodeMoved . getIdentifier ( ) ;
String nodePath = ses . getNode ( identifier ) . getPath ( ) ;
if ( cachedList . get ( event . getIdentifier ( ) ) ! = null & &
! nodePath . startsWith ( vreFolder . getPath ( ) ) )
cachedList . remove ( event . getIdentifier ( ) ) ;
}
break ;
default :
throw new Exception ( "error in event handling" ) ;
}
}
logger . trace ( "retrieving event took {} with {} events" , System . currentTimeMillis ( ) - start , events ) ;
} catch ( Exception e ) {
logger . error ( "error getting events for vre {}" , vreFolder . getTitle ( ) , e ) ;
throw new RuntimeException ( e ) ;
} finally {
if ( ses ! = null )
ses . logout ( ) ;
Calendar now = Calendar . getInstance ( ) ;
now . add ( Calendar . HOUR_OF_DAY , - 1 ) ;
if ( doTime > now . getTimeInMillis ( ) | | underRedo ) {
logger . debug ( "executing recents task for {} (cahced result)" , vreFolder . getTitle ( ) ) ;
return correctValues ( ses , cachedMap ) ;
} else {
logger . debug ( "executing recents task for {} (redoing it)" , vreFolder . getTitle ( ) ) ;
List < Item > toReturn = redo ( ses ) ;
doTime = System . currentTimeMillis ( ) ;
return toReturn ;
}
} * /
return correctValues ( ses ) ;
} catch ( Exception e ) {
} catch ( Exception e ) {
logger . error ( "error preparing recents for folder {}" , vreFolder . getTitle ( ) , e ) ;
return Collections . emptyList ( ) ;
} finally {
@ -162,16 +83,32 @@ public class VREQueryRetriever implements Callable<List<Item>> {
}
}
public synchronized List < Item > redo ( Session ses ) {
underRedo = true ;
try {
Map < String , Long > tempCachedMap = new HashMap < > ( CACHE_DIMENSION ) ;
if ( cachedMap . isEmpty ( ) )
init ( ses , tempCachedMap ) ;
else {
logger . debug ( "redoing recents for {}" , vreFolder . getTitle ( ) ) ;
update ( ses , tempCachedMap ) ;
}
cachedMap = tempCachedMap ;
return correctValues ( ses , tempCachedMap ) ;
} finally {
underRedo = false ;
}
}
private List < Item > correctValues ( Session ses ) {
private List < Item > correctValues ( Session ses , Map < String , Long > tempCachedMap ){
logger . debug ( "preparing returning values for {}" , vreFolder . getTitle ( ) ) ;
long start = System . currentTimeMillis ( ) ;
List < Map . Entry < String , Long > > list = new LinkedList < > ( cachedList . entrySet ( ) ) ;
List < Map . Entry < String , Long > > list = new LinkedList < > ( tempCachedMap . entrySet ( ) ) ;
list . sort ( ( c1 , c2 ) - > c1 . getValue ( ) . compareTo ( c2 . getValue ( ) ) * - 1 ) ;
if ( list . size ( ) > CACHE_DIMENSION )
for ( int index = CACHE_DIMENSION - 1 ; index < list . size ( ) ; index + + )
cachedList . remove ( list . get ( index ) . getKey ( ) ) ;
tempCachedMap . remove ( list . get ( index ) . getKey ( ) ) ;
List < String > cachedIds = list . stream ( ) . map ( m - > m . getKey ( ) ) . collect ( Collectors . toList ( ) ) ;
if ( cachedIds . size ( ) > 10 )
@ -193,25 +130,23 @@ public class VREQueryRetriever implements Callable<List<Item>> {
return result ;
}
private void insertItemInTheRightPlace ( Item item ) {
private void insertItemInTheRightPlace ( Item item , Map < String , Long > tempCachedMap ) {
long lastModifiedTime = item . getLastModificationTime ( ) . getTime ( ) . getTime ( ) ;
if ( ! ( lastModifiedTime > higherTimestamp & & cachedList . size ( ) > CACHE_DIMENSION ) ) {
cachedList . put ( item . getId ( ) , lastModifiedTime ) ;
if ( ! ( lastModifiedTime > higherTimestamp & & tempCachedMap . size ( ) > CACHE_DIMENSION ) ) {
tempCachedMap . put ( item . getId ( ) , lastModifiedTime ) ;
if ( lastModifiedTime > higherTimestamp ) higherTimestamp = lastModifiedTime ;
}
}
private void init ( Session ses ){
private void init ( Session ses ,Map < String , Long > tempCachedMap ){
try {
long start = System . currentTimeMillis ( ) ;
Calendar now = Calendar . getInstance ( ) ;
now . add ( Calendar . YEAR , - 1 ) ;
lastTimestamp = System . currentTimeMillis ( ) ;
Node vreFolderNode = ses . getNodeByIdentifier ( vreFolder . getId ( ) ) ;
logger . debug ( "starting visiting children for {}" , vreFolder . getTitle ( ) ) ;
visitChildren ( vreFolderNode );
visitChildren ( vreFolderNode , tempCachedMap );
logger . debug ( "initializing recents for {} took {}" , vreFolder . getTitle ( ) , System . currentTimeMillis ( ) - start ) ;
} catch ( Exception e ) {
logger . error ( "error querying vre {}" , vreFolder . getTitle ( ) , e ) ;
@ -219,16 +154,103 @@ public class VREQueryRetriever implements Callable<List<Item>> {
}
}
private void visitChildren ( Node node ) throws Exception {
private void update ( Session ses , Map < String , Long > tempCachedMap ) {
try {
long timestampToUse = lastTimestamp ;
lastTimestamp = System . currentTimeMillis ( ) ;
long start = System . currentTimeMillis ( ) ;
final String [ ] types = { "nthl:workspaceLeafItem" , "nthl:workspaceItem" } ;
EventJournal journalChanged = ses . getWorkspace ( ) . getObservationManager ( ) . getEventJournal ( Event . PROPERTY_CHANGED ^ Event . NODE_REMOVED ^ Event . NODE_MOVED ^ Event . NODE_ADDED , vreFolder . getPath ( ) , true , null , types ) ;
journalChanged . skipTo ( timestampToUse ) ;
logger . debug ( "getting the journal took {}" , System . currentTimeMillis ( ) - start ) ;
int events = 0 ;
//TODO: manage hidden nodes
while ( journalChanged . hasNext ( ) ) {
events + + ;
Event event = journalChanged . nextEvent ( ) ;
try {
switch ( event . getType ( ) ) {
case Event . NODE_ADDED :
if ( ses . nodeExists ( event . getPath ( ) ) ) {
Node nodeAdded = ses . getNode ( event . getPath ( ) ) ;
if ( nodeAdded . isNodeType ( "nthl:workspaceLeafItem" ) ) {
logger . debug ( "node added event received with name {}" , nodeAdded . getName ( ) ) ;
Item item = node2Item . getItem ( nodeAdded , Excludes . ALL ) ;
if ( tempCachedMap . get ( event . getIdentifier ( ) ) ! = null )
tempCachedMap . remove ( event . getIdentifier ( ) ) ;
insertItemInTheRightPlace ( item , tempCachedMap ) ;
}
}
break ;
case Event . PROPERTY_CHANGED :
if ( ses . propertyExists ( event . getPath ( ) ) ) {
Property property = ses . getProperty ( event . getPath ( ) ) ;
if ( property . getName ( ) . equalsIgnoreCase ( "jcr:lastModified" ) ) {
logger . debug ( "event property changed on {} with value {} and parent {}" , property . getName ( ) , property . getValue ( ) . getString ( ) , property . getParent ( ) . getPath ( ) ) ;
String identifier = property . getParent ( ) . getIdentifier ( ) ;
tempCachedMap . remove ( identifier ) ;
Item item = node2Item . getItem ( property . getParent ( ) , Excludes . ALL ) ;
insertItemInTheRightPlace ( item , tempCachedMap ) ;
}
}
break ;
case Event . NODE_REMOVED :
logger . trace ( "node removed event received with type {}" , event . getIdentifier ( ) ) ;
if ( tempCachedMap . get ( event . getIdentifier ( ) ) ! = null & &
tempCachedMap . get ( event . getIdentifier ( ) ) < event . getDate ( ) )
tempCachedMap . remove ( event . getIdentifier ( ) ) ;
break ;
case Event . NODE_MOVED :
Node nodeMoved = ses . getNode ( event . getPath ( ) ) ;
logger . trace ( "node moved event received with type {}" , nodeMoved . getPrimaryNodeType ( ) ) ;
if ( nodeMoved . isNodeType ( "nthl:workspaceLeafItem" ) ) {
logger . debug ( "event node moved on {} with path {}" , nodeMoved . getName ( ) , nodeMoved . getPath ( ) ) ;
String identifier = nodeMoved . getIdentifier ( ) ;
String nodePath = ses . getNode ( identifier ) . getPath ( ) ;
if ( tempCachedMap . get ( event . getIdentifier ( ) ) ! = null & &
! nodePath . startsWith ( vreFolder . getPath ( ) ) )
tempCachedMap . remove ( event . getIdentifier ( ) ) ;
}
break ;
default :
break ;
}
} catch ( Exception e ) {
logger . warn ( "error handling event {}" , event . getType ( ) , e ) ;
}
}
logger . trace ( "retrieving event took {} with {} events" , System . currentTimeMillis ( ) - start , events ) ;
} catch ( Exception e ) {
logger . error ( "error getting events for vre {}" , vreFolder . getTitle ( ) , e ) ;
throw new RuntimeException ( e ) ;
}
}
private void visitChildren ( Node node , Map < String , Long > tempCachedMap ) throws Exception {
NodeIterator nodeIt = node . getNodes ( ) ;
while ( nodeIt . hasNext ( ) ) {
Node child = nodeIt . nextNode ( ) ;
Item item = node2Item . getItem ( child , Excludes . ALL ) ;
if ( item = = null | | item . isHidden ( ) ) continue ;
if ( item instanceof FolderItem )
visitChildren ( child ) ;
visitChildren ( child ,tempCachedMap );
else if ( item instanceof AbstractFileItem )
insertItemInTheRightPlace ( item ) ;
insertItemInTheRightPlace ( item ,tempCachedMap );
}
}