From 43e0386b788203db19dfa36bfa3c3841c975c83e Mon Sep 17 00:00:00 2001
From: "michele.artini"
Date: Thu, 18 Feb 2021 14:28:04 +0100
Subject: [PATCH] parquet client

---
 apps/dhp-mdstore-manager/pom.xml              |  12 ++-
 .../manager/controller/MDStoreController.java |  14 +++
 .../mdstore/manager/utils/HdfsClient.java     | 101 +++++++++++++++++-
 .../mdstore/manager/utils/HdfsClientTest.java |  54 ++++++++++
 4 files changed, 176 insertions(+), 5 deletions(-)
 create mode 100644 apps/dhp-mdstore-manager/src/test/java/eu/dnetlib/data/mdstore/manager/utils/HdfsClientTest.java

diff --git a/apps/dhp-mdstore-manager/pom.xml b/apps/dhp-mdstore-manager/pom.xml
index 7ca43206..396b63d6 100644
--- a/apps/dhp-mdstore-manager/pom.xml
+++ b/apps/dhp-mdstore-manager/pom.xml
@@ -36,7 +36,6 @@
 		<dependency>
 			<groupId>commons-io</groupId>
 			<artifactId>commons-io</artifactId>
 		</dependency>
-
 		<dependency>
 			<groupId>org.apache.hadoop</groupId>
@@ -57,7 +56,16 @@
 				</exclusion>
 			</exclusions>
 		</dependency>
-
+		<dependency>
+			<groupId>com.twitter</groupId>
+			<artifactId>parquet-hadoop</artifactId>
+			<version>1.5.0-cdh5.13.3</version>
+		</dependency>
+		<dependency>
+			<groupId>com.twitter</groupId>
+			<artifactId>parquet-avro</artifactId>
+			<version>1.5.0-cdh5.13.3</version>
+		</dependency>
 
 		<dependency>
 			<groupId>eu.dnetlib.dhp</groupId>
diff --git a/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/controller/MDStoreController.java b/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/controller/MDStoreController.java
index 24368433..1cd6b3a4 100644
--- a/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/controller/MDStoreController.java
+++ b/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/controller/MDStoreController.java
@@ -201,6 +201,20 @@ public class MDStoreController extends AbstractDnetController {
 		return info;
 	}
 
+	@ApiOperation("list the files inside the path of an mdstore version")
+	@GetMapping("/version/{versionId}/parquet/files")
+	public Set<String> listVersionFiles(@PathVariable final String versionId) throws MDStoreManagerException {
+		final String path = databaseUtils.findVersion(versionId).getHdfsPath();
+		return hdfsClient.listContent(path + "/store", HdfsClient::isParquetFile);
+	}
+
+	@ApiOperation("read the content of the parquet files of an mdstore version")
+	@GetMapping("/version/{versionId}/parquet/content/{limit}")
+	public List<Map<String, Object>> listVersionParquet(@PathVariable final String versionId, @PathVariable final long limit) throws MDStoreManagerException {
+		final String path = databaseUtils.findVersion(versionId).getHdfsPath();
+		return hdfsClient.readParquetFiles(path + "/store", limit);
+	}
+
 	protected void setDatabaseUtils(final DatabaseUtils databaseUtils) {
 		this.databaseUtils = databaseUtils;
 	}
diff --git a/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/HdfsClient.java b/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/HdfsClient.java
index 41a6a477..683e63a8 100644
--- a/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/HdfsClient.java
+++ b/apps/dhp-mdstore-manager/src/main/java/eu/dnetlib/data/mdstore/manager/utils/HdfsClient.java
@@ -1,9 +1,15 @@
 package eu.dnetlib.data.mdstore.manager.utils;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.function.Predicate;
 
+import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -14,6 +20,8 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
 import eu.dnetlib.data.mdstore.manager.exceptions.MDStoreManagerException;
