dataminer-pool-manager/src/main/java/org/gcube/dataanalysis/dataminer/poolmanager/service/DataminerPoolManager.java

package org.gcube.dataanalysis.dataminer.poolmanager.service;
import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
import org.gcube.common.resources.gcore.GenericResource;
import org.gcube.common.resources.gcore.Resources;
import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.dataanalysis.dataminer.poolmanager.ansible.AnsibleWorker;
import org.gcube.dataanalysis.dataminer.poolmanager.ansiblebridge.AnsibleBridge;
import org.gcube.dataanalysis.dataminer.poolmanager.clients.HAProxy;
import org.gcube.dataanalysis.dataminer.poolmanager.clients.ISClient;
import org.gcube.dataanalysis.dataminer.poolmanager.datamodel.*;
import org.gcube.dataanalysis.dataminer.poolmanager.util.Props;
import org.gcube.dataanalysis.dataminer.poolmanager.util.SVNUpdater;
import org.gcube.informationsystem.publisher.AdvancedScopedPublisher;
import org.gcube.informationsystem.publisher.RegistryPublisherFactory;
import org.gcube.informationsystem.publisher.ScopedPublisher;
import org.gcube.informationsystem.publisher.exception.RegistryNotFoundException;
import org.gcube.resources.discovery.client.api.DiscoveryClient;
import org.gcube.resources.discovery.client.queries.api.SimpleQuery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tmatesoft.svn.core.SVNException;
import java.io.*;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.net.UnknownHostException;
import java.util.*;
import static org.gcube.resources.discovery.icclient.ICFactory.clientFor;
import static org.gcube.resources.discovery.icclient.ICFactory.queryFor;

public class DataminerPoolManager {

    private static final Logger LOGGER = LoggerFactory.getLogger(DataminerPoolManager.class);

    private SVNUpdater svnUpdater;

    public DataminerPoolManager() {
        try {
            // TODO: read this from configuration - done
            this.svnUpdater = new SVNUpdater(new Props().getSVNrepo());
        } catch (SVNException e) {
            LOGGER.error("Unable to initialise the SVN updater", e);
        }
    }
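
    /**
     * Deploys the given algorithm on the staging dataminer cluster, using the
     * security token of the current caller (the service is assumed to run in
     * RPrototypingLab).
     *
     * @param algo the algorithm to stage
     * @return the id of the worker executing the deployment
     */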
    public String stageAlgorithm(Algorithm algo) throws IOException, InterruptedException {
        Cluster cluster = getStagingDataminerCluster();
        // Assumes the service is running in RPrototypingLab
        String token = SecurityTokenProvider.instance.get();
        return addAlgorithmToCluster(algo, cluster, true, "root", true, token);
    }
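
    /**
     * Deploys the given algorithm on all the dataminers of the target VRE, as
     * listed by the HAProxy client.
     *
     * @param algo           the algorithm to publish
     * @param targetVREToken the security token of the target VRE
     * @return the id of the worker executing the deployment
     */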
    public String publishAlgorithm(Algorithm algo, String targetVREToken) throws IOException, InterruptedException {
        Cluster cluster = new Cluster();
        SecurityTokenProvider.instance.set(targetVREToken);
        for (Host h : new HAProxy().listDataMinersByCluster()) {
            cluster.addHost(h);
        }
        return addAlgorithmToCluster(algo, cluster, false, "gcube", false, targetVREToken);
    }

    // 1. to complete
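    /**
     * Builds a single-host cluster pointing at the staging dataminer, whose
     * name is read from the configuration properties.
     */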
    private Cluster getStagingDataminerCluster() {
        Cluster cluster = new Cluster();
        Host h = new Host();
        Props p = new Props();
        // TODO: read this from configuration or IS?
        h.setName(p.getStagingHost());
        cluster.addHost(h);
        return cluster;
    }
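
    /**
     * Registers the algorithm in the Information System of the given VRE,
     * unless a resource with the same name is already published.
     */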
    private void createISResource(Algorithm algo, String vreToken) {
        // create the resource only if not already present
        ISClient client = new ISClient();
        boolean alreadyPresent = false;
        for (Algorithm a : client.getAlgoFromIs()) {
            if (a.getName().equals(algo.getName())) {
                alreadyPresent = true;
                break;
            }
        }
        if (!alreadyPresent) {
            client.addAlgToIs(algo, vreToken);
        }
    }
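
    /**
     * Adds the names of the algorithm dependencies to the SVN package lists
     * (Debian, CRAN or GitHub, according to the dependency type).
     */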
    private void updateSVNDependencies(Algorithm algo, boolean stagingVRE) throws IOException, SVNException {
        // package lists for the staging VRE are prefixed with "test_"
        String prefix = stagingVRE ? "test_" : "";
        for (Dependency d : algo.getDependencies()) {
            List<String> ls = new LinkedList<String>();
            ls.add(d.getName());
            if (d.getType().equals("os")) {
                this.svnUpdater.updateSVN(prefix + "r_deb_pkgs.txt", ls);
            }
            if (d.getType().equals("cran")) {
                this.svnUpdater.updateSVN(prefix + "r_cran_pkgs.txt", ls);
            }
            if (d.getType().equals("github")) {
                this.svnUpdater.updateSVN(prefix + "r_github_pkgs.txt", ls);
            }
        }
    }

    /**
     * Deploys the given algorithm on every host of the given cluster. The
     * Ansible playbook is executed asynchronously: the method returns
     * immediately with the id of the worker carrying out the deployment,
     * while the worker log is written under the worker work directory.
     *
     * @param algo                          the algorithm to deploy
     * @param dataminerCluster              the target cluster of dataminers
     * @param includeAlgorithmDependencies  whether the algorithm dependencies must be installed as well
     * @param user                          the remote user running the playbook
     * @param stagingVRE                    true when deploying on the staging VRE
     * @param targetVREToken                the security token of the target VRE
     * @return the uuid of the execution
     */
    public String addAlgorithmToCluster(
            final Algorithm algo,
            Cluster dataminerCluster,
            boolean includeAlgorithmDependencies,
            String user,
            final boolean stagingVRE,
            final String targetVREToken) throws IOException {
        AnsibleBridge ansibleBridge = new AnsibleBridge();
        final AnsibleWorker worker = ansibleBridge.createWorker(algo, dataminerCluster, includeAlgorithmDependencies, user);
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    File path = new File(worker.getWorkdir() + File.separator + "logs");
                    path.mkdirs();
                    File n = new File(path + File.separator + worker.getWorkerId());
                    FileOutputStream fos = new FileOutputStream(n);
                    PrintStream ps = new PrintStream(fos);
                    int retValue = worker.execute(ps);
                    ps.close();
                    LOGGER.info("Log stored to " + n.getAbsolutePath());
                    // update the SVN package lists only if the playbook succeeded
                    if (retValue == 0) {
                        updateSVNDependencies(algo, stagingVRE);
                        //createISResource(algo, targetVREToken);
                    }
                    // destroy the worker
                    worker.destroy();
                } catch (Exception e) {
                    LOGGER.error("Error while deploying algorithm " + algo.getName(), e);
                }
            }
        }).start();
        // this line executes immediately, without waiting for the task to complete
        return worker.getWorkerId();
    }
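
    /**
     * Reads the content of the given URL into a string (e.g. the body of an
     * installation script); returns null when the URL is null.
     */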
    public String getScriptFromURL(URL url) throws IOException {
        if (url == null) {
            return null;
        }
        URLConnection yc = url.openConnection();
        BufferedReader input = new BufferedReader(new InputStreamReader(yc.getInputStream()));
        StringBuilder buffer = new StringBuilder();
        String line;
        while ((line = input.readLine()) != null) {
            buffer.append(line).append("\n");
        }
        input.close();
        return buffer.toString();
    }
// public String addAlgorithmToVRE(Algorithm algorithm, final String vre, /*final boolean updateSVN*/ final boolean test) throws IOException {
// // create a fake algorithm set
// final AlgorithmSet algoSet = new AlgorithmSet();
// algoSet.setName("fake");
// algoSet.addAlgorithm(algorithm);
// final String uuid = UUID.randomUUID().toString();
//
// new Thread(new Runnable() {
// @Override
// public void run() {
// // TODO Auto-generated method stub
// try {
// try {
// addAlgorithmsToVRE(algoSet, vre, uuid, /*updateSVN*/test);
// } catch (SVNException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
// } catch (IOException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// } catch (InterruptedException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
// }
// }).start();
// // this line will execute immediately, not waiting for task to
// // complete
// System.out.println(uuid);
// return uuid;
// }
// public String addAlgorithmToHost(Algorithm algorithm, final String hostname, /*final boolean updateSVN*/ final boolean test) throws IOException {
// // create a fake algorithm set
// final AlgorithmSet algoSet = new AlgorithmSet();
// algoSet.setName("fake");
// algoSet.addAlgorithm(algorithm);
// final String uuid = UUID.randomUUID().toString();
//
// new Thread(new Runnable() {
// @Override
// public void run() {
// // TODO Auto-generated method stub
// try {
// if(test){
// addAlgorithmsToStagingHost(algoSet, hostname, uuid, /*updateSVN,*/test);}
// } catch (IOException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// } catch (InterruptedException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// } catch (SVNException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
// }
// }).start();
// // this line will execute immediately, not waiting for your task to
// // complete
// System.out.println(uuid);
// return uuid;
// }
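
    /**
     * Returns a file URL pointing at the log of the given worker, stored under
     * the dataminer-pool-manager work directory in the user home.
     *
     * @param a the worker id
     */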
    public URL getURLfromWorkerLog(String a) throws MalformedURLException, UnknownHostException {
        File path = new File(System.getProperty("user.home") + File.separator + "dataminer-pool-manager/work/"
                + a + File.separator + "logs");
        path.mkdirs();
        File n = new File(path + File.separator + a);
        // String addr = InetAddress.getLocalHost().getHostAddress();
        return n.toURI().toURL();
    }
// public String addAlgorithmsToVRE(AlgorithmSet algorithms, String vre, String uuid, /*boolean updateSVN,*/ boolean test) throws IOException, InterruptedException, SVNException {
//
// // create the cluster (dataminers in the vre)
// Cluster cluster = new Cluster();
// for (Host h : new HAProxy().listDataMinersByCluster()) {
// //for (Host h : new ISClient().listDataminersInVRE()) {
// cluster.addHost(h);
// }
//
// // apply the changes
// AnsibleBridge a = new AnsibleBridge();
// return a.applyAlgorithmSetToCluster(algorithms, cluster, uuid, /*updateSVN,*/ test).getWorkerId();
//
// }
// public String addAlgorithmsToHost(AlgorithmSet algorithms, String hostname, String uuid, /*boolean updateSVN,*/boolean test)
// throws IOException, InterruptedException, SVNException {
//
// // create the cluster (dataminers in the vre)
// Cluster cluster = new Cluster();
// for (Host h : new HAProxy().listDataMinersByCluster()) {
// if (h.getName().equals(hostname)) {
// cluster.addHost(h);
// }
// }
// // if(ISClient.getHProxy().equals(hostname)){
// // cluster.addHost(new ISClient().getDataminer(hostname));
// // }
// // apply the changes
// AnsibleBridge a = new AnsibleBridge();
// return a.applyAlgorithmSetToCluster(algorithms, cluster, uuid, /*updateSVN,*/test).getWorkerId();
//
// }
//
// public String addAlgorithmsToStagingHost(AlgorithmSet algorithms, String hostname, String uuid, /*boolean updateSVN,*/boolean test)
// throws IOException, InterruptedException, SVNException {
// Cluster cluster = new Cluster();
// Host h = new Host();
// h.setName(hostname);
// cluster.addHost(h);
//
// AnsibleBridge a = new AnsibleBridge();
// return a.applyAlgorithmSetToCluster(algorithms, cluster, uuid, /*updateSVN,*/test).getWorkerId();
//
// }
}