/** * */ package org.gcube.vremanagement.executor.persistence.orientdb; import java.util.ArrayList; import java.util.Calendar; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import org.gcube.vremanagement.executor.ContextUtility; import org.gcube.vremanagement.executor.api.types.LaunchParameter; import org.gcube.vremanagement.executor.exception.ExecutorException; import org.gcube.vremanagement.executor.exception.PluginInstanceNotFoundException; import org.gcube.vremanagement.executor.exception.SchedulePersistenceException; import org.gcube.vremanagement.executor.json.ExtendedSEMapper; import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConfiguration; import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector; import org.gcube.vremanagement.executor.plugin.PluginStateEvolution; import org.gcube.vremanagement.executor.scheduledtask.ScheduledTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.orientechnologies.orient.core.Orient; import com.orientechnologies.orient.core.db.ODatabasePool; import com.orientechnologies.orient.core.db.ODatabaseSession; import com.orientechnologies.orient.core.record.impl.ODocument; import com.orientechnologies.orient.core.sql.query.OSQLSynchQuery; /** * @author Luca Frosini (ISTI - CNR) * */ public class OrientDBPersistenceConnector extends SmartExecutorPersistenceConnector { private static final Logger logger = LoggerFactory.getLogger(OrientDBPersistenceConnector.class); protected final static String CONTEXT = "context"; protected final static String UUID = "uuid"; protected final static String ITERATION = "iteration"; protected final static String TIMESTAMP = "timestamp"; protected final String RUN_ON = "runOn"; protected ODatabasePool oDatabasePool; public OrientDBPersistenceConnector(SmartExecutorPersistenceConfiguration configuration) throws Exception { super(); prepareConnection(configuration); } protected void prepareConnection(SmartExecutorPersistenceConfiguration configuration) throws Exception { Orient.instance().removeShutdownHook(); logger.debug("Preparing Connection for {}", this.getClass().getSimpleName()); String url = configuration.getURL(); String username = configuration.getUsername(); String password = configuration.getPassword(); oDatabasePool = new ODatabasePool(url, username, password); } @Override public void close() throws Exception { oDatabasePool.close(); oDatabasePool = null; } @Override public PluginStateEvolution getPluginInstanceState(UUID uuid, Integer iterationNumber) throws PluginInstanceNotFoundException, ExecutorException { ODatabaseSession oDatabaseSession = null; try { oDatabaseSession = oDatabasePool.acquire(); String type = PluginStateEvolution.class.getSimpleName(); Map params = new HashMap(); params.put(UUID, uuid.toString()); params.put(CONTEXT, ContextUtility.getCurrentContext()); OSQLSynchQuery query = null; if(iterationNumber != null && iterationNumber > 0) { query = new OSQLSynchQuery(String.format( "SELECT FROM %s WHERE %s = :%s AND %s = :%s AND %s = :%s ORDER BY %s DESC LIMIT 1", type, CONTEXT, CONTEXT, UUID, UUID, ITERATION, ITERATION, TIMESTAMP)); params.put(ITERATION, iterationNumber); } else { /* query = new OSQLSynchQuery( String.format( "SELECT FROM %s WHERE %s = :%s AND %s = :%s ORDER BY %s DESC", type, SCOPE, SCOPE, UUID, UUID, ITERATION)); */ query = new OSQLSynchQuery( String.format("SELECT FROM %s WHERE %s = :%s AND %s = :%s ORDER BY %s DESC, %s DESC LIMIT 1", type, CONTEXT, CONTEXT, UUID, UUID, ITERATION, TIMESTAMP)); } List result = query.execute(params); ODocument resDoc = result.get(0); /* ODocument resDoc = null; if (iterationNumber != null) { resDoc = result.get(0); } else { // TODO manage better long maxTimestamp = 0; for (ODocument oDoc : result) { long tm = (long) oDoc.field(TIMESTAMP); if (maxTimestamp <= tm) { maxTimestamp = tm; resDoc = oDoc; } } } */ String json = resDoc.toJSON("class"); PluginStateEvolution pluginStateEvolution = ExtendedSEMapper.getInstance() .unmarshal(PluginStateEvolution.class, json); return pluginStateEvolution; } catch(Exception e) { throw new PluginInstanceNotFoundException(uuid); } finally { closeDatabaseSession(oDatabaseSession); } } @Override public void pluginStateEvolution(PluginStateEvolution pluginStateEvolution, Exception exception) throws Exception { ODatabaseSession oDatabaseSession = null; try { oDatabaseSession = oDatabasePool.acquire(); ODocument doc = new ODocument(PluginStateEvolution.class.getSimpleName()); String json = ExtendedSEMapper.getInstance().marshal(pluginStateEvolution); doc.fromJSON(json); doc.field(CONTEXT, ContextUtility.getCurrentContext()); doc.save(); oDatabaseSession.commit(); } catch(Exception e) { if(oDatabaseSession != null) { oDatabaseSession.rollback(); } throw e; } finally { closeDatabaseSession(oDatabaseSession); } } @Override public void addScheduledTask(ScheduledTask scheduledTask) throws SchedulePersistenceException { ODatabaseSession oDatabaseSession = null; try { oDatabaseSession = oDatabasePool.acquire(); ODocument doc = new ODocument(ScheduledTask.class.getSimpleName()); long timestamp = Calendar.getInstance().getTimeInMillis(); doc.field(TIMESTAMP, timestamp); String json = ExtendedSEMapper.getInstance().marshal(scheduledTask); doc.fromJSON(json); doc.save(); oDatabaseSession.commit(); } catch(Exception e) { if(oDatabaseSession != null) { oDatabaseSession.rollback(); } throw new SchedulePersistenceException(e); } finally { closeDatabaseSession(oDatabaseSession); } } @Override public List getScheduledTasks(Collection plugins) throws SchedulePersistenceException { ODatabaseSession oDatabaseSession = null; try { oDatabaseSession = oDatabasePool.acquire(); String type = ScheduledTask.class.getSimpleName(); String queryString = String.format("SELECT * FROM %s WHERE %s = '%s'", type, "context", ContextUtility.getCurrentContext()); if(plugins != null && plugins.size() != 0) { boolean first = true; for(String pluginName : plugins) { if(first) { first = false; queryString = String.format("%s AND ( (%s = '%s') ", queryString, ScheduledTask.LAUNCH_PARAMETER + "." + LaunchParameter.PLUGIN_NAME, pluginName); } else { queryString = String.format("%s OR (%s = '%s') ", queryString, ScheduledTask.LAUNCH_PARAMETER + "." + LaunchParameter.PLUGIN_NAME, pluginName); } } queryString = queryString + ")"; } OSQLSynchQuery query = new OSQLSynchQuery(queryString); List result = query.execute(); List scheduledTasks = new ArrayList<>(); for(ODocument doc : result) { String json = doc.toJSON("class"); ScheduledTask scheduledTask = ExtendedSEMapper.getInstance().unmarshal(ScheduledTask.class, json); scheduledTasks.add(scheduledTask); } return scheduledTasks; } catch(Exception e) { throw new SchedulePersistenceException(e); } finally { closeDatabaseSession(oDatabaseSession); } } protected ODocument getScheduledTaskDocument(UUID uuid) throws SchedulePersistenceException { try { String type = ScheduledTask.class.getSimpleName(); Map params = new HashMap(); params.put(UUID, uuid.toString()); OSQLSynchQuery query = new OSQLSynchQuery( String.format("SELECT FROM %s WHERE %s = :%s", type, UUID, UUID)); List result = query.execute(params); if(result.size() > 1) { String error = String.format("Found more than one %s with UUID=%s. %s. %s.", type, uuid.toString(), "This is really strange and should not occur", "Please contact the smart-executor administrator"); logger.error(error); throw new SchedulePersistenceException(error); } else if(result.size() == 0) { String error = String.format("No %s with UUID=%s found.", type, uuid.toString()); logger.error(error); throw new SchedulePersistenceException(error); } return result.get(0); } catch(Exception e) { throw new SchedulePersistenceException(e); } } @Override public ScheduledTask getScheduledTask(UUID uuid) throws SchedulePersistenceException { ODatabaseSession oDatabaseSession = null; try { oDatabaseSession = oDatabasePool.acquire(); ODocument doc = getScheduledTaskDocument(uuid); String json = doc.toJSON("class"); return ExtendedSEMapper.getInstance().unmarshal(ScheduledTask.class, json); } catch(Exception e) { throw new SchedulePersistenceException(e); } finally { closeDatabaseSession(oDatabaseSession); } } @Override public void reserveScheduledTask(ScheduledTask scheduledTask) throws SchedulePersistenceException { releaseScheduledTask(scheduledTask); } @Override public void removeScheduledTask(UUID uuid) throws SchedulePersistenceException { ODatabaseSession oDatabaseSession = null; try { oDatabaseSession = oDatabasePool.acquire(); ODocument doc = getScheduledTaskDocument(uuid); doc.delete(); oDatabaseSession.commit(); } catch(Exception e) { if(oDatabaseSession != null) { oDatabaseSession.rollback(); } throw new SchedulePersistenceException(e); } finally { closeDatabaseSession(oDatabaseSession); } } @Override public void removeScheduledTask(ScheduledTask scheduledTask) throws SchedulePersistenceException { removeScheduledTask(scheduledTask.getUUID()); } @Override public void releaseScheduledTask(UUID uuid) throws SchedulePersistenceException { ODatabaseSession oDatabaseSession = null; try { oDatabaseSession = oDatabasePool.acquire(); ODocument doc = getScheduledTaskDocument(uuid); doc.removeField(RUN_ON); doc.save(); } catch(Exception e) { if(oDatabaseSession != null) { oDatabaseSession.rollback(); } throw new SchedulePersistenceException(e); } finally { closeDatabaseSession(oDatabaseSession); } } private void closeDatabaseSession(ODatabaseSession oDatabaseSession) { if(oDatabaseSession != null) { oDatabaseSession.close(); oDatabaseSession = null; } } @Override public void releaseScheduledTask(ScheduledTask scheduledTask) throws SchedulePersistenceException { releaseScheduledTask(scheduledTask.getUUID()); } public static void shutdown(){ //Orient.OShutdownOrientDBInstancesHandler. Orient.instance().shutdown(); } }