diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertObjectToJson.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertObjectToJson.scala
new file mode 100644
index 000000000..846ac37af
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertObjectToJson.scala
@@ -0,0 +1,53 @@
+package eu.dnetlib.dhp.sx.graph
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.schema.sx.scholix.Scholix
+import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary
+import org.apache.commons.io.IOUtils
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
+import org.slf4j.{Logger, LoggerFactory}
+import org.apache.hadoop.io.compress._
+
+object SparkConvertObjectToJson {
+
+  def main(args: Array[String]): Unit = {
+    val log: Logger = LoggerFactory.getLogger(getClass)
+    val conf: SparkConf = new SparkConf()
+    val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json")))
+    parser.parseArgument(args)
+    val spark: SparkSession =
+      SparkSession
+        .builder()
+        .config(conf)
+        .appName(getClass.getSimpleName)
+        .master(parser.get("master")).getOrCreate()
+
+    val sourcePath = parser.get("sourcePath")
+    log.info(s"sourcePath -> $sourcePath")
+    val targetPath = parser.get("targetPath")
+    log.info(s"targetPath -> $targetPath")
+    val objectType = parser.get("objectType")
+    log.info(s"objectType -> $objectType")
+
+
+    implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo[Scholix]
+    implicit val summaryEncoder: Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary]
+
+
+    val mapper = new ObjectMapper
+
+    objectType.toLowerCase match {
+      case "scholix" =>
+        log.info("Serialize Scholix")
+        val d: Dataset[Scholix] = spark.read.load(sourcePath).as[Scholix]
+        d.map(s => mapper.writeValueAsString(s))(Encoders.STRING).rdd.repartition(6000).saveAsTextFile(targetPath, classOf[GzipCodec])
+      case "summary" =>
+        log.info("Serialize Summary")
+        val d: Dataset[ScholixSummary] = spark.read.load(sourcePath).as[ScholixSummary]
+        d.map(s => mapper.writeValueAsString(s))(Encoders.STRING).rdd.repartition(1000).saveAsTextFile(targetPath, classOf[GzipCodec])
+    }
+  }
+
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json
new file mode 100644
index 000000000..4b15da623
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json
@@ -0,0 +1,6 @@
+[
+  {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
+  {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source path", "paramRequired": true},
+  {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the target path where the serialized JSON is saved", "paramRequired": true},
+  {"paramName":"o", "paramLongName":"objectType", "paramDescription": "should be scholix or summary", "paramRequired": true}
+]
\ No newline at end of file
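Side note: the converter is driven entirely by the four parameters declared in convert_object_json_params.json. Below is a minimal sketch (not part of this PR) of how those parameters could be exercised in a local test; the /tmp paths are hypothetical, and the parsing behaviour assumed is the one the class above relies on, where each paramLongName becomes a --longName CLI argument.

    import eu.dnetlib.dhp.application.ArgumentApplicationParser
    import org.apache.commons.io.IOUtils

    object ConvertParamsSketch {
      def main(args: Array[String]): Unit = {
        // Load the same parameter definition file shipped with this PR.
        val parser = new ArgumentApplicationParser(IOUtils.toString(
          getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json")))

        // Hypothetical argument values; only objectType's set of accepted
        // values ("scholix"/"summary") is fixed by the job itself.
        parser.parseArgument(Array(
          "--master", "local[*]",
          "--sourcePath", "/tmp/provision/scholix/scholix",
          "--targetPath", "/tmp/index/scholix_json",
          "--objectType", "scholix"))

        // Values are retrieved by long name, as SparkConvertObjectToJson does.
        assert(parser.get("objectType").toLowerCase == "scholix")
      }
    }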
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml
index 1895ab741..3ea4e9d30 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml
@@ -117,6 +117,68 @@
                 <arg>--relationPath</arg><arg>${targetPath}/resolved/resolvedRelation</arg>
             </spark>
             <ok to="SerializeScholix"/>
             <error to="Kill"/>
         </action>
+
+        <action name="SerializeScholix">
+            <spark xmlns="uri:oozie:spark-action:0.2">
+                <master>yarn</master>
+                <mode>cluster</mode>
+                <name>Serialize scholix to JSON</name>
+                <class>eu.dnetlib.dhp.sx.graph.SparkConvertObjectToJson</class>
+                <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
+                <spark-opts>
+                    --executor-memory=${sparkExecutorMemory}
+                    --executor-cores=${sparkExecutorCores}
+                    --driver-memory=${sparkDriverMemory}
+                    --conf spark.extraListeners=${spark2ExtraListeners}
+                    --conf spark.sql.shuffle.partitions=6000
+                    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                </spark-opts>
+                <arg>--master</arg><arg>yarn</arg>
+                <arg>--sourcePath</arg><arg>${targetPath}/provision/scholix/scholix</arg>
+                <arg>--targetPath</arg><arg>${targetPath}/index/scholix_json</arg>
+                <arg>--objectType</arg><arg>scholix</arg>
+            </spark>
+            <ok to="SerializeSummary"/>
+            <error to="Kill"/>
+        </action>
+
+        <action name="SerializeSummary">
+            <spark xmlns="uri:oozie:spark-action:0.2">
+                <master>yarn</master>
+                <mode>cluster</mode>
+                <name>Serialize summary to JSON</name>
+                <class>eu.dnetlib.dhp.sx.graph.SparkConvertObjectToJson</class>
+                <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
+                <spark-opts>
+                    --executor-memory=${sparkExecutorMemory}
+                    --executor-cores=${sparkExecutorCores}
+                    --driver-memory=${sparkDriverMemory}
+                    --conf spark.extraListeners=${spark2ExtraListeners}
+                    --conf spark.sql.shuffle.partitions=6000
+                    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                </spark-opts>
+                <arg>--master</arg><arg>yarn</arg>
+                <arg>--sourcePath</arg><arg>${targetPath}/provision/summaries_filtered</arg>
+                <arg>--targetPath</arg><arg>${targetPath}/index/summaries_json</arg>
+                <arg>--objectType</arg><arg>summary</arg>
+            </spark>
+            <ok to="End"/>
+            <error to="Kill"/>
+        </action>
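After both actions complete, the two targetPath directories hold gzipped part files with one JSON document per line. A hedged spot-check sketch, assuming a local SparkSession, a hypothetical /tmp output path, and that Jackson can round-trip the Scholix schema class (a plain POJO from dhp-schemas; getIdentifier is assumed to be one of its getters):

    import com.fasterxml.jackson.databind.ObjectMapper
    import eu.dnetlib.dhp.schema.sx.scholix.Scholix
    import org.apache.spark.sql.SparkSession

    object ScholixJsonSpotCheck {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("ScholixJsonSpotCheck")
          .getOrCreate()
        val mapper = new ObjectMapper

        // textFile transparently decompresses the gzip part files;
        // the path mirrors ${targetPath}/index/scholix_json and is hypothetical.
        val lines = spark.sparkContext.textFile("/tmp/index/scholix_json")

        // Parse a single record back into the schema class as a round-trip check.
        val first: Scholix = mapper.readValue(lines.first(), classOf[Scholix])
        println(s"identifier of first scholix record: ${first.getIdentifier}")

        spark.stop()
      }
    }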