145 lines
4.7 KiB
Java
145 lines
4.7 KiB
Java
package org.gcube.application.perform.service.engine;
|
|
|
|
import java.sql.Connection;
|
|
import java.sql.PreparedStatement;
|
|
import java.sql.ResultSet;
|
|
import java.sql.SQLException;
|
|
import java.util.List;
|
|
|
|
import org.gcube.application.perform.service.LocalConfiguration;
|
|
import org.gcube.application.perform.service.engine.dm.DMException;
|
|
import org.gcube.application.perform.service.engine.dm.DMUtils;
|
|
import org.gcube.application.perform.service.engine.dm.ImporterMonitor;
|
|
import org.gcube.application.perform.service.engine.model.BeanNotFound;
|
|
import org.gcube.application.perform.service.engine.model.DBField.ImportRoutine;
|
|
import org.gcube.application.perform.service.engine.model.DBQueryDescriptor;
|
|
import org.gcube.application.perform.service.engine.model.ISQueryDescriptor;
|
|
import org.gcube.application.perform.service.engine.model.InternalException;
|
|
import org.gcube.application.perform.service.engine.model.importer.ImportRequest;
|
|
import org.gcube.application.perform.service.engine.model.importer.ImportRoutineDescriptor;
|
|
import org.gcube.data.analysis.dataminermanagercl.shared.data.computations.ComputationId;
|
|
|
|
import lombok.Synchronized;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
@Slf4j
|
|
public class ImporterImpl implements Importer {
|
|
|
|
private static final String ORPHAN_IMPORTS="";
|
|
private static final String ACQUIRE_PS="";
|
|
private static final String GET_BY_ID="";
|
|
|
|
private static ISQueryDescriptor isQueryDescriptor=null;
|
|
|
|
|
|
@Synchronized
|
|
private ISQueryDescriptor getISQueryDescriptor() {
|
|
if(isQueryDescriptor==null) {
|
|
isQueryDescriptor=
|
|
new ISQueryDescriptor(
|
|
LocalConfiguration.getProperty(LocalConfiguration.MAPPING_DB_ENDPOINT_NAME), null,
|
|
LocalConfiguration.getProperty(LocalConfiguration.MAPPING_DB_ENDPOINT_CATEGORY));
|
|
}
|
|
return isQueryDescriptor;
|
|
}
|
|
|
|
|
|
private static final String getHostname() {
|
|
throw new RuntimeException("IMPLEMENT THIS");
|
|
}
|
|
|
|
public void init() throws InternalException{
|
|
try {
|
|
log.info("Initializing IMPORTER");
|
|
DataBaseManager db=DataBaseManager.get(getISQueryDescriptor());
|
|
Connection conn=db.getConnection();
|
|
conn.setAutoCommit(true);
|
|
PreparedStatement psOrphans=conn.prepareStatement(ORPHAN_IMPORTS);
|
|
PreparedStatement psAcquire=conn.prepareStatement(ACQUIRE_PS);
|
|
// set ps
|
|
ResultSet rsOrphans=psOrphans.executeQuery();
|
|
long monitoredCount=0l;
|
|
while(rsOrphans.next()) {
|
|
Long id=rsOrphans.getLong(ImportRoutine.ID);
|
|
try {
|
|
ImportRoutineDescriptor desc=rowToDescriptor(rsOrphans);
|
|
String hostname=getHostname();
|
|
|
|
// "acquire"
|
|
// set lock = hostname where ID =? and LOCK is null
|
|
// Acquired = updated rows == 1
|
|
psAcquire.setString(1, hostname);
|
|
psAcquire.setLong(2, id);
|
|
|
|
if(psAcquire.executeUpdate()>0) {
|
|
log.debug("Acquired {} ",id);
|
|
monitor(desc);
|
|
monitoredCount++;
|
|
}
|
|
}catch(Throwable t) {
|
|
log.warn("Unable to monitor orphan with ID {} ",id,t);
|
|
}
|
|
}
|
|
log.info("Acquired {} import executions for monitoring",monitoredCount);
|
|
}catch(Throwable t) {
|
|
log.warn("Unexpected Error while trying to check orphan import routines");
|
|
throw new InternalException(t);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
public ImportRoutineDescriptor importExcel(ImportRequest request) throws DMException, BeanNotFound, SQLException {
|
|
log.debug("Submitting {} ",request);
|
|
ComputationId id=submit(request);
|
|
log.debug("Registering {} computationID {} ",request,id);
|
|
ImportRoutineDescriptor desc=register(id,request);
|
|
log.debug("Monitoring {} computationID {} ",desc,id);
|
|
monitor(desc);
|
|
return getDescriptorById(desc.getId());
|
|
}
|
|
|
|
|
|
|
|
private void monitor(ImportRoutineDescriptor desc) throws DMException {
|
|
log.debug("Monitoring {} ",desc);
|
|
DMUtils.monitor(DMUtils.getComputation(desc), new ImporterMonitor(desc,getISQueryDescriptor()));
|
|
}
|
|
|
|
private ComputationId submit(ImportRequest request) {
|
|
throw new RuntimeException("IMPLEMENT THIS SHIT");
|
|
}
|
|
|
|
private ImportRoutineDescriptor register(ComputationId computationId,ImportRequest request) {
|
|
throw new RuntimeException("IMPLEMENT THIS SHIT");
|
|
}
|
|
|
|
private ImportRoutineDescriptor getDescriptorById(Long id) throws BeanNotFound, SQLException {
|
|
DataBaseManager db=DataBaseManager.get(getISQueryDescriptor());
|
|
Connection conn=db.getConnection();
|
|
PreparedStatement ps=conn.prepareStatement(GET_BY_ID);
|
|
ps.setLong(1, id);
|
|
ResultSet rs=ps.executeQuery();
|
|
if(rs.next()) return rowToDescriptor(rs);
|
|
else throw new BeanNotFound("Unable to find Routine with ID "+id);
|
|
|
|
}
|
|
|
|
|
|
@Override
|
|
public List<ImportRoutineDescriptor> getDescriptors(DBQueryDescriptor desc) {
|
|
// TODO Auto-generated method stub
|
|
return null;
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static ImportRoutineDescriptor rowToDescriptor(ResultSet rs) {
|
|
throw new RuntimeException("IMPLEMENT THIS SHIT");
|
|
}
|
|
|
|
}
|