package eu.dnetlib.dhp.oa.graph.raw;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.util.Arrays;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo;
import scala.Tuple2;

public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication {

	private static final Logger log = LoggerFactory.getLogger(MigrateHdfsMdstoresApplication.class);

	public static void main(final String[] args) throws Exception {
		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
			IOUtils
				.toString(
					MigrateHdfsMdstoresApplication.class
						.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json")));
		parser.parseArgument(args);

		final Boolean isSparkSessionManaged = Optional
			.ofNullable(parser.get("isSparkSessionManaged"))
			.map(Boolean::valueOf)
			.orElse(Boolean.TRUE);
		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

		final String mdstoreManagerUrl = parser.get("mdstoreManagerUrl");
		final String mdFormat = parser.get("mdFormat");
		final String mdLayout = parser.get("mdLayout");
		final String mdInterpretation = parser.get("mdInterpretation");

		final String hdfsPath = parser.get("hdfsPath");

		final Set<String> paths = mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation);

		final SparkConf conf = new SparkConf();
		runWithSparkSession(conf, isSparkSessionManaged, spark -> {
			HdfsSupport.remove(hdfsPath, spark.sparkContext().hadoopConfiguration());
			processPaths(spark, hdfsPath, paths, String.format("%s-%s-%s", mdFormat, mdLayout, mdInterpretation));
		});
	}

	public static void processPaths(final SparkSession spark,
		final String outputPath,
		final Set<String> paths,
		final String type) throws Exception {

		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

		log.info("Found {} non-empty mdstores", paths.size());
		paths.forEach(log::info);

		// keep only the mdstore paths that actually exist on HDFS
		final String[] validPaths = paths
			.stream()
			.filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration()))
			.toArray(String[]::new);

		// read the record bodies from the parquet mdstores and save them as a gzip-compressed SequenceFile
		spark
			.read()
			.parquet(validPaths)
			.map((MapFunction<Row, String>) r -> r.getAs("body"), Encoders.STRING())
			.toJavaRDD()
			.mapToPair(xml -> new Tuple2<>(new Text(UUID.randomUUID() + ":" + type), new Text(xml)))
			.coalesce(1)
			.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
		/*
		 * .foreach(xml -> { try { writer.append(new Text(UUID.randomUUID() + ":" + type), new Text(xml)); } catch
		 * (final Exception e) { throw new RuntimeException(e); } });
		 */
	}
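	/**
	 * Hedged example, not part of the original migration logic: a minimal sketch showing how the
	 * gzip-compressed SequenceFile produced by processPaths could be read back for a quick sanity
	 * check. The method name and its usage are illustrative assumptions only.
	 */
	private static long countMigratedRecords(final SparkSession spark, final String path) {
		return JavaSparkContext
			.fromSparkContext(spark.sparkContext())
			// the file stores (UUID:type, record body) pairs, both as org.apache.hadoop.io.Text
			.sequenceFile(path, Text.class, Text.class)
			.count();
	}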
	private static Set<String> mdstorePaths(final String mdstoreManagerUrl,
		final String format,
		final String layout,
		final String interpretation) throws Exception {

		final String url = mdstoreManagerUrl + "/mdstores/";
		final ObjectMapper objectMapper = new ObjectMapper();

		final HttpGet req = new HttpGet(url);

		try (final CloseableHttpClient client = HttpClients.createDefault()) {
			try (final CloseableHttpResponse response = client.execute(req)) {
				final String json = IOUtils.toString(response.getEntity().getContent());
				final MDStoreWithInfo[] mdstores = objectMapper.readValue(json, MDStoreWithInfo[].class);

				// keep only the mdstores matching the requested format/layout/interpretation
				// that expose a current version, a valid HDFS path and at least one record
				return Arrays
					.stream(mdstores)
					.filter(md -> md.getFormat().equalsIgnoreCase(format))
					.filter(md -> md.getLayout().equalsIgnoreCase(layout))
					.filter(md -> md.getInterpretation().equalsIgnoreCase(interpretation))
					.filter(md -> StringUtils.isNotBlank(md.getHdfsPath()))
					.filter(md -> StringUtils.isNotBlank(md.getCurrentVersion()))
					.filter(md -> md.getSize() > 0)
					.map(md -> md.getHdfsPath() + "/" + md.getCurrentVersion() + "/store")
					.collect(Collectors.toSet());
			}
		}
	}
}
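/*
 * Usage sketch (illustrative, hedged): the application is normally submitted as a Spark job whose
 * arguments are declared in migrate_hdfs_mstores_parameters.json and read above via parser.get(...).
 * The option names below mirror the parser keys and every value is a placeholder, not taken from a
 * real deployment; the actual long/short option names are defined in the JSON resource and may differ.
 *
 *   MigrateHdfsMdstoresApplication.main(new String[] {
 *       "--isSparkSessionManaged", "true",
 *       "--mdstoreManagerUrl", "http://localhost:8280/mdstore-manager",
 *       "--mdFormat", "oaf",
 *       "--mdLayout", "store",
 *       "--mdInterpretation", "cleaned",
 *       "--hdfsPath", "/tmp/mdstore_records"
 *   });
 */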