parquet client
parent 504d42e96c
commit 43e0386b78
@@ -36,7 +36,6 @@
			<groupId>commons-io</groupId>
			<artifactId>commons-io</artifactId>
		</dependency>

		<!-- Hadoop -->
		<dependency>
			<groupId>org.apache.hadoop</groupId>
@@ -57,7 +56,16 @@
				</exclusion>
			</exclusions>
		</dependency>

		<dependency>
			<groupId>com.twitter</groupId>
			<artifactId>parquet-hadoop</artifactId>
			<version>1.5.0-cdh5.13.3</version>
		</dependency>
		<dependency>
			<groupId>com.twitter</groupId>
			<artifactId>parquet-avro</artifactId>
			<version>1.5.0-cdh5.13.3</version>
		</dependency>
		<!-- DHP Common -->
		<dependency>
			<groupId>eu.dnetlib.dhp</groupId>
@@ -201,6 +201,20 @@ public class MDStoreController extends AbstractDnetController {
		return info;
	}

	@ApiOperation("list the file inside the path of a mdstore version")
	@GetMapping("/version/{versionId}/parquet/files")
	public Set<String> listVersionFiles(@PathVariable final String versionId) throws MDStoreManagerException {
		final String path = databaseUtils.findVersion(versionId).getHdfsPath();
		return hdfsClient.listContent(path + "/store", HdfsClient::isParquetFile);
	}

	@ApiOperation("read the parquet file of a mdstore version")
	@GetMapping("/version/{versionId}/parquet/content/{limit}")
	public List<Map<String, Object>> listVersionParquet(@PathVariable final String versionId, @PathVariable final long limit) throws MDStoreManagerException {
		final String path = databaseUtils.findVersion(versionId).getHdfsPath();
		return hdfsClient.readParquetFiles(path + "/store", limit);
	}

	protected void setDatabaseUtils(final DatabaseUtils databaseUtils) {
		this.databaseUtils = databaseUtils;
	}
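For context, the two endpoints added above can be exercised with any HTTP client. The sketch below uses Spring's RestTemplate; the base URL, the /mdstores prefix, the example class name and the version id are illustrative assumptions, not values taken from this commit.

import java.util.List;
import java.util.Set;

import org.springframework.web.client.RestTemplate;

// Illustrative client, not part of the commit.
public class MDStoreParquetClientExample {

	public static void main(final String[] args) {
		// Placeholder base URL and controller prefix: adjust to the actual mdstore manager deployment.
		final String baseUrl = "http://localhost:8080/mdstores";
		// Placeholder mdstore version id.
		final String versionId = "EXAMPLE-VERSION-ID";

		final RestTemplate rest = new RestTemplate();

		// GET /version/{versionId}/parquet/files -> paths of the *.parquet files under <hdfsPath>/store
		final Set<?> files = rest.getForObject(baseUrl + "/version/" + versionId + "/parquet/files", Set.class);
		System.out.println("parquet files: " + files);

		// GET /version/{versionId}/parquet/content/{limit} -> up to {limit} records as field/value maps
		final List<?> records = rest.getForObject(baseUrl + "/version/" + versionId + "/parquet/content/10", List.class);
		if (records != null) {
			records.forEach(System.out::println);
		}
	}
}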
@@ -1,9 +1,15 @@
package eu.dnetlib.data.mdstore.manager.utils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

import org.apache.avro.generic.GenericRecord;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -14,6 +20,8 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import eu.dnetlib.data.mdstore.manager.exceptions.MDStoreManagerException;
import parquet.avro.AvroParquetReader;
import parquet.hadoop.ParquetReader;

@Component
public class HdfsClient {
@@ -45,10 +53,10 @@

		try (final FileSystem fs = FileSystem.get(conf())) {
			for (final FileStatus mdDir : fs.listStatus(new Path(hdfsBasePath))) {
				if (isValidDir(mdDir)) {
				if (isMdStoreOrVersionDir(mdDir)) {
					res.add(String.format("%s/%s", hdfsBasePath, mdDir.getPath().getName()));
					for (final FileStatus verDir : fs.listStatus(mdDir.getPath())) {
						if (isValidDir(verDir)) {
						if (isMdStoreOrVersionDir(verDir)) {
							res.add(String.format("%s/%s/%s", hdfsBasePath, mdDir.getPath().getName(), verDir.getPath().getName()));
						}
					}
@@ -61,7 +69,94 @@
		return res;
	}

	private boolean isValidDir(final FileStatus fileStatus) {
	public Set<String> listContent(final String path, final Predicate<FileStatus> condition) {
		final Set<String> res = new LinkedHashSet<>();
		try (final FileSystem fs = FileSystem.get(conf())) {
			for (final FileStatus f : fs.listStatus(new Path(path))) {
				if (condition.test(f)) {
					res.add(String.format("%s/%s", path, f.getPath().getName()));
				}
			}
		} catch (final Exception e) {
			log.error("Error Listing path: " + path, e);
		}
		return res;
	}

	@SuppressWarnings("unchecked")
	public List<Map<String, Object>> readParquetFiles(final String path, final long limit) throws MDStoreManagerException {

		final List<Map<String, Object>> list = new ArrayList<>();

		final Configuration conf = conf();

		long i = 0;

		final Set<String> fields = new LinkedHashSet<>();

		for (final String f : listContent(path, HdfsClient::isParquetFile)) {
			if (i < limit) {
				log.info("Opening parquet file: " + f);

				try (final ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord> builder(new Path(f)).withConf(conf).build()) {
					log.debug("File parquet OPENED");

					GenericRecord rec = null;
					while (i++ < limit && (rec = reader.read()) != null) {
						if (fields.isEmpty()) {
							rec.getSchema().getFields().forEach(field -> fields.add(field.name()));
							log.debug("Found schema: " + fields);
						}
						final Map<String, Object> map = new LinkedHashMap<>();
						for (final String field : fields) {
							map.put(field, rec.get(field));
						}
						list.add(map);
						log.debug("added record");
					}
				} catch (final Throwable e) {
					log.error("Error reading parquet file: " + f, e);
					throw new MDStoreManagerException("Error reading parquet file: " + f, e);
				}
			}
		}
		return list;
	}

	/*
	 *
	 * private String printGroup(final Group g) { final StringWriter sw = new StringWriter();
	 *
	 * final int fieldCount = g.getType().getFieldCount(); for (int field = 0; field < fieldCount; field++) { final int valueCount =
	 * g.getFieldRepetitionCount(field);
	 *
	 * final Type fieldType = g.getType().getType(field); final String fieldName = fieldType.getName();
	 *
	 * for (int index = 0; index < valueCount; index++) { if (fieldType.isPrimitive()) { sw.append(fieldName + " " +
	 * g.getValueToString(field, index)); sw.append("\n"); } } } return sw.toString(); }
	 *
	 * public List<String> readParquetFile(final String file, final long n) throws MDStoreManagerException {
	 *
	 * final Configuration conf = conf(); final Path path = new Path(file); final List<String> list = new ArrayList<>(); try { final
	 * ParquetMetadata readFooter = ParquetFileReader.readFooter(conf, path, ParquetMetadataConverter.NO_FILTER); final MessageType schema =
	 * readFooter.getFileMetaData().getSchema(); final ParquetFileReader r = new ParquetFileReader(conf, path, readFooter);
	 *
	 * PageReadStore pages = null; try { while (null != (pages = r.readNextRowGroup())) { final long rows = pages.getRowCount();
	 * System.out.println("Number of rows: " + rows);
	 *
	 * final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema); final RecordReader<Group> recordReader =
	 * columnIO.getRecordReader(pages, new GroupRecordConverter(schema)); for (int i = 0; i < rows; i++) { final Group g =
	 * recordReader.read(); list.add(printGroup(g)); } } } finally { r.close(); } } catch (final IOException e) {
	 * System.out.println("Error reading parquet file."); e.printStackTrace(); } return list; }
	 *
	 *
	 */

	public static boolean isParquetFile(final FileStatus fileStatus) {
		return fileStatus.isFile() && fileStatus.getPath().getName().endsWith(".parquet");
	}

	public static boolean isMdStoreOrVersionDir(final FileStatus fileStatus) {
		return fileStatus.isDirectory() && fileStatus.getPath().getName().startsWith("md-");
	}

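Since listContent accepts any Predicate<FileStatus>, the same method can back listings other than the parquet one used by the controller. A minimal sketch, assuming an already configured HdfsClient instance (normally injected by Spring) and a hypothetical store path; the class and method names below are illustrative only.

import java.util.Set;

import org.apache.hadoop.fs.FileStatus;

import eu.dnetlib.data.mdstore.manager.utils.HdfsClient;

// Illustrative helper, not part of the commit.
public class ListContentExample {

	// Lists the plain files ending in ".seq" under the given path, reusing the new Predicate-based API.
	public static Set<String> listSequenceFiles(final HdfsClient hdfsClient, final String storePath) {
		return hdfsClient.listContent(storePath, f -> f.isFile() && f.getPath().getName().endsWith(".seq"));
	}

	// Lists the sub-directories under the given path.
	public static Set<String> listSubDirs(final HdfsClient hdfsClient, final String path) {
		return hdfsClient.listContent(path, FileStatus::isDirectory);
	}
}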
@@ -0,0 +1,54 @@
package eu.dnetlib.data.mdstore.manager.utils;

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import parquet.avro.AvroParquetReader;
import parquet.hadoop.ParquetReader;

@Disabled
class HdfsClientTest {

	private static final String PARQUET_FILE = "file:///Users/michele/Desktop/part-00000-e3675dc3-69fb-422e-a159-78e34cfe14d2-c000.snappy.parquet";

	@BeforeEach
	void setUp() throws Exception {}

	@SuppressWarnings("unchecked")
	@Test
	void testParquet() throws IllegalArgumentException, IOException {

		System.out.println("Opening parquet file: " + PARQUET_FILE);

		try (final ParquetReader<GenericRecord> reader =
				AvroParquetReader.<GenericRecord> builder(new Path(PARQUET_FILE)).withConf(new Configuration()).build()) {
			System.out.println("File OPENED");

			GenericRecord rec = null;
			final Set<String> fields = new LinkedHashSet<>();
			while ((rec = reader.read()) != null) {
				if (fields.isEmpty()) {
					rec.getSchema().getFields().forEach(f -> fields.add(f.name()));
				}

				final Map<String, Object> map = new LinkedHashMap<>();
				for (final String f : fields) {
					map.put(f, rec.get(f));
				}

				System.out.println(map);
			}
		}
	}

}