From 8eb1cd3f9da2fdb914e4178851885342ebd62ffb Mon Sep 17 00:00:00 2001 From: Fabio Sinibaldi Date: Wed, 11 May 2022 18:45:56 +0200 Subject: [PATCH] UCD cache locking --- .../service/engine/mongo/UCDMongoManager.java | 25 ++- .../engine/providers/ucd/UCDManager.java | 165 ++++++++++++------ .../geoportal/service/UCDTests.java | 2 +- 3 files changed, 123 insertions(+), 69 deletions(-) diff --git a/geoportal-service/src/main/java/org/gcube/application/geoportal/service/engine/mongo/UCDMongoManager.java b/geoportal-service/src/main/java/org/gcube/application/geoportal/service/engine/mongo/UCDMongoManager.java index 40c1d5f..68c2a19 100644 --- a/geoportal-service/src/main/java/org/gcube/application/geoportal/service/engine/mongo/UCDMongoManager.java +++ b/geoportal-service/src/main/java/org/gcube/application/geoportal/service/engine/mongo/UCDMongoManager.java @@ -55,8 +55,7 @@ public class UCDMongoManager extends MongoManager implements UCDManagerI{ @Override public UseCaseDescriptor put(UseCaseDescriptor desc) throws RegistrationException, ConfigurationException { - log.debug("PUT UCD ID {} MONGO ID ",desc.getId(),desc.getMongoId()); - FindOneAndReplaceOptions opts = new FindOneAndReplaceOptions(); + log.debug("PUT UCD ID {} MONGO ID {}",desc.getId(),desc.getMongoId()); Document filter = new Document(UseCaseDescriptor.ID,desc.getId()); if(desc.getMongoId()!=null) // MONGO ID SHOULD MATCH IF PROVIDED @@ -64,20 +63,20 @@ public class UCDMongoManager extends MongoManager implements UCDManagerI{ try { UseCaseDescriptor toReturn = Serialization.convert(getCollection().findOneAndReplace( filter, Serialization.asDocument(desc), - new FindOneAndReplaceOptions().returnDocument(ReturnDocument.BEFORE)), UseCaseDescriptor.class); - log.trace("Matching is {} ", toReturn); - - // NOT FOUND - if (toReturn == null) { - if (desc.getMongoId() != null) { - // illegal update check - if (getById(desc.getId()) != null) - throw new RegistrationException("Illegal attempt to write to " + desc.getId() + " with unmatching mongo ID "); + new FindOneAndReplaceOptions().returnDocument(ReturnDocument.AFTER)), UseCaseDescriptor.class); + if(toReturn == null) { + if (desc.getMongoId() != null) + throw new RegistrationException("Illegal attempt to write to " + desc.getId() + " with unmatching mongo ID "); + else { + log.debug("Unable to update UCD {}. Inserting it..",desc.getId()); + toReturn = insert(desc); } - toReturn = insert(desc); } + + log.info("Updated UCD in DB cache. ID {}, MONGO ID {}",toReturn.getId(),toReturn.getMongoId()); + log.trace("Updated UCD is {} ", toReturn); return toReturn; - }catch (RegistrationException | ConfigurationException e){ + }catch (RegistrationException e){ throw e; }catch(Throwable e){ log.error("Unable to update ",e); diff --git a/geoportal-service/src/main/java/org/gcube/application/geoportal/service/engine/providers/ucd/UCDManager.java b/geoportal-service/src/main/java/org/gcube/application/geoportal/service/engine/providers/ucd/UCDManager.java index 55aca91..3cf988e 100644 --- a/geoportal-service/src/main/java/org/gcube/application/geoportal/service/engine/providers/ucd/UCDManager.java +++ b/geoportal-service/src/main/java/org/gcube/application/geoportal/service/engine/providers/ucd/UCDManager.java @@ -18,10 +18,13 @@ import javax.ws.rs.core.Response; import java.time.Duration; import java.time.temporal.ChronoUnit; import java.time.temporal.TemporalAmount; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.*; import static java.lang.Thread.sleep; @@ -38,6 +41,9 @@ import static java.lang.Thread.sleep; @Slf4j public class UCDManager extends AbstractScopedMap implements UCDManagerI { + private Map memCache = new HashMap<>(); + + private final ReadWriteLock cacheLock = new ReentrantReadWriteLock(); public UCDManager() { @@ -48,75 +54,124 @@ public class UCDManager extends AbstractScopedMap implements UCDMan @Override public Iterable query(QueryRequest queryRequest) throws ConfigurationException { - return getMongoManager().query(queryRequest); + cacheLock.readLock().lock(); + try{ + return getMongoManager().query(queryRequest); + }finally { + cacheLock.readLock().unlock(); + } } @Override public void deleteById(String id, boolean force) throws RegistrationException, ConfigurationException { - log.warn("Trying to delete {} [force : {}]",id,force); // NB Check for existing ID - UseCaseDescriptor found = getById(id); - if(found!=null) { - // TODO validate DELETE REQUEST - // TODO STORE UCD - // forceUpdateCache(); - throw new WebApplicationException("TO IMPLEMENT ", Response.Status.INTERNAL_SERVER_ERROR); - } else - throw new WebApplicationException("No Matching UCD with ID "+id, Response.Status.NOT_FOUND); + UseCaseDescriptor found = getByUCID(id,true); + cacheLock.writeLock().lock(); + try{ + if(found!=null) { + // TODO validate DELETE REQUEST + // TODO DELETE UCD + // forceUpdateCache(); + throw new WebApplicationException("TO IMPLEMENT ", Response.Status.INTERNAL_SERVER_ERROR); + } else + throw new WebApplicationException("No Matching UCD with ID "+id, Response.Status.NOT_FOUND); + }finally{ + cacheLock.writeLock().unlock(); + } } @Override public UseCaseDescriptor put(UseCaseDescriptor desc) throws ConfigurationException, RegistrationException { - log.debug("Update {} ",desc.getId()); + log.info("Update UCD {} ",desc.getId()); + log.debug("Mongo id is {} ",desc.getMongoId()); // NB Check for existing ID - UseCaseDescriptor found = getById(desc.getId()); - if(found!=null) { - // TODO validate UPDATE - // TODO STORE UCD - // forceUpdateCache(); - throw new WebApplicationException("Update Feature is yet TO IMPLEMENT ", Response.Status.INTERNAL_SERVER_ERROR); - } else { - // create new - registerNew(desc); - do{ - log.info("Waiting for backend to update.. "); - try{sleep(1000);}catch (Throwable t){} + UseCaseDescriptor found = getByUCID(desc.getId(),true); + if(found!=null) { + // TODO validate UPDATE + // TODO STORE UCD + // forceUpdateCache(); + throw new WebApplicationException("Update Feature is yet TO IMPLEMENT ", Response.Status.INTERNAL_SERVER_ERROR); + } else { + // create new + registerNew(desc); forceUpdateCache(); - found =getById(desc.getId()); - } while(found == null); + int attempt=0; + do{ + found =getByUCID(desc.getId(),true); + log.info("Waiting for backend to update.. "); + try{sleep(4000);}catch (Throwable t){} + attempt++; + } while(found == null && attempt<=4); - return found; - } + return found; + } + } + + @Override + protected UCDManagerI retrieveObject(String context) throws ConfigurationException { + // Called when TTL ends + forceUpdateCache(); + return this; } @Override public UseCaseDescriptor getById(String id) throws ConfigurationException, RegistrationException { // GET from mongo cache - UCDMongoManager mongo=getMongoManager(); - UseCaseDescriptor toReturn=mongo.getById(id); - log.debug("UCD ID : {} from mongo is {} ",id,toReturn); - if(toReturn == null) { - // IF void try from ProfileEngine - toReturn =getLiveMap().get(id); - if(toReturn != null ){ - log.debug("Force update of live map {} from live map ",id); - toReturn = mongo.put(toReturn); +// UCDMongoManager mongo=getMongoManager(); +// UseCaseDescriptor toReturn=mongo.getById(id); +// log.debug("UCD ID : {} from mongo is {} ",id,toReturn); +// if(toReturn == null) { +// // IF void try from ProfileEngine +// toReturn =getLiveMap().get(id); +// if(toReturn != null ){ +// log.debug("Force update of live map {} from live map ",id); +// toReturn = mongo.put(toReturn); +// } +// } +// return toReturn; + + return getByUCID(id,true); + } + + + private UseCaseDescriptor getByUCID(String ucid,boolean refreshOnMissing) throws ConfigurationException { + // Wait / check for cache loaded + log.debug("Trying to getById from memcache {} [refresh on missing : {}]",ucid,refreshOnMissing); + cacheLock.readLock().lock(); + boolean releaseLock = true; + UseCaseDescriptor toReturn =null; + try{ + if(memCache.containsKey(ucid)) toReturn = memCache.get(ucid); + else { + if(refreshOnMissing){ + cacheLock.readLock().unlock(); + releaseLock=false; + forceUpdateCache(); + // pass false in order to trigger cache update only on first NOT FOUND event + toReturn = getByUCID(ucid,false); + } } + }finally { + if(releaseLock) cacheLock.readLock().unlock(); } return toReturn; } - private ProfileMap getLiveMap() throws ConfigurationException { - return ImplementationProvider.get().getProvidedObjectByClass(ProfileMap.class); - }; + + private void registerNew(UseCaseDescriptor ucd) throws ConfigurationException { - Engine engine=ImplementationProvider.get().getEngineByManagedClass(ProfileMap.class); + cacheLock.writeLock().lock(); + try{ + Engine engine=ImplementationProvider.get().getEngineByManagedClass(ProfileMap.class); if(engine instanceof ObjectManager){ ((ObjectManager)engine).insert(ucd); } else throw new ConfigurationException("Profile Map Engine is not Object Manager. Actual implementation is "+engine.getClass()); + }finally { + cacheLock.writeLock().unlock(); + } } private UCDMongoManager getMongoManager() throws ConfigurationException { @@ -124,33 +179,33 @@ public class UCDManager extends AbstractScopedMap implements UCDMan } - @Override - protected UCDManagerI retrieveObject(String context) throws ConfigurationException { - forceUpdateCache(); - return this; - } - - - ConcurrentHashMap cleanedCaches= new ConcurrentHashMap<>(); - - private void forceUpdateCache() throws ConfigurationException { log.info("UPDATING PROFILE CACHE.."); + cacheLock.writeLock().lock(); + final UCDMongoManager manager = getMongoManager(); manager.deleteAll(); - final AtomicLong counter= new AtomicLong(0l); - ProfileMap liveMap=getLiveMap(); + memCache.clear(); + + ProfileMap liveMap=ImplementationProvider.get().getProvidedObjectByClass(ProfileMap.class); log.debug("LiveMap size is {} ",liveMap.size()); for (Map.Entry entry : liveMap.entrySet()) { UseCaseDescriptor useCaseDescriptor = entry.getValue(); try { - log.debug("Updateing cache with {} ", useCaseDescriptor.getId()); + log.debug("Updateing cache with {}, mongo id is {}", useCaseDescriptor.getId(),useCaseDescriptor.getMongoId()); + if(useCaseDescriptor.getMongoId()!=null){ + log.warn("Retrieved UCD {} from Live Map has a Mongo id [{}]. Removing it..",useCaseDescriptor.getId(),useCaseDescriptor.getMongoId()); + useCaseDescriptor.setMongoId(null); + } // insert/update into DB - manager.put(useCaseDescriptor); + manager.insert(useCaseDescriptor); + memCache.put(useCaseDescriptor.getId(), useCaseDescriptor); } catch (RegistrationException e) { log.warn("Unable to cache UCD {}",entry.getKey(),e); } } - log.info("Cached : {} UCDs in {} ",counter.get(),ContextUtils.getCurrentScope()); + log.info("Cached {} UCDs in {} ",memCache.size(),ContextUtils.getCurrentScope()); + + cacheLock.writeLock().unlock(); } } diff --git a/geoportal-service/src/test/java/org/gcube/application/geoportal/service/UCDTests.java b/geoportal-service/src/test/java/org/gcube/application/geoportal/service/UCDTests.java index aa63574..8bc9d12 100644 --- a/geoportal-service/src/test/java/org/gcube/application/geoportal/service/UCDTests.java +++ b/geoportal-service/src/test/java/org/gcube/application/geoportal/service/UCDTests.java @@ -66,7 +66,7 @@ public class UCDTests extends BasicServiceTestUnit{ UseCaseDescriptor randomUCD=TestProfiles.profiles.get("basic"); randomUCD.setId(UUID.randomUUID().toString()); randomUCD.setName("Test UCD"); - + randomUCD.setMongoId(null); UseCaseDescriptor ucd =check(baseTarget().request(MediaType.APPLICATION_JSON). post(Entity.entity(Serialization.write(randomUCD),