From 112dd75f02739f6cd5ca5d7378038676d0bb5388 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Thu, 4 Mar 2021 15:33:32 +0100 Subject: [PATCH] integration with zeppelin --- .../controller/MDInspectorController.java | 9 +- .../manager/controller/SwaggerController.java | 4 +- .../controller/ZeppelinController.java | 45 +++++ .../manager/utils/ControllerUtils.java | 22 ++ .../mdstore/manager/utils/ZeppelinClient.java | 191 ++++++++++++++++++ .../manager/utils/zeppelin/ListResponse.java | 40 ++++ .../mdstore/manager/utils/zeppelin/Note.java | 50 +++++ .../manager/utils/zeppelin/Paragraph.java | 45 +++++ .../utils/zeppelin/StringResponse.java | 37 ++++ .../src/main/resources/application.properties | 5 + .../src/main/resources/static/index.html | 8 +- .../src/main/resources/templates/error.html | 4 +- .../main/resources/zeppelin/analyzeTypes.py | 23 +++ .../main/resources/zeppelin/analyzeYears.py | 28 +++ .../src/main/resources/zeppelin/conf.tmpl.py | 7 + .../main/resources/zeppelin/firstRecord.py | 5 + 16 files changed, 511 insertions(+), 12 deletions(-) create mode 100644 apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/controller/ZeppelinController.java create mode 100644 apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/ControllerUtils.java create mode 100644 apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/ZeppelinClient.java create mode 100644 apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/zeppelin/ListResponse.java create mode 100644 apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/zeppelin/Note.java create mode 100644 apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/zeppelin/Paragraph.java create mode 100644 apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/zeppelin/StringResponse.java create mode 100644 apps/dhp-mdstore-manager/src/main/resources/zeppelin/analyzeTypes.py 
create mode 100644 apps/dhp-mdstore-manager/src/main/resources/zeppelin/analyzeYears.py create mode 100644 apps/dhp-mdstore-manager/src/main/resources/zeppelin/conf.tmpl.py create mode 100644 apps/dhp-mdstore-manager/src/main/resources/zeppelin/firstRecord.py diff --git a/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/controller/MDInspectorController.java b/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/controller/MDInspectorController.java index 2aa6a64d..e094c685 100644 --- a/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/controller/MDInspectorController.java +++ b/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/controller/MDInspectorController.java @@ -1,6 +1,5 @@ package eu.dnetlib.data.mdstore.manager.controller; -import org.apache.commons.lang3.exception.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -17,6 +16,7 @@ import org.springframework.web.servlet.ModelAndView; import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion; import eu.dnetlib.data.mdstore.manager.common.model.MDStoreWithInfo; import eu.dnetlib.data.mdstore.manager.exceptions.MDStoreManagerException; +import eu.dnetlib.data.mdstore.manager.utils.ControllerUtils; import eu.dnetlib.data.mdstore.manager.utils.DatabaseUtils; @Controller @@ -77,12 +77,7 @@ public class MDInspectorController { @ExceptionHandler(Exception.class) @ResponseStatus(value = HttpStatus.INTERNAL_SERVER_ERROR) public ModelAndView handleException(final Exception e) { - log.debug(e.getMessage(), e); - final ModelAndView mv = new ModelAndView(); - mv.setViewName("error"); - mv.addObject("error", e.getMessage()); - mv.addObject("stacktrace", ExceptionUtils.getStackTrace(e)); - return mv; + return ControllerUtils.errorPage("Metadata Inspector - ERROR", e); } private boolean isMdstoreId(final String id) { diff --git 
a/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/controller/SwaggerController.java b/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/controller/SwaggerController.java index 0802f435..7f757719 100644 --- a/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/controller/SwaggerController.java +++ b/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/controller/SwaggerController.java @@ -7,7 +7,9 @@ import org.springframework.web.bind.annotation.RequestMethod; @Controller public class SwaggerController { - @RequestMapping(value = { "/apidoc", "/api-doc", "/doc", "/swagger" }, method = RequestMethod.GET) + @RequestMapping(value = { + "/apidoc", "/api-doc", "/doc", "/swagger" + }, method = RequestMethod.GET) public String apiDoc() { return "redirect:swagger-ui.html"; } diff --git a/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/controller/ZeppelinController.java b/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/controller/ZeppelinController.java new file mode 100644 index 00000000..5b8756f1 --- /dev/null +++ b/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/controller/ZeppelinController.java @@ -0,0 +1,45 @@ +package eu.dnetlib.data.mdstore.manager.controller; + +import java.io.IOException; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.ExceptionHandler; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.bind.annotation.ResponseStatus; +import org.springframework.web.servlet.ModelAndView; + +import eu.dnetlib.data.mdstore.manager.exceptions.MDStoreManagerException; +import 
eu.dnetlib.data.mdstore.manager.utils.ControllerUtils;
+import eu.dnetlib.data.mdstore.manager.utils.ZeppelinClient;
+import eu.dnetlib.data.mdstore.manager.utils.zeppelin.Note;
+
+/**
+ * Spring MVC controller, mapped under {@code /zeppelin}, that exposes the Zeppelin note generated
+ * for an mdstore.
+ */
+@Controller
+@RequestMapping("/zeppelin")
+public class ZeppelinController {
+
+    @Autowired
+    private ZeppelinClient zeppelinClient;
+
+    /**
+     * Generates the note for the given mdstore, submits it through the client and redirects the
+     * browser to the URL the client returns.
+     *
+     * @param mdId the mdstore identifier taken from the request path
+     * @return a {@code redirect:} view name pointing at the note URL
+     */
+    @RequestMapping("/{mdId}/go")
+    public String goToZeppelin(@PathVariable final String mdId) throws IOException, MDStoreManagerException {
+        final Note note = zeppelinClient.generateNote(mdId);
+        final String url = zeppelinClient.submitNode(note);
+        return "redirect:" + url;
+    }
+
+    /**
+     * Returns the generated note as the response body, without submitting it.
+     *
+     * @param mdId the mdstore identifier taken from the request path
+     */
+    @RequestMapping("/{mdId}/note")
+    public @ResponseBody Note showNote(@PathVariable final String mdId) throws IOException, MDStoreManagerException {
+        return zeppelinClient.generateNote(mdId);
+    }
+
+    /** Renders the shared error view, with HTTP status 500, for any exception raised by this controller. */
+    @ExceptionHandler(Exception.class)
+    @ResponseStatus(value = HttpStatus.INTERNAL_SERVER_ERROR)
+    public ModelAndView handleException(final Exception e) {
+        return ControllerUtils.errorPage("Metadata Store Manager - Zeppelin Client", e);
+    }
+
+}
diff --git a/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/ControllerUtils.java b/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/ControllerUtils.java
new file mode 100644
index 00000000..98c07ec0
--- /dev/null
+++ b/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/ControllerUtils.java
@@ -0,0 +1,22 @@
+package eu.dnetlib.data.mdstore.manager.utils;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.web.servlet.ModelAndView;
+
+/** Static helpers shared by the controllers. */
+public class ControllerUtils {
+
+    private static final Logger log = LoggerFactory.getLogger(ControllerUtils.class);
+
+    /**
+     * Builds the model for the {@code error} view.
+     *
+     * @param title page title, exposed to the view as {@code title}
+     * @param e     the failure; its message is exposed as {@code error} and its stack trace as
+     *              {@code stacktrace}. It is also logged at debug level.
+     * @return the populated model and view
+     */
+    public static ModelAndView errorPage(final String title, final Throwable e) {
+        log.debug(e.getMessage(), e);
+        final ModelAndView mv = new ModelAndView();
+        mv.setViewName("error");
+        mv.addObject("title", title);
+        mv.addObject("error", e.getMessage());
+        mv.addObject("stacktrace", ExceptionUtils.getStackTrace(e));
+        return mv;
+
+    }
+}
diff --git a/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/ZeppelinClient.java b/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/ZeppelinClient.java
new file mode 100644
index 00000000..46be0073
--- /dev/null
+++ b/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/ZeppelinClient.java
@@ -0,0 +1,191 @@
+package eu.dnetlib.data.mdstore.manager.utils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.stereotype.Component;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+import org.springframework.web.client.RestTemplate;
+
+import eu.dnetlib.data.mdstore.manager.exceptions.MDStoreManagerException;
+import eu.dnetlib.data.mdstore.manager.utils.zeppelin.ListResponse;
+import eu.dnetlib.data.mdstore.manager.utils.zeppelin.Note;
+import eu.dnetlib.data.mdstore.manager.utils.zeppelin.StringResponse;
+
+/**
+ * Client for the Zeppelin REST endpoints used by the mdstore manager: login, note listing and note
+ * upload. It also builds the note describing an mdstore from the scripts under {@code /zeppelin}
+ * on the classpath.
+ */
+@Component
+public class ZeppelinClient {
+
+    @Autowired
+    private DatabaseUtils databaseUtils;
+
+    @Value("${dhp.mdstore-manager.hadoop.zeppelin.login}")
+    private String zeppelinLogin;
+
+    @Value("${dhp.mdstore-manager.hadoop.zeppelin.password}")
+    private String zeppelinPassword;
+
+    @Value("${dhp.mdstore-manager.hadoop.zeppelin.base-url}")
+    private String zeppelinBaseUrl;
+
+    @Value("${dhp.mdstore-manager.hadoop.zeppelin.name-prefix}")
+    private String zeppelinNamePrefix;
+
+    private static final Log log = LogFactory.getLog(ZeppelinClient.class);
+
+    /**
+     * Returns the URL of the Zeppelin note with the same name as {@code n}, uploading {@code n}
+     * first when no listed note has that name.
+     *
+     * @param n the note to look up or upload
+     * @return the notebook URL under the configured base URL
+     * @throws MDStoreManagerException if the login fails or the note cannot be uploaded with any
+     *                                 of the session ids obtained at login
+     */
+    public String submitNode(final Note n) throws MDStoreManagerException {
+
+        final List<String> jsessionIds = calculateJsessionIDs();
+
+        // Reuse an already registered note with the same name, if any session can see one.
+        final Optional<String> noteUrl = jsessionIds.stream()
+            .map(this::listNotes)
+            .filter(Objects::nonNull)
+            .flatMap(List::stream)
+            .filter(map -> n.getName().equals(map.get("name")))
+            .map(map -> zeppelinBaseUrl + "/#/notebook/" + map.get("id"))
+            .findFirst();
+
+        if (noteUrl.isPresent()) {
+            // TODO the paragraph "configuration" should be updated
+            return noteUrl.get();
+        }
+
+        // findFirst() is short-circuiting: the upload stops at the first session id that succeeds.
+        return jsessionIds.stream()
+            .map(jid -> registerNote(n, jid))
+            .filter(Objects::nonNull)
+            .map(id -> zeppelinBaseUrl + "/#/notebook/" + id)
+            .findFirst()
+            .orElseThrow(() -> new MDStoreManagerException("Zeppelin note not uploaded"));
+    }
+
+    /**
+     * Lists the notes visible to the given session.
+     *
+     * @param jsessionid the session id appended to the request URL
+     * @return the {@code body} of the response, or {@code null} when the call did not return
+     *         HTTP 200 with status {@code OK}
+     */
+    private List<Map<String, String>> listNotes(final String jsessionid) {
+        final String url = zeppelinBaseUrl + "/api/notebook;JSESSIONID=" + jsessionid;
+        log.debug("Performing GET: " + url);
+
+        final ResponseEntity<ListResponse> res = new RestTemplate().getForEntity(url, ListResponse.class);
+
+        if (res.getStatusCode() != HttpStatus.OK) {
+            log.debug("Zeppelin API failed with HTTP error: " + res);
+            return null;
+        } else if (res.getBody() == null) {
+            log.debug("Zeppelin API returned a null response");
+            return null;
+        } else if (!"OK".equals(res.getBody().getStatus())) {
+            // Constant-first comparison: a response without a status is a failure, not an NPE.
+            log.debug("Listing of zeppelin notes failed: " + res.getBody());
+            return null;
+        } else {
+            return res.getBody().getBody();
+        }
+
+    }
+
+    /**
+     * Uploads the note using the given session.
+     *
+     * @param n          the note to upload
+     * @param jsessionid the session id appended to the request URL
+     * @return the {@code body} of the response (used by the caller as the note id), or
+     *         {@code null} when the call did not return HTTP 200 with status {@code OK}
+     */
+    private String registerNote(final Note n, final String jsessionid) {
+        final String url = zeppelinBaseUrl + "/api/notebook;JSESSIONID=" + jsessionid;
+        log.debug("Performing POST: " + url);
+
+        final ResponseEntity<StringResponse> res = new RestTemplate().postForEntity(url, n, StringResponse.class);
+
+        if (res.getStatusCode() != HttpStatus.OK) {
+            log.debug("Zeppelin API failed with HTTP error: " + res);
+            return null;
+        } else if (res.getBody() == null) {
+            log.debug("Zeppelin API returned a null response");
+            return null;
+        } else if (!"OK".equals(res.getBody().getStatus())) {
+            log.debug("Registration of zeppelin note failed: " + res.getBody());
+            return null;
+        } else {
+            return res.getBody().getBody();
+        }
+    }
+
+    /**
+     * Logs in with the configured credentials and collects the session ids returned as cookies.
+     *
+     * @return the distinct {@code JSESSIONID} values found in the {@code Set-Cookie} headers,
+     *         excluding the value {@code deleteMe}
+     * @throws MDStoreManagerException if the login does not return HTTP 200 or sets no cookie
+     */
+    private List<String> calculateJsessionIDs() throws MDStoreManagerException {
+
+        final HttpHeaders headers = new HttpHeaders();
+        headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
+
+        final MultiValueMap<String, String> map = new LinkedMultiValueMap<>();
+        map.add("userName", zeppelinLogin);
+        map.add("password", zeppelinPassword);
+        final HttpEntity<MultiValueMap<String, String>> request = new HttpEntity<>(map, headers);
+
+        final String url = zeppelinBaseUrl + "/api/login";
+        final ResponseEntity<Object> res = new RestTemplate().postForEntity(url, request, Object.class);
+
+        if (res.getStatusCode() != HttpStatus.OK) {
+            log.error("Zeppelin API: login failed with HTTP error: " + res);
+            throw new MDStoreManagerException("Zeppelin API: login failed with HTTP error: " + res);
+        } else if (!res.getHeaders().containsKey(HttpHeaders.SET_COOKIE)) {
+            log.error("Zeppelin API: login failed (missing SET_COOKIE header)");
+            throw new MDStoreManagerException("Zeppelin API: login failed (missing SET_COOKIE header)");
+        } else {
+            // Each Set-Cookie header is split into its ';'-separated attributes.
+            return res.getHeaders()
+                .get(HttpHeaders.SET_COOKIE)
+                .stream()
+                .map(s -> s.split(";"))
+                .flatMap(Arrays::stream)
+                .map(String::trim)
+                .filter(s -> s.startsWith("JSESSIONID="))
+                .map(s -> StringUtils.removeStart(s, "JSESSIONID="))
+                .filter(s -> !s.equalsIgnoreCase("deleteMe"))
+                .distinct()
+                .collect(Collectors.toList());
+        }
+    }
+
+    /**
+     * Builds the note for an mdstore: a configuration paragraph followed by the analysis scripts
+     * bundled under {@code /zeppelin}.
+     *
+     * @param mdId the mdstore identifier
+     * @return a note named {@code <name-prefix>/<mdId>}
+     */
+    public Note generateNote(final String mdId) throws MDStoreManagerException, IOException {
+        final Note note = new Note(zeppelinNamePrefix + "/" + mdId);
+        note.addParagraph("Configuration", confParagraph(mdId));
+        note.addParagraph("First Record", getClass().getResource("/zeppelin/firstRecord.py"));
+        note.addParagraph("Analyze the years in 'date' field", getClass().getResource("/zeppelin/analyzeYears.py"));
+        note.addParagraph("Analyze the types in 'type' field", getClass().getResource("/zeppelin/analyzeTypes.py"));
+
+        return note;
+    }
+
+    /**
+     * Renders the configuration template, filling in the mdstore id and the list of selectable
+     * versions as {@code ("<hdfsPath>/store", "<versionId>")} pairs.
+     *
+     * @param mdId the mdstore identifier
+     * @return the text of the configuration paragraph
+     */
+    private String confParagraph(final String mdId) throws MDStoreManagerException, IOException {
+        final String currentVersion = databaseUtils.findMdStore(mdId).getCurrentVersion();
+
+        final String versions = StreamSupport.stream(databaseUtils.listVersions(mdId).spliterator(), false)
+            .filter(v -> !v.isWriting())
+            .filter(v -> v.getLastUpdate() != null)
+            // Current version first, the others by ascending last update.
+            .sorted((v1, v2) -> {
+                if (v1.getId().equals(currentVersion)) {
+                    return -1;
+                } else if (v2.getId().equals(currentVersion)) {
+                    return 1;
+                } else {
+                    return v1.getLastUpdate().compareTo(v2.getLastUpdate());
+                }
+            })
+            .map(v -> {
+                final String path = v.getHdfsPath() + "/store";
+                final String id = v.getId().equals(currentVersion) ? v.getId() + " (main)" : v.getId();
+                return String.format("(\"%s\", \"%s\")", path, id);
+            })
+            // The pairs become the elements of a Python list literal, so they must be
+            // comma-separated; without a separator two versions render as ("a","b")("c","d").
+            .collect(Collectors.joining(", "));
+
+        // toString(URL, Charset) opens and closes the stream itself and does not depend on the
+        // platform charset. replace() is literal, so '$' or '\' in the values cannot be
+        // misread as regex replacement syntax.
+        return IOUtils.toString(getClass().getResource("/zeppelin/conf.tmpl.py"), StandardCharsets.UTF_8)
+            .replace("__MDSTORE_ID__", mdId)
+            .replace("__LIST_MDSTORE_VERSIONS__", versions);
+
+    }
+
+}
diff --git a/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/zeppelin/ListResponse.java b/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/zeppelin/ListResponse.java
new file mode 100644
index 00000000..5bc234a3
--- /dev/null
+++ b/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/zeppelin/ListResponse.java
@@ -0,0 +1,40 @@
+package eu.dnetlib.data.mdstore.manager.utils.zeppelin;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Response envelope whose {@code body} is a list of maps; {@code ZeppelinClient} reads the
+ * {@code name} and {@code id} entries of each map.
+ */
+public class ListResponse {
+
+    private String status;
+    private String message;
+    private List<Map<String, String>> body;
+
+    public String getStatus() {
+        return status;
+    }
+
+    public void setStatus(final String status) {
+        this.status = status;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(final String message) {
+        this.message = message;
+    }
+
+    public List<Map<String, String>> getBody() {
+        return body;
+    }
+
+    public void setBody(final List<Map<String, String>> body) {
+        this.body = body;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("Response [status=%s, message=%s, body=%s]", status, message, body);
+    }
+}
diff --git a/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/zeppelin/Note.java b/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/zeppelin/Note.java
new file mode 100644
index 00000000..d2fae600
--- /dev/null
+++ b/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/zeppelin/Note.java
@@ -0,0 +1,50 @@
+package eu.dnetlib.data.mdstore.manager.utils.zeppelin;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import 
java.util.List;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.commons.io.IOUtils;
+
+/** A named note made of an ordered list of paragraphs. */
+public class Note {
+
+    private String name;
+    private List<Paragraph> paragraphs = new ArrayList<>();
+
+    public Note() {}
+
+    public Note(final String name) {
+        this.name = name;
+    }
+
+    public Note(final String name, final List<Paragraph> paragraphs) {
+        this.name = name;
+        this.paragraphs = paragraphs;
+    }
+
+    /**
+     * Appends a paragraph with the given title and text.
+     *
+     * @param title the paragraph title
+     * @param text  the paragraph text
+     */
+    public void addParagraph(final String title, final String text) {
+        paragraphs.add(new Paragraph(title, text));
+    }
+
+    /**
+     * Appends a paragraph whose text is the content found at {@code resource}.
+     *
+     * @param title    the paragraph title
+     * @param resource location of the paragraph text, decoded as UTF-8
+     * @throws IOException if the resource cannot be read
+     */
+    public void addParagraph(final String title, final URL resource) throws IOException {
+        // toString(URL, Charset) opens and closes the stream itself; the previous
+        // resource.openStream() was never closed and was decoded with the platform charset.
+        paragraphs.add(new Paragraph(title, IOUtils.toString(resource, StandardCharsets.UTF_8)));
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(final String name) {
+        this.name = name;
+    }
+
+    public List<Paragraph> getParagraphs() {
+        return paragraphs;
+    }
+
+    public void setParagraphs(final List<Paragraph> paragraphs) {
+        this.paragraphs = paragraphs;
+    }
+
+}
diff --git a/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/zeppelin/Paragraph.java b/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/zeppelin/Paragraph.java
new file mode 100644
index 00000000..9e3bea1a
--- /dev/null
+++ b/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/zeppelin/Paragraph.java
@@ -0,0 +1,45 @@
+package eu.dnetlib.data.mdstore.manager.utils.zeppelin;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/** A titled paragraph of a {@link Note}, with a free-form configuration map. */
+public class Paragraph {
+
+    private String title;
+    private String text;
+    private Map<String, Object> config = new LinkedHashMap<>();
+
+    public Paragraph() {}
+
+    /**
+     * Creates a paragraph and sets the {@code enabled} and {@code editorHide} configuration
+     * entries to {@code true}. The no-argument constructor leaves the configuration empty.
+     *
+     * @param title the paragraph title
+     * @param text  the paragraph text
+     */
+    public Paragraph(final String title, final String text) {
+        this.title = title;
+        this.text = text;
+        this.config.put("enabled", true);
+        this.config.put("editorHide", true);
+    }
+
+    public String getTitle() {
+        return title;
+    }
+
+    public void setTitle(final String title) {
+        this.title = title;
+    }
+
+    public String getText() {
+        return text;
+    }
+
+    public void setText(final String text) {
+        this.text = text;
+    }
+
+    // NOTE(review): unlike the other accessors these two are protected. Whether "config" ends up
+    // in the serialized note depends on the visibility rules of the JSON mapper in use - confirm
+    // that the configuration entries actually reach Zeppelin.
+    protected Map<String, Object> getConfig() {
+        return config;
+    }
+
+    protected void setConfig(final Map<String, Object> config) {
+        this.config = config;
+    }
+
+}
diff --git a/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/zeppelin/StringResponse.java b/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/zeppelin/StringResponse.java
new file mode 100644
index 00000000..f27a7a18
--- /dev/null
+++ b/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/zeppelin/StringResponse.java
@@ -0,0 +1,37 @@
+package eu.dnetlib.data.mdstore.manager.utils.zeppelin;
+
+/** Response envelope whose {@code body} is a single string. */
+public class StringResponse {
+
+    private String status;
+    private String message;
+    private String body;
+
+    public String getStatus() {
+        return status;
+    }
+
+    public void setStatus(final String status) {
+        this.status = status;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(final String message) {
+        this.message = message;
+    }
+
+    public String getBody() {
+        return body;
+    }
+
+    public void setBody(final String body) {
+        this.body = body;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("Response [status=%s, message=%s, body=%s]", status, message, body);
+    }
+}
diff --git a/apps/dhp-mdstore-manager/src/main/resources/application.properties b/apps/dhp-mdstore-manager/src/main/resources/application.properties
index be6297cd..35635a2d 100644
--- a/apps/dhp-mdstore-manager/src/main/resources/application.properties
+++ b/apps/dhp-mdstore-manager/src/main/resources/application.properties
@@ -28,6 +28,11 @@ dhp.mdstore-manager.hadoop.cluster = MOCK dhp.mdstore-manager.hdfs.base-path = /data/dnet.dev/mdstore dhp.mdstore-manager.hadoop.user = dnet.dev +dhp.mdstore-manager.hadoop.zeppelin.base-url = https://iis-cdh5-test-gw.ocean.icm.edu.pl/zeppelin +dhp.mdstore-manager.hadoop.zeppelin.login = +dhp.mdstore-manager.hadoop.zeppelin.password = +dhp.mdstore-manager.hadoop.zeppelin.name-prefix = 
mdstoreManager/mdstoreInfo + dhp.mdstore-manager.inspector.records.max = 1000 dhp.swagger.api.host = localhost diff --git a/apps/dhp-mdstore-manager/src/main/resources/static/index.html b/apps/dhp-mdstore-manager/src/main/resources/static/index.html index d8a44b92..a9650408 100644 --- a/apps/dhp-mdstore-manager/src/main/resources/static/index.html +++ b/apps/dhp-mdstore-manager/src/main/resources/static/index.html @@ -73,7 +73,11 @@ @@ -155,7 +159,7 @@ {{v.id}}
Path: {{v.hdfsPath}}
- inspect + inspect diff --git a/apps/dhp-mdstore-manager/src/main/resources/templates/error.html b/apps/dhp-mdstore-manager/src/main/resources/templates/error.html index aa99d367..ec36aede 100644 --- a/apps/dhp-mdstore-manager/src/main/resources/templates/error.html +++ b/apps/dhp-mdstore-manager/src/main/resources/templates/error.html @@ -2,7 +2,7 @@ - Metadata Inspector - ERROR + <link rel="stylesheet" href="./css/bootstrap.min.css" /> <link rel="stylesheet" href="./css/bootstrap-theme.min.css" /> <script src="./js/jquery-1.12.3.min.js"></script> @@ -17,7 +17,7 @@ <body> <div class="container-fluid"> - <h1>Metadata Inspector - ERROR</h1> + <h1 th:text="${title}" /> <hr /> <h4 class="text-danger" th:text="${error}" /> <hr />
diff --git a/apps/dhp-mdstore-manager/src/main/resources/zeppelin/analyzeTypes.py b/apps/dhp-mdstore-manager/src/main/resources/zeppelin/analyzeTypes.py
new file mode 100644
index 00000000..f56a1c51
--- /dev/null
+++ b/apps/dhp-mdstore-manager/src/main/resources/zeppelin/analyzeTypes.py
@@ -0,0 +1,23 @@
+%pyspark
+from pyspark.sql.types import *
+from pyspark.sql.functions import *
+from lxml import etree
+from datetime import datetime
+
+@udf(ArrayType(StringType()))
+def get_type(record):
+    """Return the text of every 'resourceType' element of the XML record
+    whose resourceTypeGeneral attribute is 'Other'.
+
+    Elements without text contribute None to the returned list.
+    """
+    root = etree.fromstring(record.encode('utf-8'))
+    r = root.xpath("//*[local-name()='resourceType' and./@resourceTypeGeneral='Other']")
+    return [item.text for item in r]
+
+
+# 'path' is not defined in this script; conf.tmpl.py assigns it.
+df = spark.read.load(path)
+# One output row per (record, type) pair, then the number of records per type.
+types = df.select(df.id, explode(get_type(df.body)).alias('type')).groupBy('type').agg(count(df.id).alias('cnt')).collect()
+
+# Tab-separated rows under a "%table" header.
+# Single-argument print(...) prints the same text on Python 2 and Python 3.
+print("%table")
+print("type\tcount")
+for item in types:
+    print("{}\t{}".format(item.type, item.cnt))
diff --git a/apps/dhp-mdstore-manager/src/main/resources/zeppelin/analyzeYears.py b/apps/dhp-mdstore-manager/src/main/resources/zeppelin/analyzeYears.py
new file mode 100644
index 00000000..80cd3cd1
--- /dev/null
+++ b/apps/dhp-mdstore-manager/src/main/resources/zeppelin/analyzeYears.py
@@ 
-0,0 +1,28 @@
+%pyspark
+from pyspark.sql.functions import udf
+from pyspark.sql.functions import *
+from lxml import etree
+from datetime import datetime
+
+@udf("string")
+def get_year(record):
+    """Return the first four characters of the longest 'date' element text
+    in the XML record, or None when no 'date' element has text.
+
+    Among texts of equal length the first one found wins.
+    """
+    root = etree.fromstring(record.encode('utf-8'))
+    r = root.xpath("//*[local-name()='date']")
+    c_date = None
+    for item in r:
+        text = item.text
+        if text is None:
+            # An element without text cannot be measured with len(); skip it
+            # instead of failing the whole UDF with a TypeError.
+            continue
+        if c_date is None or len(text) > len(c_date):
+            c_date = text
+    if c_date is not None:
+        return c_date[:4]
+
+
+# 'path' is not defined in this script; conf.tmpl.py assigns it.
+df = spark.read.load(path)
+result_per_year = df.select(df.id, get_year(df.body).alias('year')).groupBy('year').agg(count(df.id).alias('cnt')).collect()
+
+# Tab-separated rows under a "%table" header.
+# Single-argument print(...) prints the same text on Python 2 and Python 3.
+print("%table")
+print("year\tcount")
+for item in result_per_year:
+    print("{}\t{}".format(item.year, item.cnt))
diff --git a/apps/dhp-mdstore-manager/src/main/resources/zeppelin/conf.tmpl.py b/apps/dhp-mdstore-manager/src/main/resources/zeppelin/conf.tmpl.py
new file mode 100644
index 00000000..ff8230e3
--- /dev/null
+++ b/apps/dhp-mdstore-manager/src/main/resources/zeppelin/conf.tmpl.py
@@ -0,0 +1,7 @@
+%pyspark
+
+# Template: the two double-underscore placeholders below are substituted by
+# ZeppelinClient.confParagraph before the paragraph is uploaded.
+mdId = "__MDSTORE_ID__"
+path = z.select("MdStore Version", [ __LIST_MDSTORE_VERSIONS__ ])
+
+# %-formatting prints the same text as the former comma-separated print
+# statements and is valid on both Python 2 and Python 3.
+print("MdStore ID: %s" % mdId)
+print("MdStore Version Data Path: %s" % path)
diff --git a/apps/dhp-mdstore-manager/src/main/resources/zeppelin/firstRecord.py b/apps/dhp-mdstore-manager/src/main/resources/zeppelin/firstRecord.py
new file mode 100644
index 00000000..667225d5
--- /dev/null
+++ b/apps/dhp-mdstore-manager/src/main/resources/zeppelin/firstRecord.py
@@ -0,0 +1,5 @@
+%pyspark
+
+# 'path' is not defined in this script; conf.tmpl.py assigns it.
+df = spark.read.format("org.apache.spark.sql.parquet").load(path)
+
+# Print the 'body' column of the first row.
+print(df.first().body)
\ No newline at end of file