package eu.dnetlib.data.mdstore.hadoop;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Stream;

import org.apache.avro.generic.GenericRecord;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.errors.MDStoreManagerException;
import parquet.avro.AvroParquetReader;
import parquet.hadoop.ParquetReader;

@Component
public class HdfsClient {

	@Value("${dhp.mdstore-manager.hadoop.cluster}")
	private String hadoopCluster;

	@Value("${dhp.mdstore-manager.hadoop.user}")
	private String hadoopUser;

	@Value("${dhp.mdstore-manager.hdfs.base-path}")
	private String hdfsBasePath;

	private static final Log log = LogFactory.getLog(HdfsClient.class);

	/** Recursively deletes a HDFS path. */
	public void deletePath(final String path) throws MDStoreManagerException {
		try (final FileSystem fs = FileSystem.get(conf())) {
			fs.delete(new Path(path), true);
			log.info("HDFS Path deleted: " + path);
		} catch (final FileNotFoundException e) {
			log.warn("Missing path: " + path);
		} catch (IllegalArgumentException | IOException e) {
			log.error("Error deleting path: " + path, e);
			throw new MDStoreManagerException("Error deleting path: " + path, e);
		}
	}

	/** Lists the mdstore directories and their version subdirectories under the configured base path. */
	public Set<String> listHadoopDirs() {
		final Set<String> res = new LinkedHashSet<>();

		try (final FileSystem fs = FileSystem.get(conf())) {
			for (final FileStatus mdDir : fs.listStatus(new Path(hdfsBasePath))) {
				if (isMdStoreOrVersionDir(mdDir)) {
					res.add(String.format("%s/%s", hdfsBasePath, mdDir.getPath().getName()));
					for (final FileStatus verDir : fs.listStatus(mdDir.getPath())) {
						if (isMdStoreOrVersionDir(verDir)) {
							res.add(String.format("%s/%s/%s", hdfsBasePath, mdDir.getPath().getName(), verDir.getPath().getName()));
						}
					}
				}
			}
		} catch (final FileNotFoundException e) {
			log.warn("Missing path: " + hdfsBasePath);
		} catch (final Exception e) {
			log.error("Error listing path: " + hdfsBasePath, e);
		}

		return res;
	}

	/** Lists the children of a path that satisfy the given condition. */
	public Set<String> listContent(final String path, final Predicate<FileStatus> condition) {
		// TODO: remove the following line (it is only for tests)
		// if (1 != 2) { return
		// Sets.newHashSet("file:///Users/michele/Desktop/part-00000-e3675dc3-69fb-422e-a159-78e34cfe14d2-c000.snappy.parquet"); }

		final Set<String> res = new LinkedHashSet<>();

		try (final FileSystem fs = FileSystem.get(conf())) {
			for (final FileStatus f : fs.listStatus(new Path(path))) {
				if (condition.test(f)) {
					res.add(String.format("%s/%s", path, f.getPath().getName()));
				}
			}
		} catch (final FileNotFoundException e) {
			log.warn("Missing path: " + path);
		} catch (final Exception e) {
			log.error("Error listing path: " + path, e);
		}

		return res;
	}

	/** Reads at most "limit" records from the parquet files under a path and maps them to the given class. */
	@SuppressWarnings("unchecked")
	public <T> List<T> readParquetFiles(final String path, final long limit, final Class<T> clazz) throws MDStoreManagerException {
		final List<T> list = new ArrayList<>();

		final Configuration conf = conf();

		long i = 0;

		final Set<String> fields = new LinkedHashSet<>();

		for (final String f : listContent(path, HdfsClient::isParquetFile)) {
			if (i < limit) {
				log.info("Opening parquet file: " + f);

				try (final ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>
						builder(new Path(f)).withConf(conf).build()) {
					log.debug("Parquet file opened");
					final ObjectMapper mapper = new ObjectMapper();
					GenericRecord rec = null;
					while (i++ < limit && (rec = reader.read()) != null) {
						// The field names are extracted once, from the schema of the first record
						if (fields.isEmpty()) {
							rec.getSchema().getFields().forEach(field -> fields.add(field.name()));
							log.debug("Found schema: " + fields);
						}
						final Map<String, String> map = new LinkedHashMap<>();
						for (final String field : fields) {
							final Object v = rec.get(field);
							map.put(field, v != null ? v.toString() : "");
						}
						list.add(mapper.convertValue(map, clazz));
						log.debug("Added record");
					}
				} catch (final FileNotFoundException e) {
					log.warn("Missing path: " + f);
				} catch (final Throwable e) {
					log.error("Error reading parquet file: " + f, e);
					throw new MDStoreManagerException("Error reading parquet file: " + f, e);
				}
			}
		}
		return list;
	}

	/** Streams the records of all parquet files under a path, mapped to the given class. */
	public <T> Stream<T> streamParquetFiles(final String path, final Class<T> clazz) throws MDStoreManagerException {
		// TODO: re-implement the method without the intermediate list
		final List<T> list = new ArrayList<>();

		final Configuration conf = conf();

		final Set<String> fields = new LinkedHashSet<>();

		for (final String f : listContent(path, HdfsClient::isParquetFile)) {
			log.info("Opening parquet file: " + f);

			try (final ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord> builder(new Path(f)).withConf(conf).build()) {
				log.debug("Parquet file opened");
				final ObjectMapper mapper = new ObjectMapper();
				GenericRecord rec = null;
				while ((rec = reader.read()) != null) {
					if (fields.isEmpty()) {
						rec.getSchema().getFields().forEach(field -> fields.add(field.name()));
						log.debug("Found schema: " + fields);
					}
					final Map<String, String> map = new LinkedHashMap<>();
					for (final String field : fields) {
						final Object v = rec.get(field);
						map.put(field, v != null ? v.toString() : "");
					}
					list.add(mapper.convertValue(map, clazz));
					log.debug("Added record");
				}
			} catch (final FileNotFoundException e) {
				log.warn("Missing path: " + f);
			} catch (final Throwable e) {
				log.error("Error reading parquet file: " + f, e);
				throw new MDStoreManagerException("Error reading parquet file: " + f, e);
			}
		}
		return list.stream();
	}

	/*
	 * private String printGroup(final Group g) {
	 *     final StringWriter sw = new StringWriter();
	 *
	 *     final int fieldCount = g.getType().getFieldCount();
	 *     for (int field = 0; field < fieldCount; field++) {
	 *         final int valueCount = g.getFieldRepetitionCount(field);
	 *
	 *         final Type fieldType = g.getType().getType(field);
	 *         final String fieldName = fieldType.getName();
	 *
	 *         for (int index = 0; index < valueCount; index++) {
	 *             if (fieldType.isPrimitive()) {
	 *                 sw.append(fieldName + " " + g.getValueToString(field, index));
	 *                 sw.append("\n");
	 *             }
	 *         }
	 *     }
	 *     return sw.toString();
	 * }
	 *
	 * public List<String> readParquetFile(final String file, final long n) throws MDStoreManagerException {
	 *
	 *     final Configuration conf = conf();
	 *     final Path path = new Path(file);
	 *     final List<String> list = new ArrayList<>();
	 *     try {
	 *         final ParquetMetadata readFooter = ParquetFileReader.readFooter(conf, path, ParquetMetadataConverter.NO_FILTER);
	 *         final MessageType schema = readFooter.getFileMetaData().getSchema();
	 *         final ParquetFileReader r = new ParquetFileReader(conf, path, readFooter);
	 *
	 *         PageReadStore pages = null;
	 *         try {
	 *             while (null != (pages = r.readNextRowGroup())) {
	 *                 final long rows = pages.getRowCount();
	 *                 System.out.println("Number of rows: " + rows);
	 *
	 *                 final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
	 *                 final RecordReader<Group> recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
	 *                 for (int i = 0; i < rows; i++) {
	 *                     final Group g = recordReader.read();
	 *                     list.add(printGroup(g));
	 *                 }
	 *             }
	 *         } finally {
	 *             r.close();
	 *         }
	 *     } catch (final IOException e) {
	 *         System.out.println("Error reading parquet file.");
	 *         e.printStackTrace();
	 *     }
	 *     return list;
	 * }
	 */

	public static boolean isParquetFile(final FileStatus fileStatus) {
		return fileStatus.isFile() && fileStatus.getPath().getName().endsWith(".parquet");
	}

	public static boolean isMdStoreOrVersionDir(final FileStatus fileStatus) {
		return fileStatus.isDirectory() && fileStatus.getPath().getName().startsWith("md-");
	}

	/** Builds the Hadoop configuration for the configured cluster (OCEAN, GARR or MOCK). */
	private Configuration conf() throws MDStoreManagerException {
		final Configuration conf = new Configuration();
		System.setProperty("HADOOP_USER_NAME", hadoopUser);

		if (hadoopCluster.equalsIgnoreCase("OCEAN")) {
			conf.addResource(getClass().getResourceAsStream("/hadoop/OCEAN/core-site.xml"));
			conf.addResource(getClass().getResourceAsStream("/hadoop/OCEAN/ocean-hadoop-conf.xml"));
		} else if (hadoopCluster.equalsIgnoreCase("GARR")) {
			conf.addResource(getClass().getResourceAsStream("/hadoop/GARR/core-site.xml"));
			conf.addResource(getClass().getResourceAsStream("/hadoop/GARR/garr-hadoop-conf.xml"));
		} else if (hadoopCluster.equalsIgnoreCase("MOCK")) {
			// NOTHING
		} else {
			log.error("Invalid Hadoop cluster: " + hadoopCluster);
			throw new MDStoreManagerException("Invalid Hadoop cluster: " + hadoopCluster);
		}
		return conf;
	}

	public String getHadoopCluster() {
		return hadoopCluster;
	}

	public void setHadoopCluster(final String hadoopCluster) {
		this.hadoopCluster = hadoopCluster;
	}

	public String getHadoopUser() {
		return hadoopUser;
	}

	public void setHadoopUser(final String hadoopUser) {
		this.hadoopUser = hadoopUser;
	}

}
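
/*
 * Illustrative usage sketch, not part of the original class: it shows how a caller might
 * configure the client outside of Spring (only the setter-backed properties are available
 * this way) and read a handful of records from a parquet directory. The cluster name "MOCK",
 * the hadoop user and the path below are placeholder values chosen for the example.
 */
class HdfsClientUsageExample {

	public static void main(final String[] args) throws Exception {
		final HdfsClient client = new HdfsClient();
		client.setHadoopCluster("MOCK"); // "MOCK" skips the cluster-specific configuration resources
		client.setHadoopUser("dnet");    // placeholder hadoop user

		// Read at most 10 records from the parquet files under a hypothetical mdstore version path,
		// mapping each record to a generic Map via Jackson.
		final List<Map> records = client.readParquetFiles("/data/mdstores/md-123/md-123-v1", 10, Map.class);
		records.forEach(System.out::println);
	}
}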