UCD cache locking

This commit is contained in:
Fabio Sinibaldi 2022-05-11 18:45:56 +02:00
parent 2989a493b9
commit 8eb1cd3f9d
3 changed files with 123 additions and 69 deletions

View File

@ -55,8 +55,7 @@ public class UCDMongoManager extends MongoManager implements UCDManagerI{
@Override @Override
public UseCaseDescriptor put(UseCaseDescriptor desc) throws RegistrationException, ConfigurationException { public UseCaseDescriptor put(UseCaseDescriptor desc) throws RegistrationException, ConfigurationException {
log.debug("PUT UCD ID {} MONGO ID ",desc.getId(),desc.getMongoId()); log.debug("PUT UCD ID {} MONGO ID {}",desc.getId(),desc.getMongoId());
FindOneAndReplaceOptions opts = new FindOneAndReplaceOptions();
Document filter = new Document(UseCaseDescriptor.ID,desc.getId()); Document filter = new Document(UseCaseDescriptor.ID,desc.getId());
if(desc.getMongoId()!=null) if(desc.getMongoId()!=null)
// MONGO ID SHOULD MATCH IF PROVIDED // MONGO ID SHOULD MATCH IF PROVIDED
@ -64,20 +63,20 @@ public class UCDMongoManager extends MongoManager implements UCDManagerI{
try { try {
UseCaseDescriptor toReturn = Serialization.convert(getCollection().findOneAndReplace( UseCaseDescriptor toReturn = Serialization.convert(getCollection().findOneAndReplace(
filter, Serialization.asDocument(desc), filter, Serialization.asDocument(desc),
new FindOneAndReplaceOptions().returnDocument(ReturnDocument.BEFORE)), UseCaseDescriptor.class); new FindOneAndReplaceOptions().returnDocument(ReturnDocument.AFTER)), UseCaseDescriptor.class);
log.trace("Matching is {} ", toReturn);
// NOT FOUND
if(toReturn == null) { if(toReturn == null) {
if (desc.getMongoId() != null) { if (desc.getMongoId() != null)
// illegal update check
if (getById(desc.getId()) != null)
throw new RegistrationException("Illegal attempt to write to " + desc.getId() + " with unmatching mongo ID "); throw new RegistrationException("Illegal attempt to write to " + desc.getId() + " with unmatching mongo ID ");
} else {
log.debug("Unable to update UCD {}. Inserting it..",desc.getId());
toReturn = insert(desc); toReturn = insert(desc);
} }
}
log.info("Updated UCD in DB cache. ID {}, MONGO ID {}",toReturn.getId(),toReturn.getMongoId());
log.trace("Updated UCD is {} ", toReturn);
return toReturn; return toReturn;
}catch (RegistrationException | ConfigurationException e){ }catch (RegistrationException e){
throw e; throw e;
}catch(Throwable e){ }catch(Throwable e){
log.error("Unable to update ",e); log.error("Unable to update ",e);

View File

@ -18,10 +18,13 @@ import javax.ws.rs.core.Response;
import java.time.Duration; import java.time.Duration;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAmount; import java.time.temporal.TemporalAmount;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.*;
import static java.lang.Thread.sleep; import static java.lang.Thread.sleep;
@ -38,6 +41,9 @@ import static java.lang.Thread.sleep;
@Slf4j @Slf4j
public class UCDManager extends AbstractScopedMap<UCDManagerI> implements UCDManagerI { public class UCDManager extends AbstractScopedMap<UCDManagerI> implements UCDManagerI {
private Map<String,UseCaseDescriptor> memCache = new HashMap<>();
private final ReadWriteLock cacheLock = new ReentrantReadWriteLock();
public UCDManager() { public UCDManager() {
@ -48,28 +54,38 @@ public class UCDManager extends AbstractScopedMap<UCDManagerI> implements UCDMan
@Override @Override
public Iterable<UseCaseDescriptor> query(QueryRequest queryRequest) throws ConfigurationException { public Iterable<UseCaseDescriptor> query(QueryRequest queryRequest) throws ConfigurationException {
cacheLock.readLock().lock();
try{
return getMongoManager().query(queryRequest); return getMongoManager().query(queryRequest);
}finally {
cacheLock.readLock().unlock();
}
} }
@Override @Override
public void deleteById(String id, boolean force) throws RegistrationException, ConfigurationException { public void deleteById(String id, boolean force) throws RegistrationException, ConfigurationException {
log.warn("Trying to delete {} [force : {}]",id,force);
// NB Check for existing ID // NB Check for existing ID
UseCaseDescriptor found = getById(id); UseCaseDescriptor found = getByUCID(id,true);
cacheLock.writeLock().lock();
try{
if(found!=null) { if(found!=null) {
// TODO validate DELETE REQUEST // TODO validate DELETE REQUEST
// TODO STORE UCD // TODO DELETE UCD
// forceUpdateCache(); // forceUpdateCache();
throw new WebApplicationException("TO IMPLEMENT ", Response.Status.INTERNAL_SERVER_ERROR); throw new WebApplicationException("TO IMPLEMENT ", Response.Status.INTERNAL_SERVER_ERROR);
} else } else
throw new WebApplicationException("No Matching UCD with ID "+id, Response.Status.NOT_FOUND); throw new WebApplicationException("No Matching UCD with ID "+id, Response.Status.NOT_FOUND);
}finally{
cacheLock.writeLock().unlock();
}
} }
@Override @Override
public UseCaseDescriptor put(UseCaseDescriptor desc) throws ConfigurationException, RegistrationException { public UseCaseDescriptor put(UseCaseDescriptor desc) throws ConfigurationException, RegistrationException {
log.debug("Update {} ",desc.getId()); log.info("Update UCD {} ",desc.getId());
log.debug("Mongo id is {} ",desc.getMongoId());
// NB Check for existing ID // NB Check for existing ID
UseCaseDescriptor found = getById(desc.getId()); UseCaseDescriptor found = getByUCID(desc.getId(),true);
if(found!=null) { if(found!=null) {
// TODO validate UPDATE // TODO validate UPDATE
// TODO STORE UCD // TODO STORE UCD
@ -78,45 +94,84 @@ public class UCDManager extends AbstractScopedMap<UCDManagerI> implements UCDMan
} else { } else {
// create new // create new
registerNew(desc); registerNew(desc);
do{
log.info("Waiting for backend to update.. ");
try{sleep(1000);}catch (Throwable t){}
forceUpdateCache(); forceUpdateCache();
found =getById(desc.getId()); int attempt=0;
} while(found == null); do{
found =getByUCID(desc.getId(),true);
log.info("Waiting for backend to update.. ");
try{sleep(4000);}catch (Throwable t){}
attempt++;
} while(found == null && attempt<=4);
return found; return found;
} }
} }
@Override
protected UCDManagerI retrieveObject(String context) throws ConfigurationException {
// Called when TTL ends
forceUpdateCache();
return this;
}
@Override @Override
public UseCaseDescriptor getById(String id) throws ConfigurationException, RegistrationException { public UseCaseDescriptor getById(String id) throws ConfigurationException, RegistrationException {
// GET from mongo cache // GET from mongo cache
UCDMongoManager mongo=getMongoManager(); // UCDMongoManager mongo=getMongoManager();
UseCaseDescriptor toReturn=mongo.getById(id); // UseCaseDescriptor toReturn=mongo.getById(id);
log.debug("UCD ID : {} from mongo is {} ",id,toReturn); // log.debug("UCD ID : {} from mongo is {} ",id,toReturn);
if(toReturn == null) { // if(toReturn == null) {
// IF void try from ProfileEngine // // IF void try from ProfileEngine
toReturn =getLiveMap().get(id); // toReturn =getLiveMap().get(id);
if(toReturn != null ){ // if(toReturn != null ){
log.debug("Force update of live map {} from live map ",id); // log.debug("Force update of live map {} from live map ",id);
toReturn = mongo.put(toReturn); // toReturn = mongo.put(toReturn);
// }
// }
// return toReturn;
return getByUCID(id,true);
} }
private UseCaseDescriptor getByUCID(String ucid,boolean refreshOnMissing) throws ConfigurationException {
// Wait / check for cache loaded
log.debug("Trying to getById from memcache {} [refresh on missing : {}]",ucid,refreshOnMissing);
cacheLock.readLock().lock();
boolean releaseLock = true;
UseCaseDescriptor toReturn =null;
try{
if(memCache.containsKey(ucid)) toReturn = memCache.get(ucid);
else {
if(refreshOnMissing){
cacheLock.readLock().unlock();
releaseLock=false;
forceUpdateCache();
// pass false in order to trigger cache update only on first NOT FOUND event
toReturn = getByUCID(ucid,false);
}
}
}finally {
if(releaseLock) cacheLock.readLock().unlock();
} }
return toReturn; return toReturn;
} }
private ProfileMap getLiveMap() throws ConfigurationException {
return ImplementationProvider.get().getProvidedObjectByClass(ProfileMap.class);
};
private void registerNew(UseCaseDescriptor ucd) throws ConfigurationException { private void registerNew(UseCaseDescriptor ucd) throws ConfigurationException {
cacheLock.writeLock().lock();
try{
Engine<ProfileMap> engine=ImplementationProvider.get().getEngineByManagedClass(ProfileMap.class); Engine<ProfileMap> engine=ImplementationProvider.get().getEngineByManagedClass(ProfileMap.class);
if(engine instanceof ObjectManager){ if(engine instanceof ObjectManager){
((ObjectManager<UseCaseDescriptor>)engine).insert(ucd); ((ObjectManager<UseCaseDescriptor>)engine).insert(ucd);
} else throw new ConfigurationException("Profile Map Engine is not Object Manager. Actual implementation is "+engine.getClass()); } else throw new ConfigurationException("Profile Map Engine is not Object Manager. Actual implementation is "+engine.getClass());
}finally {
cacheLock.writeLock().unlock();
}
} }
private UCDMongoManager getMongoManager() throws ConfigurationException { private UCDMongoManager getMongoManager() throws ConfigurationException {
@ -124,33 +179,33 @@ public class UCDManager extends AbstractScopedMap<UCDManagerI> implements UCDMan
} }
@Override
protected UCDManagerI retrieveObject(String context) throws ConfigurationException {
forceUpdateCache();
return this;
}
ConcurrentHashMap<String,Boolean> cleanedCaches= new ConcurrentHashMap<>();
private void forceUpdateCache() throws ConfigurationException { private void forceUpdateCache() throws ConfigurationException {
log.info("UPDATING PROFILE CACHE.."); log.info("UPDATING PROFILE CACHE..");
cacheLock.writeLock().lock();
final UCDMongoManager manager = getMongoManager(); final UCDMongoManager manager = getMongoManager();
manager.deleteAll(); manager.deleteAll();
final AtomicLong counter= new AtomicLong(0l); memCache.clear();
ProfileMap liveMap=getLiveMap();
ProfileMap liveMap=ImplementationProvider.get().getProvidedObjectByClass(ProfileMap.class);
log.debug("LiveMap size is {} ",liveMap.size()); log.debug("LiveMap size is {} ",liveMap.size());
for (Map.Entry<String, UseCaseDescriptor> entry : liveMap.entrySet()) { for (Map.Entry<String, UseCaseDescriptor> entry : liveMap.entrySet()) {
UseCaseDescriptor useCaseDescriptor = entry.getValue(); UseCaseDescriptor useCaseDescriptor = entry.getValue();
try { try {
log.debug("Updateing cache with {} ", useCaseDescriptor.getId()); log.debug("Updateing cache with {}, mongo id is {}", useCaseDescriptor.getId(),useCaseDescriptor.getMongoId());
if(useCaseDescriptor.getMongoId()!=null){
log.warn("Retrieved UCD {} from Live Map has a Mongo id [{}]. Removing it..",useCaseDescriptor.getId(),useCaseDescriptor.getMongoId());
useCaseDescriptor.setMongoId(null);
}
// insert/update into DB // insert/update into DB
manager.put(useCaseDescriptor); manager.insert(useCaseDescriptor);
memCache.put(useCaseDescriptor.getId(), useCaseDescriptor);
} catch (RegistrationException e) { } catch (RegistrationException e) {
log.warn("Unable to cache UCD {}",entry.getKey(),e); log.warn("Unable to cache UCD {}",entry.getKey(),e);
} }
} }
log.info("Cached : {} UCDs in {} ",counter.get(),ContextUtils.getCurrentScope()); log.info("Cached {} UCDs in {} ",memCache.size(),ContextUtils.getCurrentScope());
cacheLock.writeLock().unlock();
} }
} }

View File

@ -66,7 +66,7 @@ public class UCDTests extends BasicServiceTestUnit{
UseCaseDescriptor randomUCD=TestProfiles.profiles.get("basic"); UseCaseDescriptor randomUCD=TestProfiles.profiles.get("basic");
randomUCD.setId(UUID.randomUUID().toString()); randomUCD.setId(UUID.randomUUID().toString());
randomUCD.setName("Test UCD"); randomUCD.setName("Test UCD");
randomUCD.setMongoId(null);
UseCaseDescriptor ucd =check(baseTarget().request(MediaType.APPLICATION_JSON). UseCaseDescriptor ucd =check(baseTarget().request(MediaType.APPLICATION_JSON).
post(Entity.entity(Serialization.write(randomUCD), post(Entity.entity(Serialization.write(randomUCD),