package org.gcube.dataanalysis.dataminer.poolmanager.service; import org.gcube.common.authorization.library.provider.SecurityTokenProvider; import org.gcube.common.resources.gcore.GenericResource; import org.gcube.common.resources.gcore.Resources; import org.gcube.common.scope.api.ScopeProvider; import org.gcube.dataanalysis.dataminer.poolmanager.ansible.AnsibleWorker; import org.gcube.dataanalysis.dataminer.poolmanager.ansiblebridge.AnsibleBridge; import org.gcube.dataanalysis.dataminer.poolmanager.clients.HAProxy; import org.gcube.dataanalysis.dataminer.poolmanager.datamodel.*; import org.gcube.informationsystem.publisher.AdvancedScopedPublisher; import org.gcube.informationsystem.publisher.RegistryPublisherFactory; import org.gcube.informationsystem.publisher.ScopedPublisher; import org.gcube.informationsystem.publisher.exception.RegistryNotFoundException; import org.gcube.resources.discovery.client.api.DiscoveryClient; import org.gcube.resources.discovery.client.queries.api.SimpleQuery; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tmatesoft.svn.core.SVNException; import java.io.*; import java.net.MalformedURLException; import java.net.URL; import java.net.URLConnection; import java.net.UnknownHostException; import java.util.*; import static org.gcube.resources.discovery.icclient.ICFactory.clientFor; import static org.gcube.resources.discovery.icclient.ICFactory.queryFor; public class DataminerPoolManager { private static final Logger LOGGER = LoggerFactory.getLogger(DataminerPoolManager.class); public String stageAlgorithm(Algorithm algo) throws IOException, InterruptedException { Cluster cluster = new Cluster(); Host h = new Host(); h.setName(getStagingDataminerHostname()); cluster.addHost(h); //Assumes the service is running in RPrototypingLab String token = SecurityTokenProvider.instance.get(); return addAlgorithmToCluster(algo, cluster, true, "root", true, token); } public String publishAlgorithm(Algorithm algo, String targetVREToken) throws IOException, InterruptedException { Cluster cluster = new Cluster(); for (Host h : new HAProxy().listDataMinersByCluster()) { cluster.addHost(h); } return addAlgorithmToCluster(algo, cluster, false, "gcube", false, targetVREToken); } private String getStagingDataminerHostname(){ return null; } private void updateSVNDependencies(Algorithm algo, boolean stagingVRE) throws IOException, SVNException { for (Dependency d : algo.getDependencies()) { if (d.getType().equals("os")) { List ls = new LinkedList(); ls.add(d.getName()); this.updateSVN(stagingVRE ? "test_": "" + "r_deb_pkgs.txt", ls); } if (d.getType().equals("cran")) { List ls = new LinkedList(); ls.add(d.getName()); this.updateSVN(stagingVRE ? "test_": "" + "r_cran_pkgs.txt", ls); } if (d.getType().equals("github")) { List ls = new LinkedList(); ls.add(d.getName()); this.updateSVN(stagingVRE ? "test_": "" + "r_github_pkgs.txt", ls); } } } private void createISResource(Algorithm algo, String vreToken){ //TODO: implement method //TODO: create the resource only if not already present } /** * * @param algo * @param dataminerCluster * @return uuid of the execution */ public String addAlgorithmToCluster( final Algorithm algo, Cluster dataminerCluster, boolean includeAlgorithmDependencies, String user, final boolean stagingVRE, final String targetVREToken) throws IOException { AnsibleBridge ansibleBridge = new AnsibleBridge(); final AnsibleWorker worker = ansibleBridge.createWorker(algo, dataminerCluster, includeAlgorithmDependencies, user); new Thread(new Runnable() { @Override public void run() { try { File path = new File(worker.getWorkdir() + File.separator + "logs"); path.mkdirs(); File n = new File(path + File.separator + worker.getWorkerId()); FileOutputStream fos = new FileOutputStream(n); PrintStream ps = new PrintStream(fos); int retValue = worker.execute(ps); System.out.println("Log stored to to " + n.getAbsolutePath()); if(retValue == 0) { updateSVNDependencies(algo, stagingVRE); createISResource(algo, targetVREToken); } // destroy the worker worker.destroy(); } catch (Exception e) { e.printStackTrace(); } } }).start(); // this line will execute immediately, not waiting for task to // complete return worker.getWorkerId(); } public String getScriptFromURL(URL url) throws IOException { if (url == null) { return null; } URLConnection yc = url.openConnection(); BufferedReader input = new BufferedReader(new InputStreamReader(yc.getInputStream())); String line; StringBuffer buffer = new StringBuffer(); while ((line = input.readLine()) != null) { buffer.append(line + "\n"); } String bufferScript = buffer.substring(0, buffer.length()); input.close(); return bufferScript; } /** * Publish the given algorithm in the given VRE * * @param algorithmName * @param vre * */ public String addAlgorithmToVRE(Algorithm algorithm, final String vre, /*final boolean updateSVN*/ final boolean test) throws IOException { // create a fake algorithm set final AlgorithmSet algoSet = new AlgorithmSet(); algoSet.setName("fake"); algoSet.addAlgorithm(algorithm); final String uuid = UUID.randomUUID().toString(); new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub try { try { addAlgorithmsToVRE(algoSet, vre, uuid, /*updateSVN*/test); } catch (SVNException e) { // TODO Auto-generated catch block e.printStackTrace(); } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }).start(); // this line will execute immediately, not waiting for task to // complete System.out.println(uuid); return uuid; } public String addAlgorithmToHost(Algorithm algorithm, final String hostname, /*final boolean updateSVN*/ final boolean test) throws IOException { // create a fake algorithm set final AlgorithmSet algoSet = new AlgorithmSet(); algoSet.setName("fake"); algoSet.addAlgorithm(algorithm); final String uuid = UUID.randomUUID().toString(); new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub try { if(test){ addAlgorithmsToStagingHost(algoSet, hostname, uuid, /*updateSVN,*/test);} } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (SVNException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }).start(); // this line will execute immediately, not waiting for your task to // complete System.out.println(uuid); return uuid; } public URL getURLfromWorkerLog(String a) throws MalformedURLException, UnknownHostException { File path = new File(System.getProperty("user.home") + File.separator + "dataminer-pool-manager/work/" + a + File.separator + "logs"); path.mkdirs(); File n = new File(path + File.separator + a); // String addr = InetAddress.getLocalHost().getHostAddress(); return new File(n.getPath()).toURI().toURL(); } public String addAlgorithmsToVRE(AlgorithmSet algorithms, String vre, String uuid, /*boolean updateSVN,*/ boolean test) throws IOException, InterruptedException, SVNException { // create the cluster (dataminers in the vre) Cluster cluster = new Cluster(); for (Host h : new HAProxy().listDataMinersByCluster()) { //for (Host h : new ISClient().listDataminersInVRE()) { cluster.addHost(h); } // apply the changes AnsibleBridge a = new AnsibleBridge(); return a.applyAlgorithmSetToCluster(algorithms, cluster, uuid, /*updateSVN,*/ test).getWorkerId(); } public String addAlgorithmsToHost(AlgorithmSet algorithms, String hostname, String uuid, /*boolean updateSVN,*/boolean test) throws IOException, InterruptedException, SVNException { // create the cluster (dataminers in the vre) Cluster cluster = new Cluster(); for (Host h : new HAProxy().listDataMinersByCluster()) { if (h.getName().equals(hostname)) { cluster.addHost(h); } } // if(ISClient.getHProxy().equals(hostname)){ // cluster.addHost(new ISClient().getDataminer(hostname)); // } // apply the changes AnsibleBridge a = new AnsibleBridge(); return a.applyAlgorithmSetToCluster(algorithms, cluster, uuid, /*updateSVN,*/test).getWorkerId(); } public String addAlgorithmsToStagingHost(AlgorithmSet algorithms, String hostname, String uuid, /*boolean updateSVN,*/boolean test) throws IOException, InterruptedException, SVNException { Cluster cluster = new Cluster(); Host h = new Host(); h.setName(hostname); cluster.addHost(h); AnsibleBridge a = new AnsibleBridge(); return a.applyAlgorithmSetToCluster(algorithms, cluster, uuid, /*updateSVN,*/test).getWorkerId(); } // 2017 March 29 public void unPublishScopedResource(GenericResource resource) throws RegistryNotFoundException, Exception { ScopedPublisher scopedPublisher = RegistryPublisherFactory.scopedPublisher(); AdvancedScopedPublisher advancedScopedPublisher = new AdvancedScopedPublisher(scopedPublisher); String id = resource.id(); LOGGER.debug("Trying to remove {} with ID {} from {}", resource.getClass().getSimpleName(), id, ScopeProvider.instance.get()); // scopedPublisher.remove(resource, scopes); advancedScopedPublisher.forceRemove(resource); LOGGER.debug("{} with ID {} removed successfully", resource.getClass().getSimpleName(), id); } public void publishScopedResource(GenericResource a, List scopes) throws RegistryNotFoundException, Exception { StringWriter stringWriter = new StringWriter(); Resources.marshal(a, stringWriter); ScopedPublisher scopedPublisher = RegistryPublisherFactory.scopedPublisher(); try { System.out.println(scopes); System.out.println(stringWriter); scopedPublisher.create(a, scopes); } catch (RegistryNotFoundException e) { System.out.println(e); throw e; } } public void addAlgToIs(Algorithm algo) { GenericResource a = new GenericResource(); a.newProfile().name(algo.getName()).type("StatisticalManagerAlgorithm").description(algo.getDescription()); a.profile().newBody(this.getAlgoBody(algo)); try { publishScopedResource(a, Arrays.asList(new String[] { ScopeProvider.instance.get() })); } catch (Exception e) { e.printStackTrace(); } } private String getAlgoBody(Algorithm algo) { return "" + algo.getCategory() + "" + "\n" + "" + algo.getClazz() + "" + "\n" + "" + algo.getAlgorithmType() + "" + "\n" + "" + algo.getSkipJava() + "" + "\n" + "" + algo.getPackageURL() + "" + "\n" + "" + algo.getDependencies() + ""; } public void updateAlg(Algorithm algo) { ScopedPublisher scopedPublisher = RegistryPublisherFactory.scopedPublisher(); SimpleQuery query = queryFor(GenericResource.class); query.addCondition("$resource/Profile/Name/text() eq '" + algo.getName() + "'").setResult("$resource"); DiscoveryClient client = clientFor(GenericResource.class); List ds = client.submit(query); if (ds.isEmpty()) { return; } GenericResource a = ds.get(0); a.profile().newBody(this.getAlgoBody(algo)); try { scopedPublisher.update(a); } catch (RegistryNotFoundException e) { e.printStackTrace(); } } private Algorithm convertAlgo(GenericResource a) { Algorithm out = new Algorithm(); // out.setId(a.profile().body().getElementsByTagName("id").item(0).getTextContent()); out.setAlgorithmType(a.profile().body().getElementsByTagName("algorithmType").item(0).getTextContent()); out.setCategory(a.profile().body().getElementsByTagName("category").item(0).getTextContent()); out.setClazz(a.profile().body().getElementsByTagName("clazz").item(0).getTextContent()); out.setName(a.profile().name()); out.setPackageURL(a.profile().body().getElementsByTagName("packageURL").item(0).getTextContent()); out.setSkipJava(a.profile().body().getElementsByTagName("skipJava").item(0).getTextContent()); out.setDescription(a.profile().description()); Set deps = new HashSet(); for (int i = 0; i < a.profile().body().getElementsByTagName("dependencies").getLength(); i++) { org.gcube.dataanalysis.dataminer.poolmanager.datamodel.Dependency d1 = new org.gcube.dataanalysis.dataminer.poolmanager.datamodel.Dependency(); d1.setName(a.profile().body().getElementsByTagName("dependencies").item(i).getTextContent()); deps.add(d1); } out.setDependencies(deps); return out; } public Set getAlgoFromIs() { // TODO Auto-generated method stub Set out = new HashSet(); SimpleQuery query = queryFor(GenericResource.class); query.addCondition("$resource/Profile/SecondaryType/text() eq 'StatisticalManagerAlgorithm'") .setResult("$resource"); DiscoveryClient client = clientFor(GenericResource.class); List ds = client.submit(query); for (GenericResource a : ds) { out.add(this.convertAlgo(a)); } return out; } public List updateSVN(String file, List ldep) throws SVNException, IOException { // TODO Auto-generated method stub return null; } }