package eu.dnetlib.data.mdstore.manager.utils; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import javax.annotation.PostConstruct; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringEscapeUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import org.springframework.web.client.RestTemplate; import eu.dnetlib.data.mdstore.manager.exceptions.MDStoreManagerException; import eu.dnetlib.data.mdstore.manager.utils.zeppelin.HasStatus; import eu.dnetlib.data.mdstore.manager.utils.zeppelin.ListResponse; import eu.dnetlib.data.mdstore.manager.utils.zeppelin.Note; import eu.dnetlib.data.mdstore.manager.utils.zeppelin.Paragraph; import eu.dnetlib.data.mdstore.manager.utils.zeppelin.SimpleResponse; import eu.dnetlib.data.mdstore.manager.utils.zeppelin.StringResponse; import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo; @Component public class ZeppelinClient { @Value("${dhp.mdstore-manager.hadoop.zeppelin.login}") private String zeppelinLogin; @Value("${dhp.mdstore-manager.hadoop.zeppelin.password}") private String zeppelinPassword; @Value("${dhp.mdstore-manager.hadoop.zeppelin.base-url}") private String zeppelinBaseUrl; @Value("${dhp.mdstore-manager.hadoop.zeppelin.name-prefix}") private String zeppelinNamePrefix; private static final Log log = LogFactory.getLog(ZeppelinClient.class); private static final Map> DEFAULT_RIGHTS = new LinkedHashMap<>(); private static final Integer MAX_NUMBER_OF_MD_NOTES = 2; @PostConstruct public void init() { DEFAULT_RIGHTS.put("owners", Arrays.asList(zeppelinLogin)); DEFAULT_RIGHTS.put("readers", new ArrayList<>()); // ALL DEFAULT_RIGHTS.put("runners", new ArrayList<>()); // ALL DEFAULT_RIGHTS.put("writers", new ArrayList<>()); // ALL } private String jsessionid; public String zeppelinNote(final String note, final MDStoreWithInfo mdstore, final String currentVersionPath) throws MDStoreManagerException { if (notConfigured()) { throw new MDStoreManagerException("A zeppelin property is empty"); } final String newName = StringUtils.join(Arrays.asList(zeppelinNamePrefix, "notes", mdstore.getDatasourceName().replaceAll("/", "-"), mdstore.getApiId() .replaceAll("/", "-"), note.replaceAll("/", "-"), mdstore.getCurrentVersion().replaceAll("/", "-")), "/"); final List> notes = listNotes(); final Optional oldNoteId = notes.stream() .filter(Objects::nonNull) .filter(map -> newName.equals(map.get("name"))) .map(map -> map.get("id")) .findFirst(); if (oldNoteId.isPresent()) { log.debug("Returning existing note: " + oldNoteId.get()); return zeppelinBaseUrl + "/#/notebook/" + oldNoteId.get(); } final String templateName = zeppelinNamePrefix + "/templates/" + note; final String templateNoteId = notes.stream() .filter(map -> map.get("name").equals(templateName)) .map(map -> map.get("id")) .findFirst() .orElseThrow(() -> new MDStoreManagerException("Template Note not found: " + templateName)); final String newId = cloneNote(templateNoteId, newName, mdstore, currentVersionPath); return zeppelinBaseUrl + "/#/notebook/" + newId; } public List listTemplates() { final String prefix = zeppelinNamePrefix + "/templates/"; if (notConfigured()) { return new ArrayList<>(); } else { return listNotes().stream() .map(map -> map.get("name")) .filter(s -> s.startsWith(prefix)) .map(s -> StringUtils.substringAfter(s, prefix)) .sorted() .collect(Collectors.toList()); } } private List> listNotes() { return callApi(HttpMethod.GET, "notebook", ListResponse.class, null).getBody(); } private String cloneNote(final String noteId, final String newName, final MDStoreWithInfo mdstore, final String currentVersionPath) throws MDStoreManagerException { final String newId = callApi(HttpMethod.POST, "notebook/" + noteId, StringResponse.class, new Note(newName)).getBody(); callApi(HttpMethod.POST, "notebook/" + newId + "/paragraph", StringResponse.class, confParagraph(mdstore, currentVersionPath)).getBody(); callApi(HttpMethod.PUT, "notebook/" + newId + "/permissions", SimpleResponse.class, DEFAULT_RIGHTS); log.info("New note created, id: " + newId + ", name: " + newName); return newId; } private Paragraph confParagraph(final MDStoreWithInfo mdstore, final String currentVersionPath) throws MDStoreManagerException { try { final String code = IOUtils.toString(getClass().getResourceAsStream("/zeppelin/paragraph_conf.tmpl"), StandardCharsets.UTF_8) .replaceAll("__DS_NAME__", StringEscapeUtils.escapeJava(mdstore.getDatasourceName())) .replaceAll("__DS_ID__", StringEscapeUtils.escapeJava(mdstore.getDatasourceId())) .replaceAll("__API_ID__", StringEscapeUtils.escapeJava(mdstore.getApiId())) .replaceAll("__MDSTORE_ID__", mdstore.getId()) .replaceAll("__VERSION__", mdstore.getCurrentVersion()) .replaceAll("__PATH__", currentVersionPath); return new Paragraph("Configuration", code, 0); } catch (final IOException e) { log.error("Error preparing configuration paragraph", e); throw new MDStoreManagerException("Error preparing configuration paragraph", e); } } @Scheduled(fixedRate = 12 * 60 * 60 * 1000) // 12 hours public void cleanExpiredNotes() { if (notConfigured()) { return; } try { // I sort the notes according to the version datestamp (more recent first) final List> notes = listNotes() .stream() .filter(n -> n.get("name").startsWith(zeppelinNamePrefix + "/notes/")) .sorted((o1, o2) -> StringUtils.compare(o2.get("name"), o1.get("name"))) .collect(Collectors.toList()); final Map map = new HashMap<>(); for (final Map n : notes) { final String firstPart = StringUtils.substringBeforeLast(n.get("name"), "-"); if (!map.containsKey(firstPart)) { log.debug("Evaluating note " + n.get("name") + " for deletion: CONFIRMED"); map.put(firstPart, 1); } else if (map.get(firstPart) < MAX_NUMBER_OF_MD_NOTES) { log.debug("Evaluating note " + n.get("name") + " for deletion: CONFIRMED"); map.put(firstPart, map.get(firstPart) + 1); } else { log.debug("Evaluating note " + n.get("name") + " for deletion: TO_DELETE"); callApi(HttpMethod.DELETE, "notebook/" + n.get("id"), SimpleResponse.class, null); } } } catch (final Exception e) { log.error("Error cleaning expired notes", e); } } private T callApi(final HttpMethod method, final String api, final Class resClazz, final Object objRequest) { if (jsessionid == null) { final T res = findNewJsessionId(method, api, resClazz, objRequest); if (res != null) { return res; } } else { try { return callApi(method, api, resClazz, objRequest, jsessionid); } catch (final MDStoreManagerException e) { final T res = findNewJsessionId(method, api, resClazz, objRequest); if (res != null) { return res; } } } throw new RuntimeException("All attempted calls are failed"); } @SuppressWarnings("unchecked") private T callApi(final HttpMethod method, final String api, final Class resClazz, final Object objRequest, final String jsessionid) throws MDStoreManagerException { final String url = String.format("%s/api/%s;JSESSIONID=%s", zeppelinBaseUrl, api, jsessionid); final RestTemplate restTemplate = new RestTemplate(); ResponseEntity res = null; switch (method) { case GET: log.debug("Performing GET: " + url); res = restTemplate.getForEntity(url, resClazz); break; case POST: log.debug("Performing POST: " + url); res = restTemplate.postForEntity(url, objRequest, resClazz); break; case PUT: log.debug("Performing PUT: " + url); restTemplate.put(url, objRequest); break; case DELETE: log.debug("Performing DELETE: " + url); restTemplate.delete(url); break; default: throw new RuntimeException("Unsupported method: " + method); } if (method == HttpMethod.PUT || method == HttpMethod.DELETE) { return (T) new SimpleResponse("OK"); } else if (res == null) { log.error("NULL response from the API"); throw new MDStoreManagerException("NULL response from the API"); } else if (res.getStatusCode() != HttpStatus.OK) { log.error("Zeppelin API failed with HTTP error: " + res); throw new MDStoreManagerException("Zeppelin API failed with HTTP error: " + res); } else if (res.getBody() == null) { log.error("Zeppelin API returned a null response"); throw new MDStoreManagerException("Zeppelin API returned a null response"); } else if (!res.getBody().getStatus().equals("OK")) { log.error("Zeppelin API Operation failed: " + res.getBody()); throw new MDStoreManagerException("Registration of zeppelin note failed: " + res.getBody()); } else { return res.getBody(); } } private T findNewJsessionId(final HttpMethod method, final String api, final Class resClazz, final Object objRequest) { for (final String id : obtainJsessionIDs()) { try { final T res = callApi(method, api, resClazz, objRequest, id); setJsessionid(id); return res; } catch (final MDStoreManagerException e) { log.warn("Skipping invalid jsessionid: " + id); } } return null; } private Set obtainJsessionIDs() { final HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED); final MultiValueMap map = new LinkedMultiValueMap<>(); map.add("userName", zeppelinLogin); map.add("password", zeppelinPassword); final HttpEntity> request = new HttpEntity<>(map, headers); final String url = zeppelinBaseUrl + "/api/login"; final ResponseEntity res = new RestTemplate().postForEntity(url, request, Object.class); if (res.getStatusCode() != HttpStatus.OK) { log.error("Zeppelin API: login failed with HTTP error: " + res); throw new RuntimeException("Zeppelin API: login failed with HTTP error: " + res); } else if (!res.getHeaders().containsKey(HttpHeaders.SET_COOKIE)) { log.error("Zeppelin API: login failed (missing SET_COOKIE header)"); throw new RuntimeException("Zeppelin API: login failed (missing SET_COOKIE header)"); } else { return res.getHeaders() .get(HttpHeaders.SET_COOKIE) .stream() .map(s -> s.split(";")) .flatMap(Arrays::stream) .map(String::trim) .filter(s -> s.startsWith("JSESSIONID=")) .map(s -> StringUtils.removeStart(s, "JSESSIONID=")) .filter(s -> !s.equalsIgnoreCase("deleteMe")) .collect(Collectors.toSet()); } } public String getJsessionid() { return jsessionid; } public void setJsessionid(final String jsessionid) { this.jsessionid = jsessionid; } private boolean notConfigured() { return StringUtils.isAnyBlank(zeppelinBaseUrl, zeppelinLogin, zeppelinPassword, zeppelinNamePrefix); } }