dnet-applications/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/ZeppelinClient.java

192 lines
6.9 KiB
Java

package eu.dnetlib.data.mdstore.manager.utils;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import eu.dnetlib.data.mdstore.manager.exceptions.MDStoreManagerException;
import eu.dnetlib.data.mdstore.manager.utils.zeppelin.ListResponse;
import eu.dnetlib.data.mdstore.manager.utils.zeppelin.Note;
import eu.dnetlib.data.mdstore.manager.utils.zeppelin.StringResponse;
@Component
public class ZeppelinClient {
@Autowired
private DatabaseUtils databaseUtils;
@Value("${dhp.mdstore-manager.hadoop.zeppelin.login}")
private String zeppelinLogin;
@Value("${dhp.mdstore-manager.hadoop.zeppelin.password}")
private String zeppelinPassword;
@Value("${dhp.mdstore-manager.hadoop.zeppelin.base-url}")
private String zeppelinBaseUrl;
@Value("${dhp.mdstore-manager.hadoop.zeppelin.name-prefix}")
private String zeppelinNamePrefix;
private static final Log log = LogFactory.getLog(ZeppelinClient.class);
public String submitNode(final Note n) throws MDStoreManagerException {
final List<String> jsessionIds = calculateJsessionIDs();
final Optional<String> noteUrl = jsessionIds.stream()
.map(this::listNotes)
.filter(Objects::nonNull)
.flatMap(List::stream)
.filter(map -> n.getName().equals(map.get("name")))
.map(map -> zeppelinBaseUrl + "/#/notebook/" + map.get("id"))
.findFirst();
if (noteUrl.isPresent()) {
// TODO the paragraph "configuration" should be updated
return noteUrl.get();
}
return jsessionIds.stream()
.map(jid -> registerNote(n, jid))
.filter(Objects::nonNull)
.map(id -> zeppelinBaseUrl + "/#/notebook/" + id)
.findFirst()
.orElseThrow(() -> new MDStoreManagerException("Zeppelin note not uploaded"));
}
private List<Map<String, String>> listNotes(final String jsessionid) {
final String url = zeppelinBaseUrl + "/api/notebook;JSESSIONID=" + jsessionid;
log.debug("Performing POST: " + url);
final ResponseEntity<ListResponse> res = new RestTemplate().getForEntity(url, ListResponse.class);
if (res.getStatusCode() != HttpStatus.OK) {
log.debug("Zeppelin API failed with HTTP error: " + res);
return null;
} else if (res.getBody() == null) {
log.debug("Zeppelin API returned a null response");
return null;
} else if (!res.getBody().getStatus().equals("OK")) {
log.debug("Registration of zeppelin note failed: " + res.getBody());
return null;
} else {
return res.getBody().getBody();
}
}
private String registerNote(final Note n, final String jsessionid) {
final String url = zeppelinBaseUrl + "/api/notebook;JSESSIONID=" + jsessionid;
log.debug("Performing POST: " + url);
final ResponseEntity<StringResponse> res = new RestTemplate().postForEntity(url, n, StringResponse.class);
if (res.getStatusCode() != HttpStatus.OK) {
log.debug("Zeppelin API failed with HTTP error: " + res);
return null;
} else if (res.getBody() == null) {
log.debug("Zeppelin API returned a null response");
return null;
} else if (!res.getBody().getStatus().equals("OK")) {
log.debug("Registration of zeppelin note failed: " + res.getBody());
return null;
} else {
return res.getBody().getBody();
}
}
private List<String> calculateJsessionIDs() throws MDStoreManagerException {
final HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
final MultiValueMap<String, String> map = new LinkedMultiValueMap<>();
map.add("userName", zeppelinLogin);
map.add("password", zeppelinPassword);
final HttpEntity<MultiValueMap<String, String>> request = new HttpEntity<>(map, headers);
final String url = zeppelinBaseUrl + "/api/login";
final ResponseEntity<?> res = new RestTemplate().postForEntity(url, request, Object.class);
if (res.getStatusCode() != HttpStatus.OK) {
log.error("Zeppelin API: login failed with HTTP error: " + res);
throw new MDStoreManagerException("Zeppelin API: login failed with HTTP error: " + res);
} else if (!res.getHeaders().containsKey(HttpHeaders.SET_COOKIE)) {
log.error("Zeppelin API: login failed (missing SET_COOKIE header)");
throw new MDStoreManagerException("Zeppelin API: login failed (missing SET_COOKIE header)");
} else {
return res.getHeaders()
.get(HttpHeaders.SET_COOKIE)
.stream()
.map(s -> s.split(";"))
.flatMap(Arrays::stream)
.map(String::trim)
.filter(s -> s.startsWith("JSESSIONID="))
.map(s -> StringUtils.removeStart(s, "JSESSIONID="))
.filter(s -> !s.equalsIgnoreCase("deleteMe"))
.distinct()
.collect(Collectors.toList());
}
}
public Note generateNote(final String mdId) throws MDStoreManagerException, IOException {
final Note note = new Note(zeppelinNamePrefix + "/" + mdId);
note.addParagraph("Configuration", confParagraph(mdId));
note.addParagraph("First Record", getClass().getResource("/zeppelin/firstRecord.py"));
note.addParagraph("Analyze the years in 'date' field", getClass().getResource("/zeppelin/analyzeYears.py"));
note.addParagraph("Analyze the types in 'type' field", getClass().getResource("/zeppelin/analyzeTypes.py"));
return note;
}
private String confParagraph(final String mdId) throws MDStoreManagerException, IOException {
final String currentVersion = databaseUtils.findMdStore(mdId).getCurrentVersion();
final String versions = StreamSupport.stream(databaseUtils.listVersions(mdId).spliterator(), false)
.filter(v -> !v.isWriting())
.filter(v -> v.getLastUpdate() != null)
.sorted((v1, v2) -> {
if (v1.getId().equals(currentVersion)) {
return -1;
} else if (v2.getId().equals(currentVersion)) {
return 1;
} else {
return v1.getLastUpdate().compareTo(v2.getLastUpdate());
}
})
.map(v -> {
final String path = v.getHdfsPath() + "/store";
final String id = v.getId().equals(currentVersion) ? v.getId() + " (main)" : v.getId();
return String.format("(\"%s\", \"%s\")", path, id);
})
.collect(Collectors.joining());
return IOUtils.toString(getClass().getResourceAsStream("/zeppelin/conf.tmpl.py"))
.replaceAll("__MDSTORE_ID__", mdId)
.replaceAll("__LIST_MDSTORE_VERSIONS__", versions);
}
}