This commit is contained in:
Nunzio Andrea Galante 2017-04-22 17:24:33 +00:00
parent f3dd230754
commit 830080e1ea
7 changed files with 236 additions and 143 deletions

View File

@ -1,17 +1,39 @@
package org.gcube.dataanalysis.dataminer.poolmanager.ansible;
***REMOVED***
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
***REMOVED***
import java.io.InputStream;
***REMOVED***
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
***REMOVED***
***REMOVED***
import java.util.Scanner;
import java.util.Set;
import org.gcube.dataanalysis.dataminer.poolmanager.ansible.model.Inventory;
import org.gcube.dataanalysis.dataminer.poolmanager.ansible.model.Playbook;
import org.gcube.dataanalysis.dataminer.poolmanager.ansible.model.Role;
import org.gcube.dataanalysis.dataminer.poolmanager.ansiblebridge.AnsibleSerializeHelper;
import org.gcube.dataanalysis.dataminer.poolmanager.datamodel.Algorithm;
import org.gcube.dataanalysis.dataminer.poolmanager.datamodel.AlgorithmSet;
import org.gcube.dataanalysis.dataminer.poolmanager.datamodel.Dependency;
import org.tmatesoft.svn.core.SVNException;
import org.tmatesoft.svn.core.SVNURL;
import org.tmatesoft.svn.core.auth.ISVNAuthenticationManager;
import org.tmatesoft.svn.core.internal.wc.SVNFileUtil;
import org.tmatesoft.svn.core.internal.wc.admin.SVNChecksumInputStream;
import org.tmatesoft.svn.core.io.ISVNEditor;
import org.tmatesoft.svn.core.io.SVNRepository;
import org.tmatesoft.svn.core.io.SVNRepositoryFactory;
import org.tmatesoft.svn.core.io.diff.SVNDeltaGenerator;
import org.tmatesoft.svn.core.wc.SVNWCUtil;
/**
* This class is responsible for the interface with ansible, retrieving log,
@ -96,7 +118,7 @@ public class AnsibleWorker {
public void apply(PrintStream ps) throws IOException {
public void apply(AlgorithmSet as, PrintStream ps, boolean updateSVN) throws IOException, InterruptedException, SVNException {
***REMOVED*** TODO execute the playbook and return output
System.out.println(this.getWorkdir());
try {
@ -105,6 +127,41 @@ public class AnsibleWorker {
inheritIO(p.getInputStream(), ps);
inheritIO(p.getErrorStream(), ps);
if (updateSVN){
int exitValue = p.waitFor();
if (exitValue == 0){
for (Algorithm algo : as.getAlgorithms()){
for (Dependency d : algo.getDependencies()) {
if (d.getType().equals("os")) {
List<String> ls = new LinkedList<String>();
ls.add(d.getName());
this.updateSVN("r_deb_pkgs.txt", ls);
***REMOVED***
if (d.getType().equals("cran")) {
List<String> ls = new LinkedList<String>();
ls.add(d.getName());
this.updateSVN("r_cran_pkgs.txt", ls);
***REMOVED***
if (d.getType().equals("github")) {
List<String> ls = new LinkedList<String>();
ls.add(d.getName());
this.updateSVN("r_github_pkgs.txt", ls);
***REMOVED***
***REMOVED***
***REMOVED***
***REMOVED***
***REMOVED***
***REMOVED*** catch (IOException e) {
e.printStackTrace();
***REMOVED***
@ -113,6 +170,83 @@ public class AnsibleWorker {
private SVNRepository getSvnRepository(String url) throws SVNException {
SVNRepository repository = SVNRepositoryFactory.create(SVNURL.parseURIEncoded(url));
ISVNAuthenticationManager authManager = SVNWCUtil.createDefaultAuthenticationManager();
repository.setAuthenticationManager(authManager);
System.out.println(repository.getLocation());
return repository;
***REMOVED***
public List<String> updateSVN(String file, List<String> ldep) throws SVNException, IOException {
final SVNRepository svnRepository = this.getSvnRepository(
"https:***REMOVED***svn.d4science.research-infrastructures.eu/gcube/trunk/data-analysis/RConfiguration/RPackagesManagement/");
try {
final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
svnRepository.getFile(file, SVNRepository.INVALID_REVISION, null, byteArrayOutputStream);
String lines[] = byteArrayOutputStream.toString().split("\\r?\\n");
List<String> aa = this.checkMatch(lines, ldep);
Collections.sort(aa);
final SVNDeltaGenerator deltaGenerator = new SVNDeltaGenerator();
byte[] originalContents = byteArrayOutputStream.toByteArray();
final ISVNEditor commitEditor = svnRepository.getCommitEditor("update dependencies", null);
commitEditor.openRoot(-1);
commitEditor.openFile(file, -1);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
for (String line : aa) {
baos.write(line.getBytes());
baos.write("\n".getBytes());
***REMOVED***
byte[] bytes = baos.toByteArray();
commitEditor.applyTextDelta(file,md5(originalContents));
final String checksum = deltaGenerator.sendDelta(file, new ByteArrayInputStream(originalContents), 0,
new ByteArrayInputStream(bytes), commitEditor, true);
commitEditor.closeFile(file, checksum);
commitEditor.closeEdit();
return aa;
***REMOVED*** finally {
svnRepository.closeSession();
***REMOVED***
***REMOVED***
public static String md5(byte[] contents) {
final byte[] tmp = new byte[1024];
final SVNChecksumInputStream checksumStream = new SVNChecksumInputStream(new ByteArrayInputStream(contents), "md5");
try {
while (checksumStream.read(tmp) > 0) {
***REMOVED***
***REMOVED***
return checksumStream.getDigest();
***REMOVED*** catch (IOException e) {
***REMOVED***never happens
e.printStackTrace();
return null;
***REMOVED*** finally {
SVNFileUtil.closeFile(checksumStream);
***REMOVED***
***REMOVED***
public List<String> checkMatch(String[] lines, List<String> ls) {
Set<String> ss = new HashSet<String>(ls);
ss.addAll(Arrays.asList(lines));
return new ArrayList<>(ss);
***REMOVED***
private static void inheritIO(final InputStream src, final PrintStream dest) {
new Thread(new Runnable() {
public void run() {

View File

@ -32,6 +32,7 @@ import org.gcube.dataanalysis.dataminer.poolmanager.datamodel.Dependency;
***REMOVED***
import org.gcube.dataanalysis.dataminer.poolmanager.datamodel.comparator.HostComparator;
***REMOVED***
import org.tmatesoft.svn.core.SVNException;
public class AnsibleBridge {
private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AnsibleBridge.class);
@ -145,13 +146,13 @@ public class AnsibleBridge {
***REMOVED***
***REMOVED***
public AnsibleWorker applyAlgorithmSetToCluster(AlgorithmSet as, Cluster cluster) throws IOException {
public AnsibleWorker applyAlgorithmSetToCluster(AlgorithmSet as, Cluster cluster, boolean updateSVN) throws IOException, InterruptedException, SVNException {
return applyAlgorithmSetToCluster (as,cluster,UUID.randomUUID().toString());
return applyAlgorithmSetToCluster (as,cluster,UUID.randomUUID().toString(),updateSVN);
***REMOVED***
public AnsibleWorker applyAlgorithmSetToCluster(AlgorithmSet as, Cluster cluster,String uuid) throws IOException {
public AnsibleWorker applyAlgorithmSetToCluster(AlgorithmSet as, Cluster cluster,String uuid, boolean updateSVN) throws IOException, InterruptedException, SVNException {
AnsibleWorker worker = new AnsibleWorker(new File(this.getWorkDir(), uuid));
@ -207,7 +208,7 @@ public class AnsibleBridge {
***REMOVED***System.setErr(console);
worker.apply(ps);
worker.apply(as,ps,updateSVN);
***REMOVED***System.setOut(console);
***REMOVED***worker.apply();
System.out.println("Log stored to to " + n.getAbsolutePath());

View File

@ -39,8 +39,8 @@ import org.tmatesoft.svn.core.SVNException;
public interface PoolManager {
String addAlgorithmToVRE(Algorithm algo, String vre) throws IOException, InterruptedException;
String addAlgorithmToHost(Algorithm algo, String host) throws IOException, InterruptedException;
String addAlgorithmToVRE(Algorithm algo, String vre, boolean svn ) throws IOException, InterruptedException;
String addAlgorithmToHost(Algorithm algo, String host, boolean svn) throws IOException, InterruptedException;
Algorithm extractAlgorithm(String url) throws IOException;

View File

@ -203,28 +203,28 @@ public class RestPoolManager implements PoolManager {
service.addAlgToIs(algo);
***REMOVED***
***REMOVED*** update svn
if (updateSVN){
for (Dependency d : algo.getDependencies()) {
***REMOVED*** if (updateSVN){
***REMOVED*** for (Dependency d : algo.getDependencies()) {
***REMOVED***
***REMOVED*** if (d.getType().equals("os")) {
***REMOVED*** List<String> ls = new LinkedList<String>();
***REMOVED*** ls.add(d.getName());
***REMOVED*** service.updateSVN("r_deb_pkgs.txt", ls);
***REMOVED*** ***REMOVED***
***REMOVED*** if (d.getType().equals("cran")) {
***REMOVED*** List<String> ls = new LinkedList<String>();
***REMOVED*** ls.add(d.getName());
***REMOVED*** service.updateSVN("r_cran_pkgs.txt", ls);
***REMOVED*** ***REMOVED***
***REMOVED*** if (d.getType().equals("github")) {
***REMOVED*** List<String> ls = new LinkedList<String>();
***REMOVED*** ls.add(d.getName());
***REMOVED*** service.updateSVN("r_github_pkgs.txt", ls);
***REMOVED*** ***REMOVED***
***REMOVED*** ***REMOVED***
***REMOVED******REMOVED***
if (d.getType().equals("os")) {
List<String> ls = new LinkedList<String>();
ls.add(d.getName());
service.updateSVN("r_deb_pkgs.txt", ls);
***REMOVED***
if (d.getType().equals("cran")) {
List<String> ls = new LinkedList<String>();
ls.add(d.getName());
service.updateSVN("r_cran_pkgs.txt", ls);
***REMOVED***
if (d.getType().equals("github")) {
List<String> ls = new LinkedList<String>();
ls.add(d.getName());
service.updateSVN("r_github_pkgs.txt", ls);
***REMOVED***
***REMOVED***
***REMOVED***
return service.addAlgorithmToVRE(algo, ScopeProvider.instance.get());
return service.addAlgorithmToVRE(algo, ScopeProvider.instance.get(),updateSVN);
***REMOVED***
@ -252,27 +252,27 @@ public class RestPoolManager implements PoolManager {
***REMOVED***
***REMOVED*** update svn
if (updateSVN){
for (Dependency d : algo.getDependencies()) {
if (d.getType().equals("os")) {
List<String> ls = new LinkedList<String>();
ls.add(d.getName());
service.updateSVN("r_deb_pkgs.txt", ls);
***REMOVED***
if (d.getType().equals("cran")) {
List<String> ls = new LinkedList<String>();
ls.add(d.getName());
service.updateSVN("r_cran_pkgs.txt", ls);
***REMOVED***
if (d.getType().equals("github")) {
List<String> ls = new LinkedList<String>();
ls.add(d.getName());
service.updateSVN("r_github_pkgs.txt", ls);
***REMOVED***
***REMOVED***
***REMOVED***
return service.addAlgorithmToHost(algo, hostname);
***REMOVED*** if (updateSVN){
***REMOVED*** for (Dependency d : algo.getDependencies()) {
***REMOVED***
***REMOVED*** if (d.getType().equals("os")) {
***REMOVED*** List<String> ls = new LinkedList<String>();
***REMOVED*** ls.add(d.getName());
***REMOVED*** service.updateSVN("r_deb_pkgs.txt", ls);
***REMOVED*** ***REMOVED***
***REMOVED*** if (d.getType().equals("cran")) {
***REMOVED*** List<String> ls = new LinkedList<String>();
***REMOVED*** ls.add(d.getName());
***REMOVED*** service.updateSVN("r_cran_pkgs.txt", ls);
***REMOVED*** ***REMOVED***
***REMOVED*** if (d.getType().equals("github")) {
***REMOVED*** List<String> ls = new LinkedList<String>();
***REMOVED*** ls.add(d.getName());
***REMOVED*** service.updateSVN("r_github_pkgs.txt", ls);
***REMOVED*** ***REMOVED***
***REMOVED*** ***REMOVED***
***REMOVED******REMOVED***
return service.addAlgorithmToHost(algo, hostname,updateSVN);
***REMOVED***
private Algorithm getAlgorithm(String algorithm, String vre, String hostname, String name, String description,
@ -348,10 +348,10 @@ public class RestPoolManager implements PoolManager {
***REMOVED*** false);
a.addAlgorithmToHost(
"http:***REMOVED***data.d4science.org/dENQTTMxdjNZcGRpK0NHd2pvU0owMFFzN0VWemw3Zy9HbWJQNStIS0N6Yz0",
"dataminer2-d-d4s.d4science.org",
"http:***REMOVED***data.d4science.org/MnovRjZIdGV5WlB0WXE5NVNaZnRoRVg0SU8xZWpWQlFHbWJQNStIS0N6Yz0",
"dataminer1-devnext.d4science.org",
"ICHTHYOP_MODEL_ONE_BY_ONE", null, "ICHTHYOP_MODEL", "transducerers",
"N",false, false);
"N",false, true);
@ -375,17 +375,7 @@ public class RestPoolManager implements PoolManager {
return null;
***REMOVED***
@Override
public String addAlgorithmToVRE(Algorithm algo, String vre) throws IOException, InterruptedException {
***REMOVED*** TODO Auto-generated method stub
return null;
***REMOVED***
@Override
public String addAlgorithmToHost(Algorithm algo, String hostname) throws IOException, InterruptedException {
***REMOVED*** TODO Auto-generated method stub
return null;
***REMOVED***
@Override
public URL getURLfromWorkerLog(String logUrl) throws MalformedURLException, UnknownHostException {
@ -413,4 +403,21 @@ public class RestPoolManager implements PoolManager {
***REMOVED***
@Override
public String addAlgorithmToVRE(Algorithm algo, String vre, boolean svn) throws IOException, InterruptedException {
***REMOVED*** TODO Auto-generated method stub
return null;
***REMOVED***
@Override
public String addAlgorithmToHost(Algorithm algo, String host, boolean svn)
throws IOException, InterruptedException {
***REMOVED*** TODO Auto-generated method stub
return null;
***REMOVED***
***REMOVED***

View File

@ -476,7 +476,7 @@ public class DataminerPoolManager implements PoolManager {
* @param vre
*
*/
public String addAlgorithmToVRE(Algorithm algorithm, final String vre) throws IOException {
public String addAlgorithmToVRE(Algorithm algorithm, final String vre, final boolean updateSVN) throws IOException {
***REMOVED*** create a fake algorithm set
final AlgorithmSet algoSet = new AlgorithmSet();
algoSet.setName("fake");
@ -488,10 +488,18 @@ public class DataminerPoolManager implements PoolManager {
public void run() {
***REMOVED*** TODO Auto-generated method stub
try {
addAlgorithmsToVRE(algoSet, vre, uuid);
try {
addAlgorithmsToVRE(algoSet, vre, uuid, updateSVN);
***REMOVED*** catch (SVNException e) {
***REMOVED*** TODO Auto-generated catch block
e.printStackTrace();
***REMOVED***
***REMOVED*** catch (IOException e) {
***REMOVED*** TODO Auto-generated catch block
e.printStackTrace();
***REMOVED*** catch (InterruptedException e) {
***REMOVED*** TODO Auto-generated catch block
e.printStackTrace();
***REMOVED***
***REMOVED***
***REMOVED***).start();
@ -501,7 +509,7 @@ public class DataminerPoolManager implements PoolManager {
return uuid;
***REMOVED***
public String addAlgorithmToHost(Algorithm algorithm, final String hostname) throws IOException {
public String addAlgorithmToHost(Algorithm algorithm, final String hostname, final boolean updateSVN) throws IOException {
***REMOVED*** create a fake algorithm set
final AlgorithmSet algoSet = new AlgorithmSet();
algoSet.setName("fake");
@ -513,10 +521,16 @@ public class DataminerPoolManager implements PoolManager {
public void run() {
***REMOVED*** TODO Auto-generated method stub
try {
addAlgorithmsToHost(algoSet, hostname, uuid);
addAlgorithmsToHost(algoSet, hostname, uuid, updateSVN);
***REMOVED*** catch (IOException e) {
***REMOVED*** TODO Auto-generated catch block
e.printStackTrace();
***REMOVED*** catch (InterruptedException e) {
***REMOVED*** TODO Auto-generated catch block
e.printStackTrace();
***REMOVED*** catch (SVNException e) {
***REMOVED*** TODO Auto-generated catch block
e.printStackTrace();
***REMOVED***
***REMOVED***
***REMOVED***).start();
@ -537,7 +551,7 @@ public class DataminerPoolManager implements PoolManager {
return new File(n.getPath()).toURI().toURL();
***REMOVED***
public String addAlgorithmsToVRE(AlgorithmSet algorithms, String vre, String uuid) throws IOException {
public String addAlgorithmsToVRE(AlgorithmSet algorithms, String vre, String uuid, boolean updateSVN) throws IOException, InterruptedException, SVNException {
***REMOVED*** create the cluster (dataminers in the vre)
Cluster cluster = new Cluster();
@ -548,11 +562,11 @@ public class DataminerPoolManager implements PoolManager {
***REMOVED*** apply the changes
AnsibleBridge a = new AnsibleBridge();
return a.applyAlgorithmSetToCluster(algorithms, cluster, uuid).getWorkerId();
return a.applyAlgorithmSetToCluster(algorithms, cluster, uuid, updateSVN).getWorkerId();
***REMOVED***
public String addAlgorithmsToHost(AlgorithmSet algorithms, String hostname, String uuid) throws IOException {
public String addAlgorithmsToHost(AlgorithmSet algorithms, String hostname, String uuid, boolean updateSVN) throws IOException, InterruptedException, SVNException {
***REMOVED*** create the cluster (dataminers in the vre)
Cluster cluster = new Cluster();
@ -563,7 +577,7 @@ public class DataminerPoolManager implements PoolManager {
***REMOVED*** apply the changes
AnsibleBridge a = new AnsibleBridge();
return a.applyAlgorithmSetToCluster(algorithms, cluster, uuid).getWorkerId();
return a.applyAlgorithmSetToCluster(algorithms, cluster, uuid,updateSVN ).getWorkerId();
***REMOVED***
@ -571,80 +585,9 @@ public class DataminerPoolManager implements PoolManager {
private SVNRepository getSvnRepository(String url) throws SVNException {
SVNRepository repository = SVNRepositoryFactory.create(SVNURL.parseURIEncoded(url));
ISVNAuthenticationManager authManager = SVNWCUtil.createDefaultAuthenticationManager();
repository.setAuthenticationManager(authManager);
System.out.println(repository.getLocation());
return repository;
***REMOVED***
@Override
public List<String> updateSVN(String file, List<String> ldep) throws SVNException, IOException {
final SVNRepository svnRepository = this.getSvnRepository(
"https:***REMOVED***svn.d4science.research-infrastructures.eu/gcube/trunk/data-analysis/RConfiguration/RPackagesManagement/");
try {
final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
svnRepository.getFile(file, SVNRepository.INVALID_REVISION, null, byteArrayOutputStream);
String lines[] = byteArrayOutputStream.toString().split("\\r?\\n");
List<String> aa = this.checkMatch(lines, ldep);
Collections.sort(aa);
final SVNDeltaGenerator deltaGenerator = new SVNDeltaGenerator();
byte[] originalContents = byteArrayOutputStream.toByteArray();
final ISVNEditor commitEditor = svnRepository.getCommitEditor("update dependencies", null);
commitEditor.openRoot(-1);
commitEditor.openFile(file, -1);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
for (String line : aa) {
baos.write(line.getBytes());
baos.write("\n".getBytes());
***REMOVED***
byte[] bytes = baos.toByteArray();
commitEditor.applyTextDelta(file,md5(originalContents));
final String checksum = deltaGenerator.sendDelta(file, new ByteArrayInputStream(originalContents), 0,
new ByteArrayInputStream(bytes), commitEditor, true);
commitEditor.closeFile(file, checksum);
commitEditor.closeEdit();
return aa;
***REMOVED*** finally {
svnRepository.closeSession();
***REMOVED***
***REMOVED***
public static String md5(byte[] contents) {
final byte[] tmp = new byte[1024];
final SVNChecksumInputStream checksumStream = new SVNChecksumInputStream(new ByteArrayInputStream(contents), "md5");
try {
while (checksumStream.read(tmp) > 0) {
***REMOVED***
***REMOVED***
return checksumStream.getDigest();
***REMOVED*** catch (IOException e) {
***REMOVED***never happens
e.printStackTrace();
return null;
***REMOVED*** finally {
SVNFileUtil.closeFile(checksumStream);
***REMOVED***
***REMOVED***
public List<String> checkMatch(String[] lines, List<String> ls) {
Set<String> ss = new HashSet<String>(ls);
ss.addAll(Arrays.asList(lines));
return new ArrayList<>(ss);
***REMOVED***
public Algorithm extractAlgorithm(String url) throws IOException {
@ -778,4 +721,12 @@ public class DataminerPoolManager implements PoolManager {
***REMOVED***
***REMOVED***
@Override
public List<String> updateSVN(String file, List<String> ldep) throws SVNException, IOException {
***REMOVED*** TODO Auto-generated method stub
return null;
***REMOVED***
***REMOVED***

View File

@ -220,7 +220,7 @@ public class DataminerPoolManagerTest {
ensemble.addDependency(d);
algorithms.addAlgorithm(ensemble);
new DataminerPoolManager().addAlgorithmsToVRE(algorithms, "/gcube/devNext/NextNext", "test"+UUID.randomUUID());
new DataminerPoolManager().addAlgorithmsToVRE(algorithms, "/gcube/devNext/NextNext", "test"+UUID.randomUUID(), false);
***REMOVED***

View File

@ -220,7 +220,7 @@ public class DataminerPoolManagerTest {
ensemble.addDependency(d);
algorithms.addAlgorithm(ensemble);
new DataminerPoolManager().addAlgorithmsToVRE(algorithms, "/gcube/devNext/NextNext", "test"+UUID.randomUUID());
new DataminerPoolManager().addAlgorithmsToVRE(algorithms, "/gcube/devNext/NextNext", "test"+UUID.randomUUID(), false);
***REMOVED***