+import parquet.avro.AvroParquetReader;
+import parquet.hadoop.ParquetReader;
 
 @Component
 public class HdfsClient {
@@ -45,10 +53,10 @@ public class HdfsClient {
 		try (final FileSystem fs = FileSystem.get(conf())) {
 			for (final FileStatus mdDir : fs.listStatus(new Path(hdfsBasePath))) {
-				if (isValidDir(mdDir)) {
+				if (isMdStoreOrVersionDir(mdDir)) {
 					res.add(String.format("%s/%s", hdfsBasePath, mdDir.getPath().getName()));
 					for (final FileStatus verDir : fs.listStatus(mdDir.getPath())) {
-						if (isValidDir(verDir)) {
+						if (isMdStoreOrVersionDir(verDir)) {
 							res.add(String.format("%s/%s/%s", hdfsBasePath, mdDir.getPath().getName(), verDir.getPath().getName()));
 						}
 					}
 				}
@@ -61,7 +69,94 @@ public class HdfsClient {
 		return res;
 	}
 
-	private boolean isValidDir(final FileStatus fileStatus) {
+	public Set<String> listContent(final String path, final Predicate<FileStatus> condition) {
+		final Set<String> res = new LinkedHashSet<>();
+		try (final FileSystem fs = FileSystem.get(conf())) {
+			for (final FileStatus f : fs.listStatus(new Path(path))) {
+				if (condition.test(f)) {
+					res.add(String.format("%s/%s", path, f.getPath().getName()));
+				}
+			}
+		} catch (final Exception e) {
+			log.error("Error listing path: " + path, e);
+		}
+		return res;
+	}
+
+	@SuppressWarnings("unchecked")
+	public List<Map<String, Object>> readParquetFiles(final String path, final long limit) throws MDStoreManagerException {
+
+		final List<Map<String, Object>> list = new ArrayList<>();
+
+		final Configuration conf = conf();
+
+		long i = 0;
+
+		final Set<String> fields = new LinkedHashSet<>();
+
+		for (final String f : listContent(path, HdfsClient::isParquetFile)) {
+			if (i < limit) {
+				log.info("Opening parquet file: " + f);
+
+				try (final ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord> builder(new Path(f)).withConf(conf).build()) {
+					log.debug("Parquet file OPENED");
+
+					GenericRecord rec = null;
+					while (i++ < limit && (rec = reader.read()) != null) {
+						if (fields.isEmpty()) {
+							rec.getSchema().getFields().forEach(field -> fields.add(field.name()));
+							log.debug("Found schema: " + fields);
+						}
+						final Map<String, Object> map = new LinkedHashMap<>();
+						for (final String field : fields) {
+							map.put(field, rec.get(field));
+						}
+						list.add(map);
+						log.debug("added record");
+					}
+				} catch (final Throwable e) {
+					log.error("Error reading parquet file: " + f, e);
+					throw new MDStoreManagerException("Error reading parquet file: " + f, e);
+				}
+			}
+		}
+		return list;
+	}
+
System.out.println("Number of rows: " + rows); + * + * final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema); final RecordReader recordReader = + * columnIO.getRecordReader(pages, new GroupRecordConverter(schema)); for (int i = 0; i < rows; i++) { final Group g = + * recordReader.read(); list.add(printGroup(g)); } } } finally { r.close(); } } catch (final IOException e) { + * System.out.println("Error reading parquet file."); e.printStackTrace(); } return list; } + * + * + */ + + public static boolean isParquetFile(final FileStatus fileStatus) { + return fileStatus.isFile() && fileStatus.getPath().getName().endsWith(".parquet"); + } + + public static boolean isMdStoreOrVersionDir(final FileStatus fileStatus) { return fileStatus.isDirectory() && fileStatus.getPath().getName().startsWith("md-"); } diff --git a/apps/dhp-mdstore-manager/src/test/java/eu/dnetlib/data/mdstore/manager/utils/HdfsClientTest.java b/apps/dhp-mdstore-manager/src/test/java/eu/dnetlib/data/mdstore/manager/utils/HdfsClientTest.java new file mode 100644 index 00000000..756d7831 --- /dev/null +++ b/apps/dhp-mdstore-manager/src/test/java/eu/dnetlib/data/mdstore/manager/utils/HdfsClientTest.java @@ -0,0 +1,54 @@ +package eu.dnetlib.data.mdstore.manager.utils; + +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import parquet.avro.AvroParquetReader; +import parquet.hadoop.ParquetReader; + +@Disabled +class HdfsClientTest { + + private static final String PARQUET_FILE = "file:///Users/michele/Desktop/part-00000-e3675dc3-69fb-422e-a159-78e34cfe14d2-c000.snappy.parquet"; + + @BeforeEach + void setUp() throws Exception {} + + @SuppressWarnings("unchecked") + @Test + void testParquet() throws IllegalArgumentException, IOException { + + System.out.println("Opening parquet file: " + PARQUET_FILE); + + try (final ParquetReader reader = + AvroParquetReader. builder(new Path(PARQUET_FILE)).withConf(new Configuration()).build()) { + System.out.println("File OPENED"); + + GenericRecord rec = null; + final Set fields = new LinkedHashSet<>(); + while ((rec = reader.read()) != null) { + if (fields.isEmpty()) { + rec.getSchema().getFields().forEach(f -> fields.add(f.name())); + } + + final Map map = new LinkedHashMap<>(); + for (final String f : fields) { + map.put(f, rec.get(f)); + } + + System.out.println(map); + } + } + } + +}