first import of mdstore inspector (hdfs)

Michele Artini 2023-02-06 14:50:10 +01:00
parent 721b55294d
commit 5bdc530e91
31 changed files with 2191 additions and 2 deletions

View File

@ -29,6 +29,22 @@
<groupId>jaxen</groupId>
<artifactId>jaxen</artifactId>
</dependency>
<!-- Hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>1.5.0-cdh5.13.3</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.5.0-cdh5.13.3</version>
</dependency>
<!-- hot swapping, disable cache for template, enable live reload -->
<dependency>
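Note: the com.twitter parquet artifacts are the pre-Apache CDH 5 builds, which live in the bare parquet.* namespace rather than org.apache.parquet.*. A minimal read sketch against these dependencies (the file path is hypothetical; the reader usage mirrors HdfsClient below):

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import parquet.avro.AvroParquetReader;
import parquet.hadoop.ParquetReader;

public class ParquetReadSketch {
	public static void main(final String[] args) throws Exception {
		// a file:// path keeps the sketch runnable without a live HDFS
		try (ParquetReader<GenericRecord> reader = AvroParquetReader
				.<GenericRecord> builder(new Path("file:///tmp/sample.parquet"))
				.withConf(new Configuration())
				.build()) {
			for (GenericRecord rec = reader.read(); rec != null; rec = reader.read()) {
				System.out.println(rec);
			}
		}
	}
}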

View File

@ -0,0 +1,86 @@
package eu.dnetlib.data.mdstore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Controller;
import org.springframework.ui.ModelMap;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.servlet.ModelAndView;
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion;
import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo;
import eu.dnetlib.data.mdstore.manager.exceptions.MDStoreManagerException;
import eu.dnetlib.data.mdstore.utils.ControllerUtils;
import eu.dnetlib.data.mdstore.utils.DatabaseUtils;
@Controller
public class MDInspectorController {
@Autowired
private DatabaseUtils databaseUtils;
private static final Logger log = LoggerFactory.getLogger(MDInspectorController.class);
@Value("${dhp.mdstore-manager.inspector.records.max}")
private Long MAX_MD_RECORDS;
@RequestMapping("/mdrecords/{id}/{limit}")
public String mdstoreInspector(final ModelMap map, @PathVariable final String id, @PathVariable final long limit) throws MDStoreManagerException {
final MDStoreWithInfo md;
final MDStoreVersion ver;
if (isMdstoreId(id)) {
log.debug("MDSTORE: " + id);
md = databaseUtils.findMdStore(id);
ver = databaseUtils.findVersion(md.getCurrentVersion());
} else {
log.debug("VERSION: " + id);
ver = databaseUtils.findVersion(id);
md = databaseUtils.findMdStore(ver.getMdstore());
}
map.addAttribute("mdId", md.getId());
map.addAttribute("versionId", ver.getId());
map.addAttribute("dsId", md.getDatasourceId());
map.addAttribute("dsName", md.getDatasourceName());
map.addAttribute("apiId", md.getApiId());
map.addAttribute("format", md.getFormat());
map.addAttribute("layout", md.getLayout());
map.addAttribute("interpretation", md.getInterpretation());
map.addAttribute("path", ver.getHdfsPath());
map.addAttribute("lastUpdate", ver.getLastUpdate());
map.addAttribute("size", ver.getSize());
map.addAttribute("limit", Math.min(limit, MAX_MD_RECORDS));
if (md.getCurrentVersion().equals(ver.getId())) {
map.addAttribute("status", "current");
} else if (ver.isWriting()) {
map.addAttribute("status", "writing");
} else {
map.addAttribute("status", "expired");
}
return "inspector";
}
@ExceptionHandler(Exception.class)
@ResponseStatus(value = HttpStatus.INTERNAL_SERVER_ERROR)
public ModelAndView handleException(final Exception e) {
return ControllerUtils.errorPage("Metadata Inspector - ERROR", e);
}
private boolean isMdstoreId(final String id) {
return id.length() < 40;
}
}
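For reference, a minimal client sketch for the inspector endpoint (base URL and mdstore id are hypothetical). The path accepts either an mdstore id or a version id — ids shorter than 40 characters are treated as mdstore ids — and the limit is capped server-side by dhp.mdstore-manager.inspector.records.max:

import org.springframework.web.client.RestTemplate;

public class InspectorRequestSketch {
	public static void main(final String[] args) {
		// returns the rendered "inspector" HTML view for the current version
		final String html = new RestTemplate().getForObject(
			"http://localhost:8080/mdrecords/md-4af6e96f-2326-4077-8366-ab1b22345d2c/100",
			String.class);
		System.out.println(html);
	}
}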

View File

@ -0,0 +1,232 @@
package eu.dnetlib.data.mdstore;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.google.common.collect.Sets;
import eu.dnetlib.common.controller.AbstractDnetController;
import eu.dnetlib.data.mdstore.manager.exceptions.MDStoreManagerException;
import eu.dnetlib.data.mdstore.utils.DatabaseUtils;
import eu.dnetlib.data.mdstore.utils.HdfsClient;
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion;
import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
@RestController
@RequestMapping("/mdstores")
@Tag(name = "Metadata Stores")
public class MDStoreController extends AbstractDnetController {
@Autowired
private DatabaseUtils databaseUtils;
@Autowired
private HdfsClient hdfsClient;
private static final Logger log = LoggerFactory.getLogger(MDStoreController.class);
@Operation(summary = "Return all the mdstores")
@GetMapping("/")
public Iterable<MDStoreWithInfo> find() {
return databaseUtils.listMdStores();
}
@Operation(summary = "Return all the mdstore identifiers")
@GetMapping("/ids")
public List<String> findIdentifiers() {
return databaseUtils.listMdStoreIDs();
}
@Operation(summary = "Return a mdstores by id")
@GetMapping("/mdstore/{mdId}")
public MDStoreWithInfo getMdStore(@Parameter(name = "the mdstore identifier") @PathVariable final String mdId) throws MDStoreManagerException {
return databaseUtils.findMdStore(mdId);
}
@Operation(summary = "Increase the read count of the current mdstore")
@GetMapping("/mdstore/{mdId}/startReading")
public MDStoreVersion startReading(@Parameter(name = "the mdstore identifier") @PathVariable final String mdId) throws MDStoreManagerException {
return databaseUtils.startReading(mdId);
}
@Operation(summary = "Create a new mdstore")
@GetMapping("/new/{format}/{layout}/{interpretation}")
public MDStoreWithInfo createMDStore(
@Parameter(name = "mdstore format") @PathVariable final String format,
@Parameter(name = "mdstore layout") @PathVariable final String layout,
@Parameter(name = "mdstore interpretation") @PathVariable final String interpretation,
@Parameter(name = "datasource name") @RequestParam(required = true) final String dsName,
@Parameter(name = "datasource id") @RequestParam(required = true) final String dsId,
@Parameter(name = "api id") @RequestParam(required = true) final String apiId) throws MDStoreManagerException {
final String id = databaseUtils.createMDStore(format, layout, interpretation, dsName, dsId, apiId);
return databaseUtils.findMdStore(id);
}
@Operation(summary = "Delete a mdstore by id")
@DeleteMapping("/mdstore/{mdId}")
public StatusResponse delete(@Parameter(name = "the id of the mdstore that will be deleted") @PathVariable final String mdId) throws MDStoreManagerException {
final String hdfsPath = databaseUtils.deleteMdStore(mdId);
hdfsClient.deletePath(hdfsPath);
return StatusResponse.DELETED;
}
@Operation(summary = "Return all the versions of a mdstore")
@GetMapping("/mdstore/{mdId}/versions")
public Iterable<MDStoreVersion> listVersions(@PathVariable final String mdId) throws MDStoreManagerException {
return databaseUtils.listVersions(mdId);
}
@Operation(summary = "Create a new preliminary version of a mdstore")
@GetMapping("/mdstore/{mdId}/newVersion")
public MDStoreVersion prepareNewVersion(@Parameter(name = "the id of the mdstore for which a new version will be created") @PathVariable final String mdId) {
return databaseUtils.prepareMdStoreVersion(mdId);
}
@Operation(summary = "Promote a preliminary version to current")
@GetMapping("/version/{versionId}/commit/{size}")
public MDStoreVersion commitVersion(@Parameter(name = "the id of the version that will be promoted to the current version") @PathVariable final String versionId,
@Parameter(name = "the size of the new current mdstore") @PathVariable final long size) throws MDStoreManagerException {
try {
return databaseUtils.commitMdStoreVersion(versionId, size);
} finally {
deleteExpiredVersions();
}
}
@Operation(summary = "Abort a preliminary version")
@GetMapping("/version/{versionId}/abort")
public StatusResponse abortVersion(@Parameter(name = "the id of the version to abort") @PathVariable final String versionId) throws MDStoreManagerException {
final String hdfsPath = databaseUtils.deleteMdStoreVersion(versionId, true);
hdfsClient.deletePath(hdfsPath);
return StatusResponse.ABORTED;
}
@Operation(summary = "Return an existing mdstore version")
@GetMapping("/version/{versionId}")
public MDStoreVersion getVersion(@Parameter(name = "the id of the version") @PathVariable final String versionId)
throws MDStoreManagerException {
return databaseUtils.findVersion(versionId);
}
@Operation(summary = "Delete a mdstore version")
@DeleteMapping("/version/{versionId}")
public StatusResponse deleteVersion(@Parameter(name = "the id of the version that has to be deleted") @PathVariable final String versionId,
@Parameter(name = "if true, the controls on writing and readcount values will be skipped") @RequestParam(required = false, defaultValue = "false") final boolean force)
throws MDStoreManagerException {
final String hdfsPath = databaseUtils.deleteMdStoreVersion(versionId, force);
hdfsClient.deletePath(hdfsPath);
return StatusResponse.DELETED;
}
@Operation(summary = "Decrease the read count of a mdstore version")
@GetMapping("/version/{versionId}/endReading")
public MDStoreVersion endReading(@Parameter(name = "the id of the version that has been completely read") @PathVariable final String versionId)
throws MDStoreManagerException {
return databaseUtils.endReading(versionId);
}
@Operation(summary = "Reset the read count of a mdstore version")
@GetMapping("/version/{versionId}/resetReading")
public MDStoreVersion resetReading(@Parameter(name = "the id of the version") @PathVariable final String versionId)
throws MDStoreManagerException {
return databaseUtils.resetReading(versionId);
}
@Operation(summary = "Delete expired versions")
@DeleteMapping("/versions/expired")
public StatusResponse deleteExpiredVersions() {
new Thread(this::performDeleteOfExpiredVersions).start();
return StatusResponse.DELETING;
}
private synchronized void performDeleteOfExpiredVersions() {
log.info("Deleting expired version...");
for (final String versionId : databaseUtils.listExpiredVersions()) {
try {
final String hdfsPath = databaseUtils.deleteMdStoreVersion(versionId, true);
hdfsClient.deletePath(hdfsPath);
} catch (final MDStoreManagerException e) {
log.warn("Error deleteting version " + versionId, e);
}
}
log.info("Done.");
}
@Operation(summary = "Fix the inconsistencies on HDFS")
@GetMapping("/hdfs/inconsistencies")
public Set<String> fixHdfsInconsistencies(
@Parameter(name = "force the deletion of hdfs paths") @RequestParam(required = false, defaultValue = "false") final boolean delete)
throws MDStoreManagerException {
final Set<String> hdfsDirs = hdfsClient.listHadoopDirs();
final Set<String> validDirs = databaseUtils.listValidHdfsPaths();
final Set<String> toDelete = Sets.difference(hdfsDirs, validDirs);
log.info("Found " + toDelete.size() + " hdfs paths to remove");
if (delete) {
for (final String p : toDelete) {
hdfsClient.deletePath(p);
}
}
return toDelete;
}
@Operation(summary = "Show informations")
@GetMapping("/info")
public Map<String, Object> info() {
final Map<String, Object> info = new LinkedHashMap<>();
info.put("number_of_mdstores", databaseUtils.countMdStores());
info.put("hadoop_user", hdfsClient.getHadoopUser());
info.put("hadoop_cluster", hdfsClient.getHadoopCluster());
info.put("hdfs_base_path", databaseUtils.getHdfsBasePath());
info.put("expired_versions", databaseUtils.listExpiredVersions());
return info;
}
@Operation(summary = "list the file inside the path of a mdstore version")
@GetMapping("/version/{versionId}/parquet/files")
public Set<String> listVersionFiles(@PathVariable final String versionId) throws MDStoreManagerException {
final String path = databaseUtils.findVersion(versionId).getHdfsPath();
return hdfsClient.listContent(path + "/store", HdfsClient::isParquetFile);
}
@Operation(summary = "read the parquet file of a mdstore version")
@GetMapping("/version/{versionId}/parquet/content/{limit}")
public List<Map<String, String>> listVersionParquet(@PathVariable final String versionId, @PathVariable final long limit) throws MDStoreManagerException {
final String path = databaseUtils.findVersion(versionId).getHdfsPath();
return hdfsClient.readParquetFiles(path + "/store", limit);
}
@Operation(summary = "read the parquet file of a mdstore (current version)")
@GetMapping("/mdstore/{mdId}/parquet/content/{limit}")
public List<Map<String, String>> listMdstoreParquet(@PathVariable final String mdId, @PathVariable final long limit) throws MDStoreManagerException {
final String versionId = databaseUtils.findMdStore(mdId).getCurrentVersion();
final String path = databaseUtils.findVersion(versionId).getHdfsPath();
return hdfsClient.readParquetFiles(path + "/store", limit);
}
protected void setDatabaseUtils(final DatabaseUtils databaseUtils) {
this.databaseUtils = databaseUtils;
}
protected void setHdfsClient(final HdfsClient hdfsClient) {
this.hdfsClient = hdfsClient;
}
}
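A sketch of the version lifecycle as a client might drive it (base URL and parameter values are hypothetical): a writable version is prepared with newVersion, the harvesting workflow fills its <hdfsPath>/store directory, and the version is then promoted with commit/{size}; abort or DELETE removes it instead.

import org.springframework.web.client.RestTemplate;

public class MdStoreLifecycleSketch {
	public static void main(final String[] args) {
		final RestTemplate rest = new RestTemplate();
		final String base = "http://localhost:8080/mdstores";

		// 1) create a store (format/layout/interpretation and ds params are made up)
		final String created = rest.getForObject(
			base + "/new/oai_dc/store/cleaned?dsName=Test+DS&dsId=ds-1&apiId=api-1", String.class);
		System.out.println(created);

		// 2) prepare a writable version for a known mdstore id, fill its
		//    <hdfsPath>/store, then promote it, e.g.:
		//      GET {base}/mdstore/{mdId}/newVersion
		//      GET {base}/version/{versionId}/commit/{size}
	}
}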

View File

@ -0,0 +1,25 @@
package eu.dnetlib.data.mdstore;
public class StatusResponse {
public static final StatusResponse DELETED = new StatusResponse("DELETED");
public static final StatusResponse DELETING = new StatusResponse("DELETING...");
public static final StatusResponse ABORTED = new StatusResponse("ABORTED");
private String status;
public StatusResponse() {}
public StatusResponse(final String status) {
this.status = status;
}
public String getStatus() {
return status;
}
public void setStatus(final String status) {
this.status = status;
}
}

View File

@ -0,0 +1,31 @@
package eu.dnetlib.data.mdstore;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import eu.dnetlib.common.controller.AbstractDnetController;
import eu.dnetlib.data.mdstore.manager.exceptions.MDStoreManagerException;
import eu.dnetlib.data.mdstore.utils.ZeppelinClient;
@RestController
@RequestMapping("/zeppelin")
public class ZeppelinAjaxController extends AbstractDnetController {
@Autowired
private ZeppelinClient zeppelinClient;
@GetMapping("/templates")
public List<String> getTemplates() throws MDStoreManagerException {
try {
return zeppelinClient.listTemplates();
} catch (final Throwable e) {
throw new MDStoreManagerException("Zeppelin is unreachable", e);
}
}
}

View File

@ -0,0 +1,41 @@
package eu.dnetlib.data.mdstore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.servlet.ModelAndView;
import eu.dnetlib.data.mdstore.manager.exceptions.MDStoreManagerException;
import eu.dnetlib.data.mdstore.utils.ControllerUtils;
import eu.dnetlib.data.mdstore.utils.DatabaseUtils;
import eu.dnetlib.data.mdstore.utils.ZeppelinClient;
import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo;
@Controller
public class ZeppelinController {
@Autowired
private ZeppelinClient zeppelinClient;
@Autowired
private DatabaseUtils databaseUtils;
@RequestMapping("/zeppelin/{mdId}/{note}")
public String goToZeppelin(@PathVariable final String mdId, final @PathVariable String note) throws MDStoreManagerException {
final MDStoreWithInfo mdstore = databaseUtils.findMdStore(mdId);
final String currentVersion = mdstore.getCurrentVersion();
final String path = databaseUtils.findVersion(currentVersion).getHdfsPath() + "/store";
return "redirect:" + zeppelinClient.zeppelinNote(note, mdstore, path);
}
@ExceptionHandler(Exception.class)
@ResponseStatus(value = HttpStatus.INTERNAL_SERVER_ERROR)
public ModelAndView handleException(final Exception e) {
return ControllerUtils.errorPage("Metadata Store Manager - Zeppelin Client", e);
}
}

View File

@ -0,0 +1,34 @@
package eu.dnetlib.data.mdstore.manager.exceptions;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ResponseStatus;
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
public class MDStoreManagerException extends Exception {
/**
*
*/
private static final long serialVersionUID = -7503316126409002675L;
public MDStoreManagerException() {
super();
}
public MDStoreManagerException(String message, Throwable cause, boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
public MDStoreManagerException(String message, Throwable cause) {
super(message, cause);
}
public MDStoreManagerException(String message) {
super(message);
}
public MDStoreManagerException(Throwable cause) {
super(cause);
}
}

View File

@ -0,0 +1,22 @@
package eu.dnetlib.data.mdstore.utils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.servlet.ModelAndView;
public class ControllerUtils {
private static final Logger log = LoggerFactory.getLogger(ControllerUtils.class);
public static ModelAndView errorPage(final String title, final Throwable e) {
log.debug(e.getMessage(), e);
final ModelAndView mv = new ModelAndView();
mv.setViewName("error");
mv.addObject("title", title);
mv.addObject("error", e.getMessage());
mv.addObject("stacktrace", ExceptionUtils.getStackTrace(e));
return mv;
}
}

View File

@ -0,0 +1,200 @@
package eu.dnetlib.data.mdstore.utils;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.transaction.Transactional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import eu.dnetlib.data.mdstore.manager.exceptions.MDStoreManagerException;
import eu.dnetlib.data.mdstore.repository.MDStoreCurrentVersionRepository;
import eu.dnetlib.data.mdstore.repository.MDStoreRepository;
import eu.dnetlib.data.mdstore.repository.MDStoreVersionRepository;
import eu.dnetlib.data.mdstore.repository.MDStoreWithInfoRepository;
import eu.dnetlib.dhp.schema.mdstore.MDStore;
import eu.dnetlib.dhp.schema.mdstore.MDStoreCurrentVersion;
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion;
import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo;
@Service
public class DatabaseUtils {
@Autowired
private MDStoreRepository mdstoreRepository;
@Autowired
private MDStoreVersionRepository mdstoreVersionRepository;
@Autowired
private MDStoreCurrentVersionRepository mdstoreCurrentVersionRepository;
@Autowired
private MDStoreWithInfoRepository mdstoreWithInfoRepository;
@Autowired
private JdbcTemplate jdbcTemplate;
@Value("${dhp.mdstore-manager.hdfs.base-path}")
private String hdfsBasePath;
private static final Logger log = LoggerFactory.getLogger(DatabaseUtils.class);
public Iterable<MDStoreWithInfo> listMdStores() {
return mdstoreWithInfoRepository.findAll();
}
public List<String> listMdStoreIDs() {
return mdstoreRepository.findAll().stream().map(MDStore::getId).collect(Collectors.toList());
}
public long countMdStores() {
return mdstoreRepository.count();
}
public Iterable<MDStoreVersion> listVersions(final String mdId) {
return mdstoreVersionRepository.findByMdstore(mdId);
}
public List<String> listExpiredVersions() {
return jdbcTemplate
.queryForList("select v.id from mdstore_versions v left outer join mdstore_current_versions cv on (v.id = cv.current_version) where v.writing = false and v.readcount = 0 and cv.mdstore is null;", String.class);
}
public MDStoreWithInfo findMdStore(final String mdId) throws MDStoreManagerException {
return mdstoreWithInfoRepository.findById(mdId).orElseThrow(() -> new MDStoreManagerException("Missing mdstore: " + mdId));
}
public MDStoreVersion findVersion(final String versionId) throws MDStoreManagerException {
return mdstoreVersionRepository.findById(versionId).orElseThrow(() -> new MDStoreManagerException("Missing mdstore version: " + versionId));
}
@Transactional
public String createMDStore(final String format,
final String layout,
final String interpretation,
final String dsName,
final String dsId,
final String apiId) {
final MDStore md = MDStore.newInstance(format, layout, interpretation, dsName, dsId, apiId, hdfsBasePath);
mdstoreRepository.save(md);
final MDStoreVersion v = MDStoreVersion.newInstance(md.getId(), false, hdfsBasePath);
v.setLastUpdate(new Date());
mdstoreVersionRepository.save(v);
mdstoreCurrentVersionRepository.save(MDStoreCurrentVersion.newInstance(v));
return md.getId();
}
@Transactional
public String deleteMdStore(final String mdId) throws MDStoreManagerException {
final Optional<MDStore> md = mdstoreRepository.findById(mdId);
if (!md.isPresent()) {
log.error("MDStore not found: " + mdId);
throw new MDStoreManagerException("MDStore not found: " + mdId);
}
if (mdstoreVersionRepository.countByMdstoreAndReadCountGreaterThan(mdId, 0) > 0) {
log.error("Read transactions found on mdstore: " + mdId);
throw new MDStoreManagerException("Read transactions found on mdstore: " + mdId);
}
if (mdstoreVersionRepository.countByMdstoreAndWriting(mdId, true) > 0) {
log.error("Write transactions found on mdstore: " + mdId);
throw new MDStoreManagerException("Write transactions found on mdstore: " + mdId);
}
mdstoreCurrentVersionRepository.deleteById(mdId);
mdstoreVersionRepository.deleteByMdstore(mdId);
mdstoreRepository.deleteById(mdId);
return md.get().getHdfsPath();
}
@Transactional
public MDStoreVersion startReading(final String mdId) throws MDStoreManagerException {
final MDStoreCurrentVersion cv =
mdstoreCurrentVersionRepository.findById(mdId).orElseThrow(() -> new MDStoreManagerException("Missing mdstore: " + mdId));
final MDStoreVersion v = mdstoreVersionRepository.findById(cv.getCurrentVersion())
.orElseThrow(() -> new MDStoreManagerException("Missing version: " + cv.getCurrentVersion()));
v.setReadCount(v.getReadCount() + 1);
mdstoreVersionRepository.save(v);
return v;
}
@Transactional
public MDStoreVersion endReading(final String versionId) throws MDStoreManagerException {
final MDStoreVersion v = mdstoreVersionRepository.findById(versionId).orElseThrow(() -> new MDStoreManagerException("Version not found"));
v.setReadCount(Math.max(0, v.getReadCount() - 1));
return v;
}
@Transactional
public MDStoreVersion resetReading(final String versionId) throws MDStoreManagerException {
final MDStoreVersion v = mdstoreVersionRepository.findById(versionId).orElseThrow(() -> new MDStoreManagerException("Version not found"));
v.setReadCount(0);
return v;
}
@Transactional
public MDStoreVersion prepareMdStoreVersion(final String mdId) {
final MDStoreVersion v = MDStoreVersion.newInstance(mdId, true, hdfsBasePath);
mdstoreVersionRepository.save(v);
return v;
}
@Transactional
public MDStoreVersion commitMdStoreVersion(final String versionId, final long size) throws MDStoreManagerException {
final MDStoreVersion v = mdstoreVersionRepository.findById(versionId).orElseThrow(() -> new MDStoreManagerException("Invalid version: " + versionId));
mdstoreCurrentVersionRepository.save(MDStoreCurrentVersion.newInstance(v));
v.setWriting(false);
v.setSize(size);
v.setLastUpdate(new Date());
mdstoreVersionRepository.save(v);
return v;
}
@Transactional
public String deleteMdStoreVersion(final String versionId, final boolean force) throws MDStoreManagerException {
final MDStoreVersion v = mdstoreVersionRepository.findById(versionId).orElseThrow(() -> new MDStoreManagerException("Version not found"));
if (mdstoreCurrentVersionRepository
.countByCurrentVersion(versionId) > 0) {
throw new MDStoreManagerException("I cannot delete this version because it is the current version");
}
if (!force) {
if (v.isWriting()) { throw new MDStoreManagerException("I cannot delete this version because it is in write mode"); }
if (v.getReadCount() > 0) { throw new MDStoreManagerException("I cannot delete this version because it is in read mode"); }
}
mdstoreVersionRepository.delete(v);
return v.getHdfsPath();
}
public Set<String> listValidHdfsPaths() {
return new HashSet<>(jdbcTemplate
.queryForList(" select hdfs_path from mdstores union all select hdfs_path from mdstore_versions", String.class));
}
public String getHdfsBasePath() {
return hdfsBasePath;
}
public void setHdfsBasePath(final String hdfsBasePath) {
this.hdfsBasePath = hdfsBasePath;
}
}
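The repository interfaces are not part of this commit view; from the calls above, the derived-query signatures that DatabaseUtils relies on would look roughly like the sketch below. Note also that endReading and resetReading update the entity without an explicit save(): inside @Transactional the loaded entity is managed, so the readcount change is flushed at commit.

import org.springframework.data.jpa.repository.JpaRepository;
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion;

public interface MDStoreVersionRepository extends JpaRepository<MDStoreVersion, String> {
	Iterable<MDStoreVersion> findByMdstore(String mdId);
	long countByMdstoreAndReadCountGreaterThan(String mdId, int readCount);
	long countByMdstoreAndWriting(String mdId, boolean writing);
	void deleteByMdstore(String mdId);
}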

View File

@ -0,0 +1,213 @@
package eu.dnetlib.data.mdstore.utils;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import eu.dnetlib.data.mdstore.manager.exceptions.MDStoreManagerException;
import parquet.avro.AvroParquetReader;
import parquet.hadoop.ParquetReader;
@Component
public class HdfsClient {
@Value("${dhp.mdstore-manager.hadoop.cluster}")
private String hadoopCluster;
@Value("${dhp.mdstore-manager.hadoop.user}")
private String hadoopUser;
@Value("${dhp.mdstore-manager.hdfs.base-path}")
private String hdfsBasePath;
private static final Log log = LogFactory.getLog(HdfsClient.class);
public void deletePath(final String path) throws MDStoreManagerException {
try (final FileSystem fs = FileSystem.get(conf())) {
fs.delete(new Path(path), true);
log.info("HDFS Path deleted: " + path);
} catch (final FileNotFoundException e) {
log.warn("Missing path: " + hdfsBasePath);
} catch (IllegalArgumentException | IOException e) {
log.error("Eror deleting path: " + path, e);
throw new MDStoreManagerException("Eror deleting path: " + path, e);
}
}
public Set<String> listHadoopDirs() {
final Set<String> res = new LinkedHashSet<>();
try (final FileSystem fs = FileSystem.get(conf())) {
for (final FileStatus mdDir : fs.listStatus(new Path(hdfsBasePath))) {
if (isMdStoreOrVersionDir(mdDir)) {
res.add(String.format("%s/%s", hdfsBasePath, mdDir.getPath().getName()));
for (final FileStatus verDir : fs.listStatus(mdDir.getPath())) {
if (isMdStoreOrVersionDir(verDir)) {
res.add(String.format("%s/%s/%s", hdfsBasePath, mdDir.getPath().getName(), verDir.getPath().getName()));
}
}
}
}
} catch (final FileNotFoundException e) {
log.warn("Missing path: " + hdfsBasePath);
} catch (final Exception e) {
log.error("Error Listing path: " + hdfsBasePath, e);
}
return res;
}
public Set<String> listContent(final String path, final Predicate<FileStatus> condition) {
// TODO: remove the following line (it is only for tests)
// if (1 != 2) { return
// Sets.newHashSet("file:///Users/michele/Desktop/part-00000-e3675dc3-69fb-422e-a159-78e34cfe14d2-c000.snappy.parquet"); }
final Set<String> res = new LinkedHashSet<>();
try (final FileSystem fs = FileSystem.get(conf())) {
for (final FileStatus f : fs.listStatus(new Path(path))) {
if (condition.test(f)) {
res.add(String.format("%s/%s", path, f.getPath().getName()));
}
}
} catch (final FileNotFoundException e) {
log.warn("Missing path: " + hdfsBasePath);
} catch (final Exception e) {
log.error("Error Listing path: " + path, e);
}
return res;
}
@SuppressWarnings("unchecked")
public List<Map<String, String>> readParquetFiles(final String path, final long limit) throws MDStoreManagerException {
final List<Map<String, String>> list = new ArrayList<>();
final Configuration conf = conf();
long i = 0;
final Set<String> fields = new LinkedHashSet<>();
for (final String f : listContent(path, HdfsClient::isParquetFile)) {
if (i < limit) {
log.info("Opening parquet file: " + f);
try (final ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord> builder(new Path(f)).withConf(conf).build()) {
log.debug("File parquet OPENED");
GenericRecord rec = null;
while (i++ < limit && (rec = reader.read()) != null) {
if (fields.isEmpty()) {
rec.getSchema().getFields().forEach(field -> fields.add(field.name()));
log.debug("Found schema: " + fields);
}
final Map<String, String> map = new LinkedHashMap<>();
for (final String field : fields) {
final Object v = rec.get(field);
map.put(field, v != null ? v.toString() : "");
}
list.add(map);
log.debug("added record");
}
} catch (final FileNotFoundException e) {
log.warn("Missing path: " + hdfsBasePath);
} catch (final Throwable e) {
log.error("Error reading parquet file: " + f, e);
throw new MDStoreManagerException("Error reading parquet file: " + f, e);
}
}
}
return list;
}
/*
*
* private String printGroup(final Group g) { final StringWriter sw = new StringWriter();
*
* final int fieldCount = g.getType().getFieldCount(); for (int field = 0; field < fieldCount; field++) { final int valueCount =
* g.getFieldRepetitionCount(field);
*
* final Type fieldType = g.getType().getType(field); final String fieldName = fieldType.getName();
*
* for (int index = 0; index < valueCount; index++) { if (fieldType.isPrimitive()) { sw.append(fieldName + " " +
* g.getValueToString(field, index)); sw.append("\n"); } } } return sw.toString(); }
*
* public List<String> readParquetFile(final String file, final long n) throws MDStoreManagerException {
*
* final Configuration conf = conf(); final Path path = new Path(file); final List<String> list = new ArrayList<>(); try { final
* ParquetMetadata readFooter = ParquetFileReader.readFooter(conf, path, ParquetMetadataConverter.NO_FILTER); final MessageType schema =
* readFooter.getFileMetaData().getSchema(); final ParquetFileReader r = new ParquetFileReader(conf, path, readFooter);
*
* PageReadStore pages = null; try { while (null != (pages = r.readNextRowGroup())) { final long rows = pages.getRowCount();
* System.out.println("Number of rows: " + rows);
*
* final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema); final RecordReader<Group> recordReader =
* columnIO.getRecordReader(pages, new GroupRecordConverter(schema)); for (int i = 0; i < rows; i++) { final Group g =
* recordReader.read(); list.add(printGroup(g)); } } } finally { r.close(); } } catch (final IOException e) {
* System.out.println("Error reading parquet file."); e.printStackTrace(); } return list; }
*
*
*/
public static boolean isParquetFile(final FileStatus fileStatus) {
return fileStatus.isFile() && fileStatus.getPath().getName().endsWith(".parquet");
}
public static boolean isMdStoreOrVersionDir(final FileStatus fileStatus) {
return fileStatus.isDirectory() && fileStatus.getPath().getName().startsWith("md-");
}
private Configuration conf() throws MDStoreManagerException {
final Configuration conf = new Configuration();
System.setProperty("HADOOP_USER_NAME", hadoopUser);
if (hadoopCluster.equalsIgnoreCase("OCEAN")) {
conf.addResource(getClass().getResourceAsStream("/hadoop/OCEAN/core-site.xml"));
conf.addResource(getClass().getResourceAsStream("/hadoop/OCEAN/ocean-hadoop-conf.xml"));
} else if (hadoopCluster.equalsIgnoreCase("GARR")) {
conf.addResource(getClass().getResourceAsStream("/hadoop/GARR/core-site.xml"));
conf.addResource(getClass().getResourceAsStream("/hadoop/GARR/garr-hadoop-conf.xml"));
} else if (hadoopCluster.equalsIgnoreCase("MOCK")) {
// NOTHING
} else {
log.error("Invalid Haddop Cluster: " + hadoopCluster);
throw new MDStoreManagerException("Invalid Haddop Cluster: " + hadoopCluster);
}
return conf;
}
public String getHadoopCluster() {
return hadoopCluster;
}
public void setHadoopCluster(final String hadoopCluster) {
this.hadoopCluster = hadoopCluster;
}
public String getHadoopUser() {
return hadoopUser;
}
public void setHadoopUser(final String hadoopUser) {
this.hadoopUser = hadoopUser;
}
}
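With the MOCK cluster the Configuration stays empty, so FileSystem.get() resolves to the local filesystem; that allows a smoke test without a live HDFS — presumably what the commented test line in listContent was for. A sketch (paths are hypothetical, same package assumed):

public class HdfsClientLocalSketch {
	public static void main(final String[] args) {
		final HdfsClient client = new HdfsClient();
		client.setHadoopCluster("MOCK"); // empty Configuration -> local filesystem
		client.setHadoopUser(System.getProperty("user.name"));
		// list local parquet files as if they were a version's /store directory
		System.out.println(client.listContent("file:///tmp/md-test/store", HdfsClient::isParquetFile));
	}
}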

View File

@ -0,0 +1,317 @@
package eu.dnetlib.data.mdstore.utils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;
import eu.dnetlib.data.mdstore.manager.exceptions.MDStoreManagerException;
import eu.dnetlib.data.mdstore.zeppelin.HasStatus;
import eu.dnetlib.data.mdstore.zeppelin.ListResponse;
import eu.dnetlib.data.mdstore.zeppelin.Note;
import eu.dnetlib.data.mdstore.zeppelin.Paragraph;
import eu.dnetlib.data.mdstore.zeppelin.SimpleResponse;
import eu.dnetlib.data.mdstore.zeppelin.StringResponse;
import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo;
@Component
public class ZeppelinClient {
@Value("${dhp.mdstore-manager.hadoop.zeppelin.login}")
private String zeppelinLogin;
@Value("${dhp.mdstore-manager.hadoop.zeppelin.password}")
private String zeppelinPassword;
@Value("${dhp.mdstore-manager.hadoop.zeppelin.base-url}")
private String zeppelinBaseUrl;
@Value("${dhp.mdstore-manager.hadoop.zeppelin.name-prefix}")
private String zeppelinNamePrefix;
private static final Log log = LogFactory.getLog(ZeppelinClient.class);
private static final Map<String, List<String>> DEFAULT_RIGHTS = new LinkedHashMap<>();
private static final Integer MAX_NUMBER_OF_MD_NOTES = 2;
@PostConstruct
public void init() {
DEFAULT_RIGHTS.put("owners", Arrays.asList(zeppelinLogin));
DEFAULT_RIGHTS.put("readers", new ArrayList<>()); // ALL
DEFAULT_RIGHTS.put("runners", new ArrayList<>()); // ALL
DEFAULT_RIGHTS.put("writers", new ArrayList<>()); // ALL
}
private String jsessionid;
public String zeppelinNote(final String note, final MDStoreWithInfo mdstore, final String currentVersionPath) throws MDStoreManagerException {
if (notConfigured()) { throw new MDStoreManagerException("A zeppelin property is empty"); }
final String newName =
StringUtils.join(Arrays.asList(zeppelinNamePrefix, "notes", mdstore.getDatasourceName().replaceAll("/", "-"), mdstore.getApiId()
.replaceAll("/", "-"), note.replaceAll("/", "-"), mdstore.getCurrentVersion().replaceAll("/", "-")), "/");
final List<Map<String, String>> notes = listNotes();
final Optional<String> oldNoteId = notes.stream()
.filter(Objects::nonNull)
.filter(map -> newName.equals(map.get("name")))
.map(map -> map.get("id"))
.findFirst();
if (oldNoteId.isPresent()) {
log.debug("Returning existing note: " + oldNoteId.get());
return zeppelinBaseUrl + "/#/notebook/" + oldNoteId.get();
}
final String templateName = zeppelinNamePrefix + "/templates/" + note;
final String templateNoteId = notes.stream()
.filter(map -> map.get("name").equals(templateName))
.map(map -> map.get("id"))
.findFirst()
.orElseThrow(() -> new MDStoreManagerException("Template Note not found: " + templateName));
final String newId = cloneNote(templateNoteId, newName, mdstore, currentVersionPath);
return zeppelinBaseUrl + "/#/notebook/" + newId;
}
public List<String> listTemplates() {
final String prefix = zeppelinNamePrefix + "/templates/";
if (notConfigured()) {
return new ArrayList<>();
} else {
return listNotes().stream()
.map(map -> map.get("name"))
.filter(s -> s.startsWith(prefix))
.map(s -> StringUtils.substringAfter(s, prefix))
.sorted()
.collect(Collectors.toList());
}
}
private List<Map<String, String>> listNotes() {
return callApi(HttpMethod.GET, "notebook", ListResponse.class, null).getBody();
}
private String cloneNote(final String noteId, final String newName, final MDStoreWithInfo mdstore, final String currentVersionPath)
throws MDStoreManagerException {
final String newId = callApi(HttpMethod.POST, "notebook/" + noteId, StringResponse.class, new Note(newName)).getBody();
callApi(HttpMethod.POST, "notebook/" + newId + "/paragraph", StringResponse.class, confParagraph(mdstore, currentVersionPath)).getBody();
callApi(HttpMethod.PUT, "notebook/" + newId + "/permissions", SimpleResponse.class, DEFAULT_RIGHTS);
log.info("New note created, id: " + newId + ", name: " + newName);
return newId;
}
private Paragraph confParagraph(final MDStoreWithInfo mdstore, final String currentVersionPath) throws MDStoreManagerException {
try {
final String code = IOUtils.toString(getClass().getResourceAsStream("/zeppelin/paragraph_conf.tmpl"), StandardCharsets.UTF_8)
.replaceAll("__DS_NAME__", StringEscapeUtils.escapeJava(mdstore.getDatasourceName()))
.replaceAll("__DS_ID__", StringEscapeUtils.escapeJava(mdstore.getDatasourceId()))
.replaceAll("__API_ID__", StringEscapeUtils.escapeJava(mdstore.getApiId()))
.replaceAll("__MDSTORE_ID__", mdstore.getId())
.replaceAll("__VERSION__", mdstore.getCurrentVersion())
.replaceAll("__PATH__", currentVersionPath);
return new Paragraph("Configuration", code, 0);
} catch (final IOException e) {
log.error("Error preparing configuration paragraph", e);
throw new MDStoreManagerException("Error preparing configuration paragraph", e);
}
}
@Scheduled(fixedRate = 12 * 60 * 60 * 1000) // 12 hours
public void cleanExpiredNotes() {
if (notConfigured()) { return; }
try {
// Sort the notes by the version datestamp (most recent first)
final List<Map<String, String>> notes = listNotes()
.stream()
.filter(n -> n.get("name").startsWith(zeppelinNamePrefix + "/notes/"))
.sorted((o1, o2) -> StringUtils.compare(o2.get("name"), o1.get("name")))
.collect(Collectors.toList());
final Map<String, Integer> map = new HashMap<>();
for (final Map<String, String> n : notes) {
final String firstPart = StringUtils.substringBeforeLast(n.get("name"), "-");
if (!map.containsKey(firstPart)) {
log.debug("Evaluating note " + n.get("name") + " for deletion: CONFIRMED");
map.put(firstPart, 1);
} else if (map.get(firstPart) < MAX_NUMBER_OF_MD_NOTES) {
log.debug("Evaluating note " + n.get("name") + " for deletion: CONFIRMED");
map.put(firstPart, map.get(firstPart) + 1);
} else {
log.debug("Evaluating note " + n.get("name") + " for deletion: TO_DELETE");
callApi(HttpMethod.DELETE, "notebook/" + n.get("id"), SimpleResponse.class, null);
}
}
} catch (final Exception e) {
log.error("Error cleaning expired notes", e);
}
}
private <T extends HasStatus> T callApi(final HttpMethod method, final String api, final Class<T> resClazz, final Object objRequest) {
if (jsessionid == null) {
final T res = findNewJsessionId(method, api, resClazz, objRequest);
if (res != null) { return res; }
} else {
try {
return callApi(method, api, resClazz, objRequest, jsessionid);
} catch (final MDStoreManagerException e) {
final T res = findNewJsessionId(method, api, resClazz, objRequest);
if (res != null) { return res; }
}
}
throw new RuntimeException("All attempted calls are failed");
}
@SuppressWarnings("unchecked")
private <T extends HasStatus> T callApi(final HttpMethod method,
final String api,
final Class<T> resClazz,
final Object objRequest,
final String jsessionid)
throws MDStoreManagerException {
final String url = String.format("%s/api/%s;JSESSIONID=%s", zeppelinBaseUrl, api, jsessionid);
final RestTemplate restTemplate = new RestTemplate();
ResponseEntity<T> res = null;
switch (method) {
case GET:
log.debug("Performing GET: " + url);
res = restTemplate.getForEntity(url, resClazz);
break;
case POST:
log.debug("Performing POST: " + url);
res = restTemplate.postForEntity(url, objRequest, resClazz);
break;
case PUT:
log.debug("Performing PUT: " + url);
restTemplate.put(url, objRequest);
break;
case DELETE:
log.debug("Performing DELETE: " + url);
restTemplate.delete(url);
break;
default:
throw new RuntimeException("Unsupported method: " + method);
}
if (method == HttpMethod.PUT || method == HttpMethod.DELETE) {
return (T) new SimpleResponse("OK");
} else if (res == null) {
log.error("NULL response from the API");
throw new MDStoreManagerException("NULL response from the API");
} else if (res.getStatusCode() != HttpStatus.OK) {
log.error("Zeppelin API failed with HTTP error: " + res);
throw new MDStoreManagerException("Zeppelin API failed with HTTP error: " + res);
} else if (res.getBody() == null) {
log.error("Zeppelin API returned a null response");
throw new MDStoreManagerException("Zeppelin API returned a null response");
} else if (!res.getBody().getStatus().equals("OK")) {
log.error("Zeppelin API Operation failed: " + res.getBody());
throw new MDStoreManagerException("Registration of zeppelin note failed: " + res.getBody());
} else {
return res.getBody();
}
}
private <T extends HasStatus> T findNewJsessionId(final HttpMethod method, final String api, final Class<T> resClazz, final Object objRequest) {
for (final String id : obtainJsessionIDs()) {
try {
final T res = callApi(method, api, resClazz, objRequest, id);
setJsessionid(id);
return res;
} catch (final MDStoreManagerException e) {
log.warn("Skipping invalid jsessionid: " + id);
}
}
return null;
}
private Set<String> obtainJsessionIDs() {
final HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
final MultiValueMap<String, String> map = new LinkedMultiValueMap<>();
map.add("userName", zeppelinLogin);
map.add("password", zeppelinPassword);
final HttpEntity<MultiValueMap<String, String>> request = new HttpEntity<>(map, headers);
final String url = zeppelinBaseUrl + "/api/login";
final ResponseEntity<?> res = new RestTemplate().postForEntity(url, request, Object.class);
if (res.getStatusCode() != HttpStatus.OK) {
log.error("Zeppelin API: login failed with HTTP error: " + res);
throw new RuntimeException("Zeppelin API: login failed with HTTP error: " + res);
} else if (!res.getHeaders().containsKey(HttpHeaders.SET_COOKIE)) {
log.error("Zeppelin API: login failed (missing SET_COOKIE header)");
throw new RuntimeException("Zeppelin API: login failed (missing SET_COOKIE header)");
} else {
return res.getHeaders()
.get(HttpHeaders.SET_COOKIE)
.stream()
.map(s -> s.split(";"))
.flatMap(Arrays::stream)
.map(String::trim)
.filter(s -> s.startsWith("JSESSIONID="))
.map(s -> StringUtils.removeStart(s, "JSESSIONID="))
.filter(s -> !s.equalsIgnoreCase("deleteMe"))
.collect(Collectors.toSet());
}
}
public String getJsessionid() {
return jsessionid;
}
public void setJsessionid(final String jsessionid) {
this.jsessionid = jsessionid;
}
private boolean notConfigured() {
return StringUtils.isAnyBlank(zeppelinBaseUrl, zeppelinLogin, zeppelinPassword, zeppelinNamePrefix);
}
}
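One caveat in confParagraph: String.replaceAll() treats backslashes and '$' in the replacement string as special, and StringEscapeUtils.escapeJava() can emit backslashes (e.g. for quotes in a datasource name), so those escapes would be silently consumed. A safer variant, sketched here, wraps the replacement in Matcher.quoteReplacement():

import java.util.regex.Matcher;
import org.apache.commons.lang.StringEscapeUtils;

public class TemplateSubstitutionSketch {
	public static void main(final String[] args) {
		final String template = "val dsName = \"__DS_NAME__\""; // stands in for paragraph_conf.tmpl
		final String dsName = "Repo \"Alpha\"";                 // escapeJava produces \" sequences here
		final String code = template.replaceAll("__DS_NAME__",
				Matcher.quoteReplacement(StringEscapeUtils.escapeJava(dsName)));
		System.out.println(code); // prints: val dsName = "Repo \"Alpha\""
	}
}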

View File

@ -0,0 +1,6 @@
package eu.dnetlib.data.mdstore.zeppelin;
public interface HasStatus {
String getStatus();
}

View File

@ -0,0 +1,41 @@
package eu.dnetlib.data.mdstore.zeppelin;
import java.util.List;
import java.util.Map;
public class ListResponse implements HasStatus {
private String status;
private String message;
private List<Map<String, String>> body;
@Override
public String getStatus() {
return status;
}
public void setStatus(final String status) {
this.status = status;
}
public String getMessage() {
return message;
}
public void setMessage(final String message) {
this.message = message;
}
public List<Map<String, String>> getBody() {
return body;
}
public void setBody(final List<Map<String, String>> body) {
this.body = body;
}
@Override
public String toString() {
return String.format("Response [status=%s, message=%s, body=%s]", status, message, body);
}
}

View File

@ -0,0 +1,39 @@
package eu.dnetlib.data.mdstore.zeppelin;
import java.util.Map;
public class MapResponse {
private String status;
private String message;
private Map<String, String> body;
public String getStatus() {
return status;
}
public void setStatus(final String status) {
this.status = status;
}
public String getMessage() {
return message;
}
public void setMessage(final String message) {
this.message = message;
}
public Map<String, String> getBody() {
return body;
}
public void setBody(final Map<String, String> body) {
this.body = body;
}
@Override
public String toString() {
return String.format("Response [status=%s, message=%s, body=%s]", status, message, body);
}
}

View File

@ -0,0 +1,21 @@
package eu.dnetlib.data.mdstore.zeppelin;
public class Note {
private String name;
public Note() {}
public Note(final String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(final String name) {
this.name = name;
}
}

View File

@ -0,0 +1,56 @@
package eu.dnetlib.data.mdstore.zeppelin;
import java.util.LinkedHashMap;
import java.util.Map;
public class Paragraph {
private String title;
private String text;
private int index = 0;
private Map<String, Object> config = new LinkedHashMap<>();
public Paragraph() {}
public Paragraph(final String title, final String text, final int index) {
this.title = title;
this.text = text;
this.index = index;
this.config.put("title", true);
this.config.put("enabled", true);
this.config.put("editorHide", true);
}
public String getTitle() {
return title;
}
public void setTitle(final String title) {
this.title = title;
}
public String getText() {
return text;
}
public void setText(final String text) {
this.text = text;
}
public int getIndex() {
return index;
}
public void setIndex(final int index) {
this.index = index;
}
public Map<String, Object> getConfig() {
return config;
}
public void setConfig(final Map<String, Object> config) {
this.config = config;
}
}

View File

@ -0,0 +1,16 @@
package eu.dnetlib.data.mdstore.zeppelin;
public class SimpleResponse implements HasStatus {
private final String status;
public SimpleResponse(final String status) {
this.status = status;
}
@Override
public String getStatus() {
return status;
}
}

View File

@ -0,0 +1,38 @@
package eu.dnetlib.data.mdstore.zeppelin;
public class StringResponse implements HasStatus {
private String status;
private String message;
private String body;
@Override
public String getStatus() {
return status;
}
public void setStatus(final String status) {
this.status = status;
}
public String getMessage() {
return message;
}
public void setMessage(final String message) {
this.message = message;
}
public String getBody() {
return body;
}
public void setBody(final String body) {
this.body = body;
}
@Override
public String toString() {
return String.format("Response [status=%s, message=%s, body=%s]", status, message, body);
}
}

View File

@ -20,7 +20,7 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(entityManagerFactoryRef = "mainEntityManagerFactory", transactionManagerRef = "mainTransactionManager", basePackages = {
"eu.dnetlib.data.is"
"eu.dnetlib.data.is", "eu.dnetlib.data.mdstore"
})
public class MainDBConfig {
@ -38,7 +38,7 @@ public class MainDBConfig {
@Qualifier("mainDataSource") final DataSource ds) {
return builder
.dataSource(ds)
.packages("eu.dnetlib.data.is")
.packages("eu.dnetlib.data.is", "eu.dnetlib.dhp.schema.mdstore")
.persistenceUnit("is")
.build();
}

View File

@ -43,3 +43,21 @@ spring.jpa.database=default
openaire.api.enable.dsm = true
# Hadoop
dhp.mdstore-manager.hadoop.cluster = GARR
dhp.mdstore-manager.hdfs.base-path = /data/dnet.dev/mdstore
dhp.mdstore-manager.hadoop.user = dnet.dev
#dhp.mdstore-manager.hadoop.zeppelin.base-url = https://iis-cdh5-test-gw.ocean.icm.edu.pl/zeppelin
#dhp.mdstore-manager.hadoop.zeppelin.login =
#dhp.mdstore-manager.hadoop.zeppelin.password =
dhp.mdstore-manager.hadoop.zeppelin.base-url = https://hadoop-zeppelin.garr-pa1.d4science.org
dhp.mdstore-manager.hadoop.zeppelin.login =
dhp.mdstore-manager.hadoop.zeppelin.password =
dhp.mdstore-manager.hadoop.zeppelin.name-prefix = mdstoreManager
dhp.mdstore-manager.inspector.records.max = 1000

View File

@ -0,0 +1,149 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--Autogenerated by Cloudera Manager-->
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop-rm1.garr-pa1.d4science.org:8020</value>
</property>
<property>
<name>fs.trash.interval</name>
<value>1440</value>
</property>
<property>
<name>fs.trash.checkpoint.interval</name>
<value>60</value>
</property>
<property>
<name>net.topology.script.file.name</name>
<value>{{CMF_CONF_DIR}}/topology.py</value>
</property>
<property>
<name>hadoop.security.authentication</name>
<value>simple</value>
</property>
<property>
<name>hadoop.security.authorization</name>
<value>false</value>
</property>
<property>
<name>hadoop.rpc.protection</name>
<value>authentication</value>
</property>
<property>
<name>hadoop.ssl.enabled</name>
<value>false</value>
</property>
<property>
<name>hadoop.ssl.require.client.cert</name>
<value>false</value>
<final>true</final>
</property>
<property>
<name>hadoop.ssl.keystores.factory.class</name>
<value>org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory</value>
<final>true</final>
</property>
<property>
<name>hadoop.ssl.server.conf</name>
<value>ssl-server.xml</value>
<final>true</final>
</property>
<property>
<name>hadoop.ssl.client.conf</name>
<value>ssl-client.xml</value>
<final>true</final>
</property>
<property>
<name>hadoop.security.auth_to_local</name>
<value>DEFAULT</value>
</property>
<property>
<name>hadoop.proxyuser.oozie.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.oozie.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.mapred.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.mapred.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.flume.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.flume.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.HTTP.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.HTTP.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hive.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hive.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hue.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hue.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.httpfs.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.httpfs.groups</name>
<value>*</value>
</property>
<property>
<name>hue.kerberos.principal.shortname</name>
<value>hue</value>
</property>
<property>
<name>hadoop.proxyuser.hdfs.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hdfs.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.yarn.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.yarn.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.security.group.mapping</name>
<value>org.apache.hadoop.security.ShellBasedUnixGroupsMapping</value>
</property>
<property>
<name>hadoop.security.instrumentation.requires.admin</name>
<value>false</value>
</property>
<property>
<name>hadoop.http.logs.enabled</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,217 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--Autogenerated by Cloudera Manager-->
<configuration>
<property>
<name>dfs.hosts</name>
<value>{{CMF_CONF_DIR}}/dfs_all_hosts.txt</value>
</property>
<property>
<name>dfs.namenode.hosts.provider.classname</name>
<value>org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:///dfs/nn</value>
</property>
<property>
<name>dfs.namenode.servicerpc-address</name>
<value>hadoop-rm1.garr-pa1.d4science.org:8022</value>
</property>
<property>
<name>dfs.namenode.rpc-address</name>
<value>hadoop-rm1.garr-pa1.d4science.org:8020</value>
</property>
<property>
<name>dfs.https.address</name>
<value>hadoop-rm1.garr-pa1.d4science.org:50470</value>
</property>
<property>
<name>dfs.https.port</name>
<value>50470</value>
</property>
<property>
<name>dfs.namenode.http-address</name>
    <value>hadoop-rm1.garr-pa1.d4science.org:50070</value>
  </property>
  <property>
    <name>dfs.namenode.secondary.http-address</name>
    <value>hadoop-rm2.garr-pa1.d4science.org:50090</value>
  </property>
  <property>
    <name>dfs.permissions.superusergroup</name>
    <value>supergroup</value>
  </property>
  <property>
    <name>dfs.replication</name>
    <value>2</value>
  </property>
  <property>
    <name>dfs.namenode.replication.min</name>
    <value>1</value>
  </property>
  <property>
    <name>dfs.replication.max</name>
    <value>512</value>
  </property>
  <property>
    <name>dfs.namenode.maintenance.replication.min</name>
    <value>1</value>
  </property>
  <property>
    <name>dfs.blocksize</name>
    <value>134217728</value>
  </property>
  <property>
    <name>dfs.image.transfer.timeout</name>
    <value>60000</value>
  </property>
  <property>
    <name>dfs.image.transfer.bandwidthPerSec</name>
    <value>0</value>
  </property>
  <property>
    <name>dfs.namenode.plugins</name>
    <value></value>
  </property>
  <property>
    <name>dfs.namenode.handler.count</name>
    <value>59</value>
  </property>
  <property>
    <name>dfs.namenode.service.handler.count</name>
    <value>59</value>
  </property>
  <property>
    <name>dfs.namenode.name.dir.restore</name>
    <value>false</value>
  </property>
  <property>
    <name>dfs.thrift.threads.max</name>
    <value>20</value>
  </property>
  <property>
    <name>dfs.thrift.threads.min</name>
    <value>10</value>
  </property>
  <property>
    <name>dfs.thrift.timeout</name>
    <value>60</value>
  </property>
  <property>
    <name>dfs.webhdfs.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>dfs.permissions</name>
    <value>true</value>
  </property>
  <property>
    <name>dfs.namenode.safemode.threshold-pct</name>
    <value>0.999</value>
  </property>
  <property>
    <name>dfs.namenode.invalidate.work.pct.per.iteration</name>
    <value>0.32</value>
  </property>
  <property>
    <name>dfs.namenode.replication.work.multiplier.per.iteration</name>
    <value>10</value>
  </property>
  <property>
    <name>dfs.namenode.replication.max-streams</name>
    <value>20</value>
  </property>
  <property>
    <name>dfs.namenode.replication.max-streams-hard-limit</name>
    <value>40</value>
  </property>
  <property>
    <name>dfs.namenode.avoid.read.stale.datanode</name>
    <value>false</value>
  </property>
  <property>
    <name>dfs.namenode.snapshot.capture.openfiles</name>
    <value>false</value>
  </property>
  <property>
    <name>dfs.namenode.avoid.write.stale.datanode</name>
    <value>false</value>
  </property>
  <property>
    <name>dfs.namenode.stale.datanode.interval</name>
    <value>30000</value>
  </property>
  <property>
    <name>dfs.namenode.write.stale.datanode.ratio</name>
    <value>0.5</value>
  </property>
  <property>
    <name>dfs.namenode.safemode.min.datanodes</name>
    <value>1</value>
  </property>
  <property>
    <name>dfs.namenode.safemode.extension</name>
    <value>30000</value>
  </property>
  <property>
    <name>dfs.client.use.datanode.hostname</name>
    <value>false</value>
  </property>
  <property>
    <name>fs.permissions.umask-mode</name>
    <value>022</value>
  </property>
  <property>
    <name>dfs.encrypt.data.transfer</name>
    <value>false</value>
  </property>
  <property>
    <name>dfs.encrypt.data.transfer.algorithm</name>
    <value>rc4</value>
  </property>
  <property>
    <name>dfs.namenode.acls.enabled</name>
    <value>false</value>
  </property>
  <property>
    <name>dfs.access.time.precision</name>
    <value>3600000</value>
  </property>
  <property>
    <name>dfs.qjournal.write-txns.timeout.ms</name>
    <value>20000</value>
  </property>
  <property>
    <name>dfs.qjournal.start-segment.timeout.ms</name>
    <value>20000</value>
  </property>
  <property>
    <name>dfs.qjournal.prepare-recovery.timeout.ms</name>
    <value>120000</value>
  </property>
  <property>
    <name>dfs.qjournal.accept-recovery.timeout.ms</name>
    <value>120000</value>
  </property>
  <property>
    <name>dfs.qjournal.finalize-segment.timeout.ms</name>
    <value>120000</value>
  </property>
  <property>
    <name>dfs.qjournal.select-input-streams.timeout.ms</name>
    <value>20000</value>
  </property>
  <property>
    <name>dfs.qjournal.get-journal-state.timeout.ms</name>
    <value>120000</value>
  </property>
  <property>
    <name>dfs.qjournal.new-epoch.timeout.ms</name>
    <value>120000</value>
  </property>
  <property>
    <name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
    <value>true</value>
  </property>
</configuration>
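
As a quick sanity check, a client can load the file above together with the matching core-site.xml and print the effective settings. A minimal sketch, assuming both files sit under /etc/hadoop/conf (an illustrative path, not part of this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsConfigCheck {

	public static void main(final String[] args) throws Exception {
		final Configuration conf = new Configuration();
		// Add the cluster files explicitly if they are not already on the classpath.
		conf.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
		conf.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml"));
		try (FileSystem fs = FileSystem.get(conf)) {
			System.out.println("fs.defaultFS    = " + fs.getUri());
			System.out.println("dfs.replication = " + conf.get("dfs.replication"));
			System.out.println("dfs.blocksize   = " + conf.get("dfs.blocksize"));
		}
	}
}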

View File

@ -0,0 +1,145 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--Autogenerated by Cloudera Manager-->
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://nameservice1</value>
  </property>
  <property>
    <name>fs.trash.interval</name>
    <value>1</value>
  </property>
  <property>
    <name>io.compression.codecs</name>
    <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec,org.apache.hadoop.io.compress.SnappyCodec,org.apache.hadoop.io.compress.Lz4Codec</value>
  </property>
  <property>
    <name>hadoop.security.authentication</name>
    <value>simple</value>
  </property>
  <property>
    <name>hadoop.security.authorization</name>
    <value>false</value>
  </property>
  <property>
    <name>hadoop.rpc.protection</name>
    <value>authentication</value>
  </property>
  <property>
    <name>hadoop.security.auth_to_local</name>
    <value>DEFAULT</value>
  </property>
  <property>
    <name>hadoop.proxyuser.oozie.hosts</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.oozie.groups</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.mapred.hosts</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.mapred.groups</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.flume.hosts</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.flume.groups</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.HTTP.hosts</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.HTTP.groups</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.hive.hosts</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.hive.groups</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.hue.hosts</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.hue.groups</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.httpfs.hosts</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.httpfs.groups</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.hdfs.groups</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.hdfs.hosts</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.yarn.hosts</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.yarn.groups</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.security.group.mapping</name>
    <value>org.apache.hadoop.security.ShellBasedUnixGroupsMapping</value>
  </property>
  <property>
    <name>hadoop.security.instrumentation.requires.admin</name>
    <value>false</value>
  </property>
  <property>
    <name>net.topology.script.file.name</name>
    <value>/etc/hadoop/conf.cloudera.yarn2/topology.py</value>
  </property>
  <property>
    <name>io.file.buffer.size</name>
    <value>65536</value>
  </property>
  <property>
    <name>hadoop.ssl.enabled</name>
    <value>false</value>
  </property>
  <property>
    <name>hadoop.ssl.require.client.cert</name>
    <value>false</value>
    <final>true</final>
  </property>
  <property>
    <name>hadoop.ssl.keystores.factory.class</name>
    <value>org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory</value>
    <final>true</final>
  </property>
  <property>
    <name>hadoop.ssl.server.conf</name>
    <value>ssl-server.xml</value>
    <final>true</final>
  </property>
  <property>
    <name>hadoop.ssl.client.conf</name>
    <value>ssl-client.xml</value>
    <final>true</final>
  </property>
</configuration>
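
The io.compression.codecs list above is what Hadoop's CompressionCodecFactory consults when resolving a codec from a file extension, which matters when mdstore records are stored compressed. A hedged sketch (the input path is invented):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecCheck {

	public static void main(final String[] args) {
		final Configuration conf = new Configuration();
		final CompressionCodecFactory factory = new CompressionCodecFactory(conf);
		// Resolved by file extension: .gz -> GzipCodec, .bz2 -> BZip2Codec, etc.
		final CompressionCodec codec = factory.getCodec(new Path("/data/mdstore/part-00000.gz"));
		System.out.println(codec == null ? "no codec matched" : codec.getClass().getName());
	}
}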

View File

@ -0,0 +1,101 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--Autogenerated by Cloudera Manager-->
<configuration>
  <property>
    <name>dfs.nameservices</name>
    <value>nameservice1</value>
  </property>
  <property>
    <name>dfs.client.failover.proxy.provider.nameservice1</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  </property>
  <property>
    <name>dfs.ha.automatic-failover.enabled.nameservice1</name>
    <value>true</value>
  </property>
  <property>
    <name>ha.zookeeper.quorum</name>
    <value>iis-cdh5-test-m1.ocean.icm.edu.pl:2181,iis-cdh5-test-m2.ocean.icm.edu.pl:2181,iis-cdh5-test-m3.ocean.icm.edu.pl:2181</value>
  </property>
  <property>
    <name>dfs.ha.namenodes.nameservice1</name>
    <value>namenode528,namenode434</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.nameservice1.namenode528</name>
    <value>iis-cdh5-test-m1.ocean.icm.edu.pl:8020</value>
  </property>
  <property>
    <name>dfs.namenode.servicerpc-address.nameservice1.namenode528</name>
    <value>iis-cdh5-test-m1.ocean.icm.edu.pl:8022</value>
  </property>
  <property>
    <name>dfs.namenode.http-address.nameservice1.namenode528</name>
    <value>iis-cdh5-test-m1.ocean.icm.edu.pl:50070</value>
  </property>
  <property>
    <name>dfs.namenode.https-address.nameservice1.namenode528</name>
    <value>iis-cdh5-test-m1.ocean.icm.edu.pl:50470</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.nameservice1.namenode434</name>
    <value>iis-cdh5-test-m2.ocean.icm.edu.pl:8020</value>
  </property>
  <property>
    <name>dfs.namenode.servicerpc-address.nameservice1.namenode434</name>
    <value>iis-cdh5-test-m2.ocean.icm.edu.pl:8022</value>
  </property>
  <property>
    <name>dfs.namenode.http-address.nameservice1.namenode434</name>
    <value>iis-cdh5-test-m2.ocean.icm.edu.pl:50070</value>
  </property>
  <property>
    <name>dfs.namenode.https-address.nameservice1.namenode434</name>
    <value>iis-cdh5-test-m2.ocean.icm.edu.pl:50470</value>
  </property>
  <property>
    <name>dfs.replication</name>
    <value>3</value>
  </property>
  <property>
    <name>dfs.blocksize</name>
    <value>134217728</value>
  </property>
  <property>
    <name>dfs.client.use.datanode.hostname</name>
    <value>false</value>
  </property>
  <property>
    <name>fs.permissions.umask-mode</name>
    <value>022</value>
  </property>
  <property>
    <name>dfs.namenode.acls.enabled</name>
    <value>false</value>
  </property>
  <property>
    <name>dfs.client.use.legacy.blockreader</name>
    <value>false</value>
  </property>
  <property>
    <name>dfs.client.read.shortcircuit</name>
    <value>false</value>
  </property>
  <property>
    <name>dfs.domain.socket.path</name>
    <value>/var/run/hdfs-sockets/dn</value>
  </property>
  <property>
    <name>dfs.client.read.shortcircuit.skip.checksum</name>
    <value>false</value>
  </property>
  <property>
    <name>dfs.client.domain.socket.data.traffic</name>
    <value>false</value>
  </property>
  <property>
    <name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
    <value>true</value>
  </property>
</configuration>
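
With these properties a client never names a concrete namenode host: it opens the logical URI hdfs://nameservice1 and the ConfiguredFailoverProxyProvider routes each call to whichever of namenode528/namenode434 is currently active. A minimal sketch:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class HaClientSketch {

	public static void main(final String[] args) throws Exception {
		final Configuration conf = new Configuration(); // picks up the HA properties above
		// Failover between the two namenodes is transparent to this call.
		try (FileSystem fs = FileSystem.get(URI.create("hdfs://nameservice1"), conf)) {
			System.out.println("capacity: " + fs.getStatus().getCapacity());
		}
	}
}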

View File

@ -0,0 +1,8 @@
%spark
val dsName = "__DS_NAME__"
val dsId = "__DS_ID__"
val apiId = "__API_ID__"
val mdId = "__MDSTORE_ID__"
val mdVersion = "__VERSION__"
val path = "__PATH__"
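
The __…__ tokens are placeholders to be filled before the paragraph is pushed to Zeppelin. A hedged sketch of that substitution (the helper is illustrative and not part of this commit; the getters are the ones used by the inspector controller):

// Illustrative helper: fill the template tokens from an MDStoreWithInfo (md)
// and an MDStoreVersion (ver).
private String fillTemplate(final String template, final MDStoreWithInfo md, final MDStoreVersion ver) {
	return template
		.replace("__DS_NAME__", md.getDatasourceName())
		.replace("__DS_ID__", md.getDatasourceId())
		.replace("__API_ID__", md.getApiId())
		.replace("__MDSTORE_ID__", md.getId())
		.replace("__VERSION__", ver.getId())
		.replace("__PATH__", ver.getHdfsPath());
}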

View File

@ -28,6 +28,10 @@
<groupId>com.vladmihalcea</groupId>
<artifactId>hibernate-types-52</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-schemas</artifactId>
</dependency>
<!-- Tests -->
<dependency>

View File

@ -0,0 +1,12 @@
package eu.dnetlib.data.mdstore.repository;

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

import eu.dnetlib.dhp.schema.mdstore.MDStoreCurrentVersion;

@Repository
public interface MDStoreCurrentVersionRepository extends JpaRepository<MDStoreCurrentVersion, String> {

	long countByCurrentVersion(String versionId);

}
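
countByCurrentVersion is a Spring Data derived query, so no implementation is required. A hedged usage sketch (the guard method, the autowired field, and the exception's message constructor are assumptions, not part of this commit):

// Illustrative guard: refuse to delete a version that is still current.
private void checkNotCurrent(final String versionId) throws MDStoreManagerException {
	if (mdstoreCurrentVersionRepository.countByCurrentVersion(versionId) > 0) {
		throw new MDStoreManagerException("Version " + versionId + " is still the current version of its mdstore");
	}
}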

View File

@ -0,0 +1,11 @@
package eu.dnetlib.data.mdstore.repository;

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

import eu.dnetlib.dhp.schema.mdstore.MDStore;

@Repository
public interface MDStoreRepository extends JpaRepository<MDStore, String> {
}

View File

@ -0,0 +1,19 @@
package eu.dnetlib.data.mdstore.repository;

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion;

@Repository
public interface MDStoreVersionRepository extends JpaRepository<MDStoreVersion, String> {

	void deleteByMdstore(String mdId);

	long countByMdstoreAndWriting(String mdId, boolean writing);

	long countByMdstoreAndReadCountGreaterThan(String mdId, int count);

	Iterable<MDStoreVersion> findByMdstore(String mdId);

}
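
A hedged sketch of how these derived queries could combine into a deletability check (the method and the autowired field are illustrative, not part of this commit):

// Illustrative: a store is safe to operate on only when no version is being
// written and no reader still holds a version open.
private boolean isDeletable(final String mdId) {
	return mdstoreVersionRepository.countByMdstoreAndWriting(mdId, true) == 0
		&& mdstoreVersionRepository.countByMdstoreAndReadCountGreaterThan(mdId, 0) == 0;
}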

View File

@ -0,0 +1,11 @@
package eu.dnetlib.data.mdstore.repository;

import org.springframework.stereotype.Repository;

import eu.dnetlib.data.is.common.ReadOnlyRepository;
import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo;

@Repository
public interface MDStoreWithInfoRepository extends ReadOnlyRepository<MDStoreWithInfo, String> {
}
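
MDStoreWithInfo maps the mdstores_with_info view created in the SQL below, hence the read-only repository. A hedged lookup sketch, assuming ReadOnlyRepository exposes an Optional-returning findById like its Spring Data counterparts:

// Assumption: findById exists on ReadOnlyRepository and returns an Optional.
final MDStoreWithInfo md = mdstoreWithInfoRepository.findById(mdId)
	.orElseThrow(() -> new MDStoreManagerException("Missing mdstore: " + mdId));
System.out.println(md.getId() + " -> current version: " + md.getCurrentVersion());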

View File

@ -178,3 +178,63 @@ CREATE VIEW resource_types_view AS (
false AS simple
FROM protocols
);
CREATE TABLE mdstores (
	id              text PRIMARY KEY,
	format          text,
	layout          text,
	interpretation  text,
	datasource_name text,
	datasource_id   text,
	api_id          text,
	creation_date   timestamp,
	hdfs_path       text
);

CREATE TABLE mdstore_versions (
	id         text PRIMARY KEY,
	mdstore    text REFERENCES mdstores(id),
	writing    boolean,
	readcount  int,
	lastupdate timestamp,
	size       bigint,
	hdfs_path  text
);

CREATE TABLE mdstore_current_versions (
	mdstore         text PRIMARY KEY REFERENCES mdstores(id),
	current_version text REFERENCES mdstore_versions(id)
);

CREATE VIEW mdstores_with_info AS SELECT
	md.id              AS id,
	md.format          AS format,
	md.layout          AS layout,
	md.interpretation  AS interpretation,
	md.datasource_name AS datasource_name,
	md.datasource_id   AS datasource_id,
	md.api_id          AS api_id,
	md.hdfs_path       AS hdfs_path,
	md.creation_date   AS creation_date,
	cv.current_version AS current_version,
	v1.lastupdate      AS lastupdate,
	v1.size            AS size,
	count(v2.id)       AS n_versions
FROM
	mdstores md
	LEFT OUTER JOIN mdstore_current_versions cv ON (md.id = cv.mdstore)
	LEFT OUTER JOIN mdstore_versions v1 ON (cv.current_version = v1.id)
	LEFT OUTER JOIN mdstore_versions v2 ON (md.id = v2.mdstore)
GROUP BY
	md.id,
	md.format,
	md.layout,
	md.interpretation,
	md.datasource_name,
	md.datasource_id,
	md.hdfs_path,
	md.creation_date,
	md.api_id,
	cv.current_version,
	v1.lastupdate,
	v1.size;
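
For a quick look at what the view aggregates, a plain JDBC sketch (all connection parameters are invented):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ViewCheck {

	public static void main(final String[] args) throws Exception {
		// Invented connection parameters; adapt to the local PostgreSQL instance.
		try (Connection con = DriverManager.getConnection("jdbc:postgresql://localhost/dnet_is", "dnet", "secret");
				Statement st = con.createStatement();
				ResultSet rs = st.executeQuery("SELECT id, current_version, n_versions FROM mdstores_with_info")) {
			while (rs.next()) {
				System.out.printf("%s current=%s versions=%d%n",
						rs.getString("id"), rs.getString("current_version"), rs.getLong("n_versions"));
			}
		}
	}
}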