/** * */ package org.gcube.vremanagement.executor.persistence.orientdb; import java.util.ArrayList; import java.util.Calendar; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import org.gcube.vremanagement.executor.SmartExecutorInitializator; import org.gcube.vremanagement.executor.api.types.LaunchParameter; import org.gcube.vremanagement.executor.exception.PluginStateNotRetrievedException; import org.gcube.vremanagement.executor.exception.SchedulePersistenceException; import org.gcube.vremanagement.executor.json.ObjectMapperManager; import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConfiguration; import org.gcube.vremanagement.executor.persistence.SmartExecutorPersistenceConnector; import org.gcube.vremanagement.executor.plugin.PluginDeclaration; import org.gcube.vremanagement.executor.plugin.PluginStateEvolution; import org.gcube.vremanagement.executor.plugin.RunOn; import org.gcube.vremanagement.executor.scheduledtask.ScheduledTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import com.orientechnologies.orient.core.db.OPartitionedDatabasePool; import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx; import com.orientechnologies.orient.core.record.impl.ODocument; import com.orientechnologies.orient.core.sql.query.OSQLSynchQuery; /** * @author Luca Frosini (ISTI - CNR) * */ public class OrientDBPersistenceConnector extends SmartExecutorPersistenceConnector { private static final Logger logger = LoggerFactory .getLogger(OrientDBPersistenceConnector.class); protected final static int LAST = -1; protected final String scope; protected final String SCOPE = "scope"; protected final String UUID = "uuid"; protected final String ITERATION = "iteration"; protected final String TIMESTAMP = "timestamp"; protected final String RUN_ON = "runOn"; protected OPartitionedDatabasePool oPartitionedDatabasePool; protected ObjectMapper mapper; public OrientDBPersistenceConnector(String scope, SmartExecutorPersistenceConfiguration configuration) throws Exception { super(); this.scope = scope; prepareConnection(configuration); this.mapper = ObjectMapperManager.getObjectMapper(); } public OrientDBPersistenceConnector( SmartExecutorPersistenceConfiguration configuration) throws Exception { this(SmartExecutorInitializator.getCurrentScope(), configuration); } protected void prepareConnection( SmartExecutorPersistenceConfiguration configuration) throws Exception { logger.debug("Preparing Connection for {}", this.getClass() .getSimpleName()); String url = configuration.getURL(); String username = configuration.getUsername(); String password = configuration.getPassword(); this.oPartitionedDatabasePool = new OPartitionedDatabasePool(url, username, password); } protected void prepareObjectMapper() { this.mapper = new ObjectMapper(); } @Override public void close() throws Exception { oPartitionedDatabasePool.close(); } protected PluginStateEvolution getPluginStateEvolution(UUID uuid, int iterationNumber) throws Exception { ODatabaseDocumentTx db = null; try { db = oPartitionedDatabasePool.acquire(); String type = PluginStateEvolution.class.getSimpleName(); Map params = new HashMap(); params.put(UUID, uuid.toString()); params.put(SCOPE, scope); OSQLSynchQuery query = null; if (iterationNumber != LAST) { query = new OSQLSynchQuery( String.format( "SELECT FROM %s WHERE %s = :%s AND %s = :%s AND %s = :%s ORDER BY %s DESC LIMIT 1", type, SCOPE, SCOPE, UUID, UUID, ITERATION, ITERATION, TIMESTAMP)); params.put(ITERATION, iterationNumber); } else { query = new OSQLSynchQuery( String.format( "SELECT FROM %s WHERE %s = :%s AND %s = :%s ORDER BY %s DESC", type, SCOPE, SCOPE, UUID, UUID, ITERATION)); } List result = query.execute(params); ODocument resDoc = null; if (iterationNumber != LAST) { resDoc = result.get(0); } else { // TODO manage better long maxTimestamp = 0; for (ODocument oDoc : result) { long tm = (long) oDoc.field(TIMESTAMP); if (maxTimestamp <= tm) { maxTimestamp = tm; resDoc = oDoc; } } } String json = resDoc.toJSON("class"); PluginStateEvolution pluginStateEvolution = mapper.readValue(json, PluginStateEvolution.class); return pluginStateEvolution; } catch (Exception e) { throw new PluginStateNotRetrievedException(e); } finally { db.close(); } } @Override public PluginStateEvolution getPluginInstanceState(UUID uuid, int iterationNumber) throws Exception { return getPluginStateEvolution(uuid, iterationNumber); } @Override public PluginStateEvolution getLastPluginInstanceState(UUID uuid) throws Exception { return getPluginStateEvolution(uuid, LAST); } @Override public void pluginStateEvolution(PluginStateEvolution pluginStateEvolution, Exception exception) throws Exception { ODatabaseDocumentTx db = null; try { db = oPartitionedDatabasePool.acquire(); ODocument doc = new ODocument( PluginStateEvolution.class.getSimpleName()); String json = mapper.writeValueAsString(pluginStateEvolution); doc.fromJSON(json); doc.field(SCOPE, scope); doc.save(); db.commit(); } catch (Exception e) { if (db != null) { db.rollback(); } throw e; } finally { if (db != null) { db.close(); } } } @Override public void addScheduledTask(ScheduledTask scheduledTask) throws SchedulePersistenceException { ODatabaseDocumentTx db = null; try { db = oPartitionedDatabasePool.acquire(); ODocument doc = new ODocument(ScheduledTask.class.getSimpleName()); long timestamp = Calendar.getInstance().getTimeInMillis(); doc.field(TIMESTAMP, timestamp); String json = mapper.writeValueAsString(scheduledTask); doc.fromJSON(json); doc.save(); db.commit(); } catch (Exception e) { if (db != null) { db.rollback(); } throw new SchedulePersistenceException(e); } finally { db.close(); } } @Override public List getOrphanScheduledTasks( Collection pluginDeclarations) throws SchedulePersistenceException { ODatabaseDocumentTx db = null; try { db = oPartitionedDatabasePool.acquire(); String type = ScheduledTask.class.getSimpleName(); String queryString = String.format("SELECT * FROM %s WHERE %s = '%s'", type, "scope", SmartExecutorInitializator.getCurrentScope()); if(pluginDeclarations!=null && pluginDeclarations.size()!=0){ boolean first = true; for(PluginDeclaration pluginDeclaration : pluginDeclarations){ if(first){ first = false; queryString = String.format("%s AND ( (%s = '%s') ", queryString, ScheduledTask.LAUNCH_PARAMETER + "." + LaunchParameter.PLUGIN_NAME, pluginDeclaration.getName()); }else{ queryString = String.format("%s OR (%s = '%s') ", queryString, ScheduledTask.LAUNCH_PARAMETER + "." + LaunchParameter.PLUGIN_NAME, pluginDeclaration.getName()); } } queryString = queryString + ")"; } OSQLSynchQuery query = new OSQLSynchQuery( queryString ); List result = query.execute(); List scheduledTasks = new ArrayList<>(); for (ODocument doc : result) { String json = doc.toJSON("class"); ScheduledTask scheduledTask = mapper.readValue(json, ScheduledTask.class); try { if (isOrphan(scheduledTask)) { scheduledTasks.add(scheduledTask); } } catch (Exception e) { logger.error( "An Exception occurred while evaluating if {} is orphan", json); } } return scheduledTasks; } catch (Exception e) { throw new SchedulePersistenceException(e); } finally { if (db != null) { db.close(); } } } protected ODocument getScheduledTaskDocument(ODatabaseDocumentTx db, UUID uuid) throws SchedulePersistenceException { try { String type = ScheduledTask.class.getSimpleName(); Map params = new HashMap(); params.put(UUID, uuid.toString()); OSQLSynchQuery query = new OSQLSynchQuery( String.format("SELECT FROM %s WHERE %s = :%s", type, UUID, UUID)); List result = query.execute(params); if (result.size() > 1) { String error = String.format( "Found more than one %s with UUID=%s. %s. %s.", type, uuid.toString(), "This is really strange and should not occur", "Please contact the smart-executor administrator"); logger.error(error); throw new SchedulePersistenceException(error); } else if (result.size() == 0) { String error = String.format("No %s with UUID=%s found.", type, uuid.toString()); logger.error(error); throw new SchedulePersistenceException(error); } return result.get(0); } catch (Exception e) { throw new SchedulePersistenceException(e); } } @Override public ScheduledTask getScheduledTask(UUID uuid) throws SchedulePersistenceException { ODatabaseDocumentTx db = null; try { db = oPartitionedDatabasePool.acquire(); ODocument doc = getScheduledTaskDocument(db, uuid); String json = doc.toJSON("class"); return mapper.readValue(json, ScheduledTask.class); } catch (Exception e) { throw new SchedulePersistenceException(e); } finally { if (db != null) { db.close(); } } } @Override public void reserveScheduledTask(ScheduledTask scheduledTask) throws SchedulePersistenceException { ODatabaseDocumentTx db = null; try { db = oPartitionedDatabasePool.acquire(); UUID uuid = scheduledTask.getUUID(); ODocument doc = getScheduledTaskDocument(db, uuid); Long timestamp = doc.field(TIMESTAMP); if (timestamp!=null && timestamp.compareTo(scheduledTask.getTimestamp())!=0) { throw new SchedulePersistenceException( "The ScheduledTask has been already reserved."); } RunOn runOn = ScheduledTask.generateRunOn(); ODocument runOnDocument = new ODocument(); runOnDocument.fromJSON(mapper.writeValueAsString(runOn)); doc.field(RUN_ON, runOnDocument); timestamp = Calendar.getInstance().getTimeInMillis(); doc.field(TIMESTAMP, timestamp); doc.save(); db.commit(); } catch (Exception e) { if (db != null) { db.rollback(); } throw new SchedulePersistenceException(e); } finally { if (db != null) { db.close(); } } } @Override public void removeScheduledTask(UUID uuid) throws SchedulePersistenceException { ODatabaseDocumentTx db = null; try { db = oPartitionedDatabasePool.acquire(); ODocument doc = getScheduledTaskDocument(db, uuid); doc.delete(); db.commit(); } catch (Exception e) { if (db != null) { db.rollback(); } throw new SchedulePersistenceException(e); } finally { if (db != null) { db.close(); } } } @Override public void removeScheduledTask(ScheduledTask scheduledTask) throws SchedulePersistenceException { ODatabaseDocumentTx db = null; try { db = oPartitionedDatabasePool.acquire(); UUID uuid = scheduledTask.getUUID(); ODocument doc = getScheduledTaskDocument(db, uuid); Long timestamp = doc.field(TIMESTAMP); if (timestamp!=null && timestamp.compareTo(scheduledTask.getTimestamp())!=0) { throw new SchedulePersistenceException( "The ScheduledTask has been changed."); } doc.delete(); db.commit(); } catch (Exception e) { if (db != null) { db.rollback(); } throw new SchedulePersistenceException(e); } finally { if (db != null) { db.close(); } } } @Override public void releaseScheduledTask(UUID uuid) throws SchedulePersistenceException { ODatabaseDocumentTx db = null; try { db = oPartitionedDatabasePool.acquire(); ODocument doc = getScheduledTaskDocument(db, uuid); doc.removeField(RUN_ON); doc.save(); } catch (Exception e) { if (db != null) { db.rollback(); } throw new SchedulePersistenceException(e); } finally { if (db != null) { db.close(); } } } @Override public void releaseScheduledTask(ScheduledTask scheduledTask) throws SchedulePersistenceException { ODatabaseDocumentTx db = null; try { db = oPartitionedDatabasePool.acquire(); UUID uuid = scheduledTask.getUUID(); ODocument doc = getScheduledTaskDocument(db, uuid); Long timestamp = doc.field(TIMESTAMP); if (timestamp!=null && timestamp.compareTo(scheduledTask.getTimestamp())!=0) { throw new SchedulePersistenceException( "The ScheduledTask has been changed."); } doc.removeField(RUN_ON); doc.save(); } catch (Exception e) { if (db != null) { db.rollback(); } throw new SchedulePersistenceException(e); } finally { if (db != null) { db.close(); } } } }