@ -1,29 +1,33 @@
package org.gcube.data.access.storagehub.handlers.vres ;
import java.text.SimpleDateFormat ;
import java.util.ArrayList ;
import java.util.Arrays ;
import java.util.Calendar ;
import java.util.Iterator ;
import java.util.Collections ;
import java.util.HashMap ;
import java.util.LinkedList ;
import java.util.List ;
import java.util.Locale ;
import java.util. Map ;
import java.util.concurrent.Callable ;
import java.util.stream.Collectors ;
import javax.inject.Inject ;
import javax.jcr.Credentials ;
import javax.jcr.Node ;
import javax.jcr.NodeIterator ;
import javax.jcr.Property ;
import javax.jcr.Repository ;
import javax.jcr.RepositoryException ;
import javax.jcr.Session ;
import javax.jcr.observation.Event ;
import javax.jcr.observation.EventJournal ;
import javax.jcr.query.Query ;
import org.apache.jackrabbit.util.ISO9075 ;
import org.gcube.common.storagehub.model.Excludes ;
import org.gcube.common.storagehub.model.NodeConstants ;
import org.gcube.common.storagehub.model.exceptions.BackendGenericError ;
import org.gcube.common.storagehub.model.items.AbstractFileItem ;
import org.gcube.common.storagehub.model.items.FolderItem ;
import org.gcube.common.storagehub.model.items.Item ;
import org.gcube.data.access.storagehub.NodeChildrenFilterIterator ;
import org.gcube.data.access.storagehub.handlers.items.Node2ItemConverter ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
@ -37,64 +41,32 @@ public class VREQueryRetriever implements Callable<List<Item>> {
private Repository repository ;
private Credentials credentials ;
private Item vreFolder ;
List < Item > cachedList = new ArrayList < > ( CACHE_DIMENSION ) ;
Map < String , Long > cachedList = new HashMap < > ( CACHE_DIMENSION ) ;
long higherTimestamp = Long . MAX_VALUE ;
long lastTimestamp = 0 ;
private Node2ItemConverter node2Item = new Node2ItemConverter ( ) ;
Node2ItemConverter node2Item ;
public VREQueryRetriever ( Repository repository , Credentials credentials , Item vreFolder ) {
public VREQueryRetriever ( Repository repository , Credentials credentials , Node2ItemConverter node2Item , Item vreFolder ) {
super ( ) ;
this . repository = repository ;
this . credentials = credentials ;
this . vreFolder = vreFolder ;
this . node2Item = node2Item ;
}
public List < Item > call ( ) {
logger . trace( "executing recents task" ) ;
logger . debug( "executing recents task for {}" , vreFolder . getTitle ( ) ) ;
Session ses = null ;
if ( lastTimestamp = = 0 ) {
try {
long start = System . currentTimeMillis ( ) ;
ses = repository . login ( credentials ) ;
Calendar now = Calendar . getInstance ( ) ;
now . add ( Calendar . YEAR , - 1 ) ;
SimpleDateFormat formatter = new SimpleDateFormat ( "yyyy-MM-dd'T'HH:mm:ss.SSS" , Locale . ENGLISH ) ;
String formattedDate = formatter . format ( now . getTime ( ) ) ;
String xpath = String . format ( "/jcr:root%s//element(*,nthl:workspaceLeafItem)[@jcr:lastModified>xs:dateTime('%s')] order by @jcr:lastModified descending" , ISO9075 . encodePath ( vreFolder . getPath ( ) ) , formattedDate ) ;
//String query = String.format("SELECT * FROM [nthl:workspaceLeafItem] AS node WHERE ISDESCENDANTNODE('%s') ORDER BY node.[jcr:lastModified] DESC ",vreFolder.getPath());
logger . debug ( "query for recents is {}" , xpath ) ;
Query jcrQuery = ses . getWorkspace ( ) . getQueryManager ( ) . createQuery ( xpath , Query . XPATH ) ;
jcrQuery . setLimit ( CACHE_DIMENSION ) ;
lastTimestamp = System . currentTimeMillis ( ) ;
NodeChildrenFilterIterator it = new NodeChildrenFilterIterator ( jcrQuery . execute ( ) . getNodes ( ) ) ;
logger . debug ( "query for recents took {}" , System . currentTimeMillis ( ) - start ) ;
while ( it . hasNext ( ) ) {
Node node = it . next ( ) ;
Item item = node2Item . getItem ( node , Excludes . EXCLUDE_ACCOUNTING ) ;
logger . debug ( "RECENTS - adding item {} with timestamp {}" , item . getTitle ( ) , item . getLastModificationTime ( ) . getTimeInMillis ( ) ) ;
cachedList . add ( item ) ;
}
logger . debug ( "creating objects took {}" , System . currentTimeMillis ( ) - start ) ;
if ( cachedList . size ( ) < = 10 ) return cachedList ;
else return cachedList . subList ( 0 , 10 ) ;
} catch ( Exception e ) {
logger . error ( "error querying vre {}" , vreFolder . getTitle ( ) , e ) ;
throw new RuntimeException ( e ) ;
} finally {
if ( ses ! = null )
ses . logout ( ) ;
logger . trace ( "recents task finished" ) ;
}
} else {
try {
ses = repository . login ( credentials ) ;
//if (cachedList.isEmpty()) {
init ( ses ) ;
/ * } else {
logger . debug ( "redoing recents for {}" , vreFolder . getTitle ( ) ) ;
try {
long timestampToUse = lastTimestamp ;
@ -107,7 +79,7 @@ public class VREQueryRetriever implements Callable<List<Item>> {
EventJournal journalChanged = ses . getWorkspace ( ) . getObservationManager ( ) . getEventJournal ( Event . PROPERTY_CHANGED ^ Event . NODE_REMOVED ^ Event . NODE_MOVED ^ Event . NODE_ADDED , vreFolder . getPath ( ) , true , null , types ) ;
journalChanged . skipTo ( timestampToUse ) ;
logger . trace ( "getting the journal took {}" , System . currentTimeMillis ( ) - start ) ;
logger . debug ( "getting the journal took {}" , System . currentTimeMillis ( ) - start ) ;
int events = 0 ;
@ -122,6 +94,8 @@ public class VREQueryRetriever implements Callable<List<Item>> {
if ( nodeAdded . isNodeType ( "nthl:workspaceLeafItem" ) ) {
logger . trace ( "node added event received with name {}" , nodeAdded . getName ( ) ) ;
Item item = node2Item . getItem ( nodeAdded , Arrays . asList ( NodeConstants . ACCOUNTING_NAME ) ) ;
if ( cachedList . get ( event . getIdentifier ( ) ) ! = null )
cachedList . remove ( event . getIdentifier ( ) ) ;
insertItemInTheRightPlace ( item ) ;
}
}
@ -133,7 +107,7 @@ public class VREQueryRetriever implements Callable<List<Item>> {
if ( property . getName ( ) . equalsIgnoreCase ( "jcr:lastModified" ) ) {
logger . trace ( "event property changed on {} with value {} and parent {}" , property . getName ( ) , property . getValue ( ) . getString ( ) , property . getParent ( ) . getPath ( ) ) ;
String identifier = property . getParent ( ) . getIdentifier ( ) ;
cachedList . remove If( i - > i . getId ( ) . equals ( identifier ) ) ;
cachedList . remove ( identifier ) ;
Item item = node2Item . getItem ( property . getParent ( ) , Excludes . EXCLUDE_ACCOUNTING ) ;
insertItemInTheRightPlace ( item ) ;
}
@ -141,14 +115,10 @@ public class VREQueryRetriever implements Callable<List<Item>> {
break ;
case Event . NODE_REMOVED :
logger . trace ( "node removed event received with type {}" , event . getIdentifier ( ) ) ;
cachedList . removeIf ( i - > {
try {
return i . getId ( ) . equals ( event . getIdentifier ( ) ) & & i . getLastModificationTime ( ) . getTime ( ) . getTime ( ) < event . getDate ( ) ;
} catch ( RepositoryException e ) {
return false ;
}
} ) ;
if ( cachedList . get ( event . getIdentifier ( ) ) ! = null & &
cachedList . get ( event . getIdentifier ( ) ) < event . getDate ( ) )
cachedList . remove ( event . getIdentifier ( ) ) ;
break ;
case Event . NODE_MOVED :
Node nodeMoved = ses . getNode ( event . getPath ( ) ) ;
@ -156,7 +126,11 @@ public class VREQueryRetriever implements Callable<List<Item>> {
if ( nodeMoved . isNodeType ( "nthl:workspaceLeafItem" ) ) {
logger . trace ( "event node moved on {} with path {}" , nodeMoved . getName ( ) , nodeMoved . getPath ( ) ) ;
String identifier = nodeMoved . getIdentifier ( ) ;
cachedList . removeIf ( i - > i . getId ( ) . equals ( identifier ) & & ! i . getPath ( ) . startsWith ( vreFolder . getPath ( ) ) ) ;
String nodePath = ses . getNode ( identifier ) . getPath ( ) ;
if ( cachedList . get ( event . getIdentifier ( ) ) ! = null & &
! nodePath . startsWith ( vreFolder . getPath ( ) ) )
cachedList . remove ( event . getIdentifier ( ) ) ;
}
break ;
default :
@ -165,11 +139,9 @@ public class VREQueryRetriever implements Callable<List<Item>> {
}
if ( cachedList . size ( ) > CACHE_DIMENSION )
cachedList . subList ( 51 , cachedList . size ( ) ) . clear ( ) ;
logger . trace ( "retrieving event took {} with {} events" , System . currentTimeMillis ( ) - start , events ) ;
if ( cachedList . size ( ) < = 10 ) return cachedList ;
else return cachedList . subList ( 0 , 10 ) ;
} catch ( Exception e ) {
logger . error ( "error getting events for vre {}" , vreFolder . getTitle ( ) , e ) ;
throw new RuntimeException ( e ) ;
@ -177,20 +149,87 @@ public class VREQueryRetriever implements Callable<List<Item>> {
if ( ses ! = null )
ses . logout ( ) ;
}
} * /
return correctValues ( ses ) ;
} catch ( Exception e ) {
logger . error ( "error preparing recents for folder {}" , vreFolder . getTitle ( ) , e ) ;
return Collections . emptyList ( ) ;
} finally {
if ( ses ! = null )
ses . logout ( ) ;
logger . debug ( "recents task finished" ) ;
}
}
private List < Item > correctValues ( Session ses ) {
logger . debug ( "preparing returning values for {}" , vreFolder . getTitle ( ) ) ;
long start = System . currentTimeMillis ( ) ;
List < Map . Entry < String , Long > > list = new LinkedList < > ( cachedList . entrySet ( ) ) ;
list . sort ( ( c1 , c2 ) - > c1 . getValue ( ) . compareTo ( c2 . getValue ( ) ) * - 1 ) ;
if ( list . size ( ) > CACHE_DIMENSION )
for ( int index = CACHE_DIMENSION - 1 ; index < list . size ( ) ; index + + )
cachedList . remove ( list . get ( index ) . getKey ( ) ) ;
List < String > cachedIds = list . stream ( ) . map ( m - > m . getKey ( ) ) . collect ( Collectors . toList ( ) ) ;
if ( cachedIds . size ( ) > 10 )
cachedIds = cachedIds . subList ( 0 , 10 ) ;
List < Item > result = new ArrayList < > ( 10 ) ;
for ( String id : cachedIds ) {
try {
Item item = node2Item . getItem ( id , ses , Excludes . EXCLUDE_ACCOUNTING ) ;
if ( item ! = null )
result . add ( item ) ;
else logger . warn ( "item with id {} is null" , id ) ;
} catch ( BackendGenericError | RepositoryException e ) { }
}
logger . debug ( "returning values prepared in {} for {}" , System . currentTimeMillis ( ) - start , vreFolder . getTitle ( ) ) ;
return result ;
}
private void insertItemInTheRightPlace ( Item item ) {
Iterator < Item > it = cachedList . iterator ( ) ;
int index = 0 ;
while ( it . hasNext ( ) ) {
Item inListItem = it . next ( ) ;
if ( item . getLastModificationTime ( ) . getTime ( ) . getTime ( ) > = inListItem . getLastModificationTime ( ) . getTime ( ) . getTime ( ) ) break ;
index + + ;
long lastModifiedTime = item . getLastModificationTime ( ) . getTime ( ) . getTime ( ) ;
if ( ! ( lastModifiedTime > higherTimestamp & & cachedList . size ( ) > CACHE_DIMENSION ) ) {
cachedList . put ( item . getId ( ) , lastModifiedTime ) ;
if ( lastModifiedTime > higherTimestamp ) higherTimestamp = lastModifiedTime ;
}
}
private void init ( Session ses ) {
try {
long start = System . currentTimeMillis ( ) ;
Calendar now = Calendar . getInstance ( ) ;
now . add ( Calendar . YEAR , - 1 ) ;
lastTimestamp = System . currentTimeMillis ( ) ;
Node vreFolderNode = ses . getNodeByIdentifier ( vreFolder . getId ( ) ) ;
logger . debug ( "starting visiting children for {}" , vreFolder . getTitle ( ) ) ;
visitChildren ( vreFolderNode ) ;
logger . debug ( "initializing recents for {} took {}" , vreFolder . getTitle ( ) , System . currentTimeMillis ( ) - start ) ;
} catch ( Exception e ) {
logger . error ( "error querying vre {}" , vreFolder . getTitle ( ) , e ) ;
throw new RuntimeException ( e ) ;
}
}
private void visitChildren ( Node node ) throws Exception {
NodeIterator nodeIt = node . getNodes ( ) ;
while ( nodeIt . hasNext ( ) ) {
Node child = nodeIt . nextNode ( ) ;
Item item = node2Item . getItem ( child , Excludes . ALL ) ;
if ( item = = null | | item . isHidden ( ) ) continue ;
if ( item instanceof FolderItem )
visitChildren ( child ) ;
else if ( item instanceof AbstractFileItem )
insertItemInTheRightPlace ( item ) ;
}
if ( index < CACHE_DIMENSION )
cachedList . add ( index , item ) ;
}
/ * @Override