This commit is contained in:
Michele Artini 2021-03-11 12:44:25 +01:00
parent e4fea5585b
commit c6d06856d2
13 changed files with 224 additions and 196 deletions

View File

@ -1,21 +1,18 @@
package eu.dnetlib.data.mdstore.manager.controller;
import java.io.IOException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.servlet.ModelAndView;
import eu.dnetlib.data.mdstore.manager.exceptions.MDStoreManagerException;
import eu.dnetlib.data.mdstore.manager.utils.ControllerUtils;
import eu.dnetlib.data.mdstore.manager.utils.DatabaseUtils;
import eu.dnetlib.data.mdstore.manager.utils.ZeppelinClient;
import eu.dnetlib.data.mdstore.manager.utils.zeppelin.Note;
@Controller
@RequestMapping("/zeppelin")
@ -24,16 +21,14 @@ public class ZeppelinController {
@Autowired
private ZeppelinClient zeppelinClient;
@RequestMapping("/{mdId}/go")
public String goToZeppelin(@PathVariable final String mdId) throws IOException, MDStoreManagerException {
final Note note = zeppelinClient.generateNote(mdId);
final String url = zeppelinClient.submitNode(note);
return "redirect:" + url;
}
@Autowired
private DatabaseUtils databaseUtils;
@RequestMapping("/{mdId}/note")
public @ResponseBody Note showNote(@PathVariable final String mdId) throws IOException, MDStoreManagerException {
return zeppelinClient.generateNote(mdId);
@RequestMapping("/{mdId}/{note}")
public String goToZeppelin(@PathVariable final String mdId, final @PathVariable String note) throws MDStoreManagerException {
final String currentVersion = databaseUtils.findMdStore(mdId).getCurrentVersion();
final String path = databaseUtils.findVersion(currentVersion).getHdfsPath() + "/store";
return "redirect:" + zeppelinClient.zeppelinNote(note, mdId, currentVersion, path);
}
@ExceptionHandler(Exception.class)

View File

@ -6,14 +6,11 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
@ -28,14 +25,12 @@ import org.springframework.web.client.RestTemplate;
import eu.dnetlib.data.mdstore.manager.exceptions.MDStoreManagerException;
import eu.dnetlib.data.mdstore.manager.utils.zeppelin.ListResponse;
import eu.dnetlib.data.mdstore.manager.utils.zeppelin.Note;
import eu.dnetlib.data.mdstore.manager.utils.zeppelin.Paragraph;
import eu.dnetlib.data.mdstore.manager.utils.zeppelin.StringResponse;
@Component
public class ZeppelinClient {
@Autowired
private DatabaseUtils databaseUtils;
@Value("${dhp.mdstore-manager.hadoop.zeppelin.login}")
private String zeppelinLogin;
@ -50,73 +45,52 @@ public class ZeppelinClient {
private static final Log log = LogFactory.getLog(ZeppelinClient.class);
public String submitNode(final Note n) throws MDStoreManagerException {
public String zeppelinNote(final String note, final String mdId, final String currentVersion, final String currentVersionPath)
throws MDStoreManagerException {
final String jsessionid = obtainJsessionID();
final List<String> jsessionIds = calculateJsessionIDs();
final String newName = zeppelinNamePrefix + "/notes/" + note + "/" + currentVersion;
final Optional<String> noteUrl = jsessionIds.stream()
.map(this::listNotes)
final Optional<String> oldNoteId = listNotes(jsessionid).stream()
.filter(Objects::nonNull)
.flatMap(List::stream)
.filter(map -> n.getName().equals(map.get("name")))
.map(map -> zeppelinBaseUrl + "/#/notebook/" + map.get("id"))
.filter(map -> newName.equals(map.get("name")))
.map(map -> map.get("id"))
.findFirst();
if (noteUrl.isPresent()) {
// TODO the paragraph "configuration" should be updated
return noteUrl.get();
if (oldNoteId.isPresent()) {
log.info("Returning existing note: " + oldNoteId.get());
return zeppelinBaseUrl + "/#/notebook/" + oldNoteId.get();
}
return jsessionIds.stream()
.map(jid -> registerNote(n, jid))
.filter(Objects::nonNull)
.map(id -> zeppelinBaseUrl + "/#/notebook/" + id)
.findFirst()
.orElseThrow(() -> new MDStoreManagerException("Zeppelin note not uploaded"));
}
final String templateNoteId = findTemplateNoteId(note, jsessionid);
private List<Map<String, String>> listNotes(final String jsessionid) {
final String url = zeppelinBaseUrl + "/api/notebook;JSESSIONID=" + jsessionid;
log.debug("Performing POST: " + url);
final String newId = cloneNote(templateNoteId, newName, jsessionid);
final ResponseEntity<ListResponse> res = new RestTemplate().getForEntity(url, ListResponse.class);
log.info("New note created, id: " + newId + ", name: " + newName);
if (res.getStatusCode() != HttpStatus.OK) {
log.debug("Zeppelin API failed with HTTP error: " + res);
return null;
} else if (res.getBody() == null) {
log.debug("Zeppelin API returned a null response");
return null;
} else if (!res.getBody().getStatus().equals("OK")) {
log.debug("Registration of zeppelin note failed: " + res.getBody());
return null;
} else {
return res.getBody().getBody();
}
addParagraph(newId, confParagraph(mdId, currentVersion, currentVersionPath), jsessionid);
return zeppelinBaseUrl + "/#/notebook/" + newId;
}
private String registerNote(final Note n, final String jsessionid) {
final String url = zeppelinBaseUrl + "/api/notebook;JSESSIONID=" + jsessionid;
log.debug("Performing POST: " + url);
// TODO: prepare the cron job
public void cleanExpiredNotes() {
try {
final String jsessionid = obtainJsessionID();
final ResponseEntity<StringResponse> res = new RestTemplate().postForEntity(url, n, StringResponse.class);
if (res.getStatusCode() != HttpStatus.OK) {
log.debug("Zeppelin API failed with HTTP error: " + res);
return null;
} else if (res.getBody() == null) {
log.debug("Zeppelin API returned a null response");
return null;
} else if (!res.getBody().getStatus().equals("OK")) {
log.debug("Registration of zeppelin note failed: " + res.getBody());
return null;
} else {
return res.getBody().getBody();
for (final Map<String, String> n : listNotes(jsessionid)) {
final String id = n.get("id");
if (n.get("name").startsWith(zeppelinNamePrefix + "/notes/") && isExpired(id, jsessionid)) {
deleteNote(id, jsessionid);
}
}
} catch (final Exception e) {
log.error("Error cleaning expired notes", e);
}
}
private List<String> calculateJsessionIDs() throws MDStoreManagerException {
private String obtainJsessionID() throws MDStoreManagerException {
final HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
@ -146,46 +120,124 @@ public class ZeppelinClient {
.map(s -> StringUtils.removeStart(s, "JSESSIONID="))
.filter(s -> !s.equalsIgnoreCase("deleteMe"))
.distinct()
.collect(Collectors.toList());
.filter(this::testConnection)
.findFirst()
.orElseThrow(() -> new MDStoreManagerException("Zeppelin API: login failed (invalid jsessionid)"));
}
}
public Note generateNote(final String mdId) throws MDStoreManagerException, IOException {
final Note note = new Note(zeppelinNamePrefix + "/" + mdId);
note.addParagraph("Configuration", confParagraph(mdId));
note.addParagraph("First Record", getClass().getResource("/zeppelin/firstRecord.py"));
note.addParagraph("Analyze the years in 'date' field", getClass().getResource("/zeppelin/analyzeYears.py"));
note.addParagraph("Analyze the types in 'type' field", getClass().getResource("/zeppelin/analyzeTypes.py"));
private boolean testConnection(final String jsessionid) {
return note;
final String url = zeppelinBaseUrl + "/api/notebook;JSESSIONID=" + jsessionid;
log.info("Performing GET: " + url);
final ResponseEntity<ListResponse> res = new RestTemplate().getForEntity(url, ListResponse.class);
if (res.getStatusCode() != HttpStatus.OK) {
return false;
} else if (res.getBody() == null) {
return false;
} else if (!res.getBody().getStatus().equals("OK")) {
return false;
} else {
log.info("Connected to zeppelin: " + res.getBody());
log.info("Found JSESSIONID: " + jsessionid);
return true;
}
}
private String confParagraph(final String mdId) throws MDStoreManagerException, IOException {
final String currentVersion = databaseUtils.findMdStore(mdId).getCurrentVersion();
private List<Map<String, String>> listNotes(final String jsessionid) throws MDStoreManagerException {
final String url = zeppelinBaseUrl + "/api/notebook;JSESSIONID=" + jsessionid;
log.info("Performing GET: " + url);
final String versions = StreamSupport.stream(databaseUtils.listVersions(mdId).spliterator(), false)
.filter(v -> !v.isWriting())
.filter(v -> v.getLastUpdate() != null)
.sorted((v1, v2) -> {
if (v1.getId().equals(currentVersion)) {
return -1;
} else if (v2.getId().equals(currentVersion)) {
return 1;
} else {
return v1.getLastUpdate().compareTo(v2.getLastUpdate());
}
})
.map(v -> {
final String path = v.getHdfsPath() + "/store";
final String id = v.getId().equals(currentVersion) ? v.getId() + " (main)" : v.getId();
return String.format("(\"%s\", \"%s\")", path, id);
})
.collect(Collectors.joining());
final ResponseEntity<ListResponse> res = new RestTemplate().getForEntity(url, ListResponse.class);
return IOUtils.toString(getClass().getResourceAsStream("/zeppelin/conf.tmpl.py"))
.replaceAll("__MDSTORE_ID__", mdId)
.replaceAll("__LIST_MDSTORE_VERSIONS__", versions);
if (res.getStatusCode() != HttpStatus.OK) {
log.error("Zeppelin API failed with HTTP error: " + res);
throw new MDStoreManagerException("Zeppelin API failed with HTTP error: " + res);
} else if (res.getBody() == null) {
log.error("Zeppelin API returned a null response");
throw new MDStoreManagerException("Zeppelin API returned a null response");
} else if (!res.getBody().getStatus().equals("OK")) {
log.error("Registration of zeppelin note failed: " + res.getBody());
throw new MDStoreManagerException("Registration of zeppelin note failed: " + res.getBody());
} else {
return res.getBody().getBody();
}
}
private String findTemplateNoteId(final String noteTemplate, final String jsessionid) throws MDStoreManagerException {
final String templateName = zeppelinNamePrefix + "/templates/" + noteTemplate;
return listNotes(jsessionid).stream()
.filter(map -> map.get("name").equals(templateName))
.map(map -> map.get("id"))
.findFirst()
.orElseThrow(() -> new MDStoreManagerException("Template Note not found: " + templateName));
}
private String cloneNote(final String noteId, final String newName, final String jsessionid) throws MDStoreManagerException {
final String url = zeppelinBaseUrl + "/api/notebook/" + noteId + ";JSESSIONID=" + jsessionid;
log.debug("Performing POST: " + url);
final ResponseEntity<StringResponse> res = new RestTemplate().postForEntity(url, new Note(newName), StringResponse.class);
if (res.getStatusCode() != HttpStatus.OK) {
log.error("Zeppelin API failed with HTTP error: " + res);
throw new MDStoreManagerException("Zeppelin API failed with HTTP error: " + res);
} else if (res.getBody() == null) {
log.error("Zeppelin API returned a null response");
throw new MDStoreManagerException("Zeppelin API returned a null response");
} else if (!res.getBody().getStatus().equals("OK")) {
log.error("Registration of zeppelin note failed: " + res.getBody());
throw new MDStoreManagerException("Registration of zeppelin note failed: " + res.getBody());
} else {
return res.getBody().getBody();
}
}
private Paragraph confParagraph(final String mdId, final String currentVersion, final String currentVersionPath) throws MDStoreManagerException {
try {
final String code = IOUtils.toString(getClass().getResourceAsStream("/zeppelin/conf.tmpl.py"))
.replaceAll("__MDSTORE_ID__", mdId)
.replaceAll("__VERSION__", currentVersion)
.replaceAll("__PATH__", currentVersionPath);
return new Paragraph("Configuration", code, 0);
} catch (final IOException e) {
log.error("Error preparing configuration paragraph", e);
throw new MDStoreManagerException("Error preparing configuration paragraph", e);
}
}
private String addParagraph(final String noteId, final Paragraph paragraph, final String jsessionid) throws MDStoreManagerException {
final String url = zeppelinBaseUrl + "/api/notebook/" + noteId + "/paragraph;JSESSIONID=" + jsessionid;
log.debug("Performing POST: " + url);
final ResponseEntity<StringResponse> res = new RestTemplate().postForEntity(url, paragraph, StringResponse.class);
if (res.getStatusCode() != HttpStatus.OK) {
log.error("Zeppelin API failed with HTTP error: " + res);
throw new MDStoreManagerException("Zeppelin API failed with HTTP error: " + res);
} else if (res.getBody() == null) {
log.error("Zeppelin API returned a null response");
throw new MDStoreManagerException("Zeppelin API returned a null response");
} else if (!res.getBody().getStatus().equals("OK")) {
log.error("Registration of zeppelin note failed: " + res.getBody());
throw new MDStoreManagerException("Registration of zeppelin note failed: " + res.getBody());
} else {
return res.getBody().getBody();
}
}
private void deleteNote(final String id, final String jsessionid) {
final String url = zeppelinBaseUrl + "/api/notebook/" + id + ";JSESSIONID=" + jsessionid;
log.debug("Performing DELETE: " + url);
new RestTemplate().delete(url);
}
private boolean isExpired(final String id, final String jsessionid) {
// TODO Auto-generated method stub
return false;
}
}

View File

@ -0,0 +1,39 @@
package eu.dnetlib.data.mdstore.manager.utils.zeppelin;
import java.util.Map;
public class MapResponse {
private String status;
private String message;
private Map<String, String> body;
public String getStatus() {
return status;
}
public void setStatus(final String status) {
this.status = status;
}
public String getMessage() {
return message;
}
public void setMessage(final String message) {
this.message = message;
}
public Map<String, String> getBody() {
return body;
}
public void setBody(final Map<String, String> body) {
this.body = body;
}
@Override
public String toString() {
return String.format("Response [status=%s, message=%s, body=%s]", status, message, body);
}
}

View File

@ -1,16 +1,8 @@
package eu.dnetlib.data.mdstore.manager.utils.zeppelin;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.IOUtils;
public class Note {
private String name;
private List<Paragraph> paragraphs = new ArrayList<>();
public Note() {}
@ -18,19 +10,6 @@ public class Note {
this.name = name;
}
public Note(final String name, final List<Paragraph> paragraphs) {
this.name = name;
this.paragraphs = paragraphs;
}
public void addParagraph(final String title, final String text) {
paragraphs.add(new Paragraph(title, text));
}
public void addParagraph(final String title, final URL resource) throws IOException {
paragraphs.add(new Paragraph(title, IOUtils.toString(resource.openStream())));
}
public String getName() {
return name;
}
@ -39,12 +18,4 @@ public class Note {
this.name = name;
}
public List<Paragraph> getParagraphs() {
return paragraphs;
}
public void setParagraphs(final List<Paragraph> paragraphs) {
this.paragraphs = paragraphs;
}
}

View File

@ -7,13 +7,16 @@ public class Paragraph {
private String title;
private String text;
private int index = 0;
private Map<String, Object> config = new LinkedHashMap<>();
public Paragraph() {}
public Paragraph(final String title, final String text) {
public Paragraph(final String title, final String text, final int index) {
this.title = title;
this.text = text;
this.index = index;
this.config.put("title", true);
this.config.put("enabled", true);
this.config.put("editorHide", true);
}
@ -34,11 +37,19 @@ public class Paragraph {
this.text = text;
}
protected Map<String, Object> getConfig() {
public int getIndex() {
return index;
}
public void setIndex(final int index) {
this.index = index;
}
public Map<String, Object> getConfig() {
return config;
}
protected void setConfig(final Map<String, Object> config) {
public void setConfig(final Map<String, Object> config) {
this.config = config;
}

View File

@ -0,0 +1,6 @@
Zeppelin
1) Zeppelin può essere configurato per interagire con GitHub (???)
2) Le note vengono precaricate su una cartella "Template" senza il paragrafo "Configuration".
3) Per ispezionare una mdstore si clona una nota template e si aggiunge il paragrafo "Configuration".
5) Le note più vecchie di una settimana vengono cancellate automaticamente (controllare i campi status, dateCreated, dateStarted and dateFinished)

View File

@ -24,14 +24,14 @@ spring.jpa.open-in-view=true
logging.level.io.swagger.models.parameters.AbstractSerializableParameter = error
# Hadoop
dhp.mdstore-manager.hadoop.cluster = MOCK
dhp.mdstore-manager.hadoop.cluster = GARR
dhp.mdstore-manager.hdfs.base-path = /data/dnet.dev/mdstore
dhp.mdstore-manager.hadoop.user = dnet.dev
dhp.mdstore-manager.hadoop.zeppelin.base-url = https://iis-cdh5-test-gw.ocean.icm.edu.pl/zeppelin
dhp.mdstore-manager.hadoop.zeppelin.login =
dhp.mdstore-manager.hadoop.zeppelin.login =
dhp.mdstore-manager.hadoop.zeppelin.password =
dhp.mdstore-manager.hadoop.zeppelin.name-prefix = mdstoreManager/mdstoreInfo
dhp.mdstore-manager.hadoop.zeppelin.name-prefix = mdstoreManager
dhp.mdstore-manager.inspector.records.max = 1000

View File

@ -156,7 +156,7 @@
</property>
<property>
<name>dfs.client.use.datanode.hostname</name>
<value>false</value>
<value>true</value>
</property>
<property>
<name>fs.permissions.umask-mode</name>

View File

@ -76,7 +76,16 @@
<div class="pull-right">
<a href="./mdrecords/{{md.id}}/50" class="btn btn-sm btn-primary" target="_blank">inspect</a>
<a href="./zeppelin/{{md.id}}/go" class="btn btn-sm btn-warning" target="_blank">zeppelin</a>
<div class="btn-group">
<button class="btn btn-sm btn-warning dropdown-toggle" data-toggle="dropdown">zeppelin <span class="caret"></span></button>
<ul class="dropdown-menu dropdown-menu-right">
<li><a href="./zeppelin/{{md.id}}/default" target="_blank">default note</a></li>
<li><a href="./zeppelin/{{md.id}}/dc_native" target="_blank">note for native stores (oai_dc)</a></li>
<li><a href="./zeppelin/{{md.id}}/datacite_native" target="_blank">note for native stores (datacite)</a></li>
<li><a href="./zeppelin/{{md.id}}/oaf_cleaned" target="_blank">note for transformed stores</a></li>
</ul>
</div>
</div>
</div>
</div>
@ -85,7 +94,6 @@
</div>
</div>
<!-- Modals -->
<div class="modal fade" tabindex="-1" id="newMdstoreModal">

View File

@ -1,23 +0,0 @@
%pyspark
from pyspark.sql.types import *
from pyspark.sql.functions import *
from lxml import etree
from datetime import datetime
@udf(ArrayType(StringType()))
def get_type(record):
root = etree.fromstring(record.encode('utf-8'))
r = root.xpath("//*[local-name()='resourceType' and./@resourceTypeGeneral='Other']")
c_types = []
for item in r:
c_types.append(item.text)
return c_types
df = spark.read.load(path)
types = df.select(df.id, explode(get_type(df.body)).alias('type')).groupBy('type').agg(count(df.id).alias('cnt')).collect()
print "%table"
print "type\tcount"
for item in types:
print "{}\t{}".format(item.type, item.cnt)

View File

@ -1,28 +0,0 @@
%pyspark
from pyspark.sql.functions import udf
from pyspark.sql.functions import *
from lxml import etree
from datetime import datetime
@udf("string")
def get_year(record):
root = etree.fromstring(record.encode('utf-8'))
r = root.xpath("//*[local-name()='date']")
c_date = None
for item in r:
if c_date is None and item is not None:
c_date = item.text
else:
if item is not None and len(item.text) > len(c_date):
c_date = item.text
if c_date is not None:
return c_date[:4]
df = spark.read.load(path)
result_per_year = df.select(df.id, get_year(df.body).alias('year')).groupBy('year').agg(count(df.id).alias('cnt')).collect()
print "%table"
print "year\tcount"
for item in result_per_year:
print "{}\t{}".format(item.year, item.cnt)

View File

@ -1,7 +1,9 @@
%pyspark
mdId = "__MDSTORE_ID__"
path = z.select("MdStore Version", [ __LIST_MDSTORE_VERSIONS__ ])
mdVersion = "__VERSION__"
path = "__PATH__"
print "MdStore ID:", mdId
print "MdStore Version Data Path:", path
print "Version ID:", mdVersion
print "Version Data Path:", path

View File

@ -1,5 +0,0 @@
%pyspark
df = spark.read.format("org.apache.spark.sql.parquet").load(path)
print df.first().body