From ad56a44fda8ff76202c963326cfcb5e6a5b753f6 Mon Sep 17 00:00:00 2001
From: "michele.artini"
Date: Fri, 28 May 2021 14:45:39 +0200
Subject: [PATCH] save as gzipped sequence file

---
 .../raw/MigrateHdfsMdstoresApplication.java   |  75 ++++++------
 .../raw/MigrateMongoMdstoresApplication.java  |   3 -
 .../oozie_app/config-default.xml              |  18 +++
 .../raw_hdfs_stores/oozie_app/workflow.xml    | 109 ++++++++++++++++++
 4 files changed, 169 insertions(+), 36 deletions(-)
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/config-default.xml
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/workflow.xml

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java
index b261b5e76..3e78edf4c 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java
@@ -6,10 +6,14 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 import java.util.Arrays;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.CloseableHttpClient;
@@ -29,24 +33,18 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.HdfsSupport;
 import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
 import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo;
+import scala.Tuple2;
 
 public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication {
 
     private static final Logger log = LoggerFactory.getLogger(MigrateHdfsMdstoresApplication.class);
 
-    private final String mdstoreManagerUrl;
-
-    private final String format;
-
-    private final String layout;
-
-    private final String interpretation;
-
     public static void main(final String[] args) throws Exception {
         final ArgumentApplicationParser parser = new ArgumentApplicationParser(
             IOUtils
-                .toString(MigrateHdfsMdstoresApplication.class
-                    .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json")));
+                .toString(
+                    MigrateHdfsMdstoresApplication.class
+                        .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json")));
         parser.parseArgument(args);
 
         final Boolean isSparkSessionManaged = Optional
@@ -62,39 +60,51 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication
 
         final String hdfsPath = parser.get("hdfsPath");
 
+        final Set<String> paths = mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation);
+
         final SparkConf conf = new SparkConf();
         runWithSparkSession(conf, isSparkSessionManaged, spark -> {
-            try (final MigrateHdfsMdstoresApplication app =
-                new MigrateHdfsMdstoresApplication(hdfsPath, mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation)) {
-                app.execute(spark);
-            }
+            HdfsSupport.remove(hdfsPath, spark.sparkContext().hadoopConfiguration());
+            processPaths(spark, hdfsPath, paths, String.format("%s-%s-%s", mdFormat, mdLayout, mdInterpretation));
         });
     }
 
-    public MigrateHdfsMdstoresApplication(final String hdfsPath, final String mdstoreManagerUrl, final String format, final String layout,
-        final String interpretation) throws Exception {
-        super(hdfsPath);
-        this.mdstoreManagerUrl = mdstoreManagerUrl;
-        this.format = format;
-        this.layout = layout;
-        this.interpretation = interpretation;
-    }
-
-    public void execute(final SparkSession spark) throws Exception {
+    public static void processPaths(final SparkSession spark,
+        final String outputPath,
+        final Set<String> paths,
+        final String type) throws Exception {
         final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
-        final Set<String> paths = mdstorePaths(sc);
         log.info("Found " + paths.size() + " not empty mdstores");
+        paths.forEach(log::info);
 
-        spark.read()
-            .parquet(paths.toArray(new String[paths.size()]))
+        final String[] validPaths = paths
+            .stream()
+            .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration()))
+            .toArray(size -> new String[size]);
+
+        spark
+            .read()
+            .parquet(validPaths)
             .map((MapFunction<Row, String>) r -> r.getAs("body"), Encoders.STRING())
-            .foreach(xml -> emit(xml, String.format("%s-%s-%s", format, layout, interpretation)));
+            .toJavaRDD()
+            .mapToPair(xml -> new Tuple2<>(new Text(UUID.randomUUID() + ":" + type), new Text(xml)))
+            .coalesce(1)
+            .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
+
+        /*
+         * .foreach(xml -> { try { writer.append(new Text(UUID.randomUUID() + ":" + type), new Text(xml)); } catch
+         * (final Exception e) { throw new RuntimeException(e); } });
+         */
     }
 
-    private Set<String> mdstorePaths(final JavaSparkContext sc) throws Exception {
-        final String url = mdstoreManagerUrl + "/mdstores";
+    private static Set<String> mdstorePaths(final String mdstoreManagerUrl,
+        final String format,
+        final String layout,
+        final String interpretation)
+        throws Exception {
+        final String url = mdstoreManagerUrl + "/mdstores/";
         final ObjectMapper objectMapper = new ObjectMapper();
 
         final HttpGet req = new HttpGet(url);
@@ -103,7 +113,8 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication
         try (final CloseableHttpResponse response = client.execute(req)) {
             final String json = IOUtils.toString(response.getEntity().getContent());
             final MDStoreWithInfo[] mdstores = objectMapper.readValue(json, MDStoreWithInfo[].class);
-            return Arrays.stream(mdstores)
+            return Arrays
+                .stream(mdstores)
                 .filter(md -> md.getFormat().equalsIgnoreCase(format))
                 .filter(md -> md.getLayout().equalsIgnoreCase(layout))
                 .filter(md -> md.getInterpretation().equalsIgnoreCase(interpretation))
@@ -111,10 +122,8 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication
                 .filter(md -> StringUtils.isNotBlank(md.getCurrentVersion()))
                 .filter(md -> md.getSize() > 0)
                 .map(md -> md.getHdfsPath() + "/" + md.getCurrentVersion() + "/store")
-                .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration()))
                 .collect(Collectors.toSet());
         }
     }
-
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java
index 9acdabb37..3f6afbeac 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java
@@ -10,9 +10,6 @@ import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.mongodb.MongoClient;
-import com.mongodb.MongoClientURI;
-
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.MdstoreClient;
 import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/config-default.xml
new file mode 100644
index 000000000..2e0ed9aee
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/config-default.xml
@@ -0,0 +1,18 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>yarnRM</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>hdfs://nameservice1</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/workflow.xml
new file mode 100644
index 000000000..2c24aeb1a
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/workflow.xml
@@ -0,0 +1,109 @@
+<workflow-app name="raw_hdfs_stores" xmlns="uri:oozie:workflow:0.5">
+
+    <parameters>
+        <property>
+            <name>graphOutputPath</name>
+            <description>the target path to store raw graph</description>
+        </property>
+        <property>
+            <name>contentPath</name>
+            <description>path location to store (or reuse) content from the aggregator</description>
+        </property>
+        <property>
+            <name>mdstoreManagerUrl</name>
+            <description>the address of the Mdstore Manager</description>
+        </property>
+        <property>
+            <name>isLookupUrl</name>
+            <description>the address of the lookUp service</description>
+        </property>
+        <property>
+            <name>sparkDriverMemory</name>
+            <description>memory for driver process</description>
+        </property>
+        <property>
+            <name>sparkExecutorMemory</name>
+            <description>memory for individual executor</description>
+        </property>
+        <property>
+            <name>sparkExecutorCores</name>
+            <description>number of cores used by single executor</description>
+        </property>
+        <property>
+            <name>oozieActionShareLibForSpark2</name>
+            <description>oozie action sharelib for spark 2.*</description>
+        </property>
+        <property>
+            <name>spark2ExtraListeners</name>
+            <value>com.cloudera.spark.lineage.NavigatorAppListener</value>
+            <description>spark 2.* extra listeners classname</description>
+        </property>
+        <property>
+            <name>spark2SqlQueryExecutionListeners</name>
+            <value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
+            <description>spark 2.* sql query execution listeners classname</description>
+        </property>
+        <property>
+            <name>spark2YarnHistoryServerAddress</name>
+            <description>spark 2.* yarn history server address</description>
+        </property>
+        <property>
+            <name>spark2EventLogDir</name>
+            <description>spark 2.* event log dir location</description>
+        </property>
+    </parameters>
+
+    <global>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>mapreduce.job.queuename</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.queue.name</name>
+                <value>${oozieLauncherQueueName}</value>
+            </property>
+            <property>
+                <name>oozie.action.sharelib.for.spark</name>
+                <value>${oozieActionShareLibForSpark2}</value>
+            </property>
+        </configuration>
+    </global>
+
+    <start to="ImportODF_hdfs"/>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <action name="ImportODF_hdfs">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>ImportODF_hdfs</name>
+            <class>eu.dnetlib.dhp.oa.graph.raw.MigrateHdfsMdstoresApplication</class>
+            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory ${sparkExecutorMemory}
+                --executor-cores ${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+            <arg>--hdfsPath</arg><arg>${contentPath}/odf_records_hdfs</arg>
+            <arg>--mdstoreManagerUrl</arg><arg>${mdstoreManagerUrl}</arg>
+            <arg>--mdFormat</arg><arg>ODF</arg>
+            <arg>--mdLayout</arg><arg>store</arg>
+            <arg>--mdInterpretation</arg><arg>cleaned</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+
+</workflow-app>
\ No newline at end of file
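
Note on the new output format (a sketch, not part of the patch): processPaths() now writes all records of the selected mdstores into a single gzipped SequenceFile of (Text, Text) pairs, keyed "uuid:format-layout-interpretation". A minimal read-back check could look like the following, assuming a Spark runtime on the same cluster; the class name ReadMdstoreRecords and the literal input path are illustrative stand-ins (the workflow writes to ${contentPath}/odf_records_hdfs):

    import org.apache.hadoop.io.Text;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class ReadMdstoreRecords {

        public static void main(final String[] args) {
            final SparkConf conf = new SparkConf().setAppName("ReadMdstoreRecords");
            try (final JavaSparkContext sc = new JavaSparkContext(conf)) {
                // gzip decompression is transparent when reading a SequenceFile;
                // the path below is illustrative, matching ${contentPath}/odf_records_hdfs
                final JavaPairRDD<Text, Text> records = sc
                    .sequenceFile("/tmp/content/odf_records_hdfs", Text.class, Text.class);

                // Hadoop reuses Text instances, so copy to String before using the values
                records
                    .map(t -> t._1().toString() + " -> " + t._2().toString().length() + " chars")
                    .take(5)
                    .forEach(System.out::println);
            }
        }
    }

On the design: coalesce(1) reproduces the single-file output of the commented-out SequenceFile writer, but it funnels every record through one task; for large mdstore sets, a higher partition count with several part files may be the better trade-off.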