added workflow to serialize scholix and summary in json

pull/124/head
Sandro La Bruzzo 3 years ago
parent 0ea576745f
commit 09fccf8000

@ -0,0 +1,53 @@
package eu.dnetlib.dhp.sx.graph
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.sx.scholix.Scholix
import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import org.apache.hadoop.io.compress._
object SparkConvertObjectToJson {
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json")))
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
val sourcePath = parser.get("sourcePath")
log.info(s"sourcePath -> $sourcePath")
val targetPath = parser.get("targetPath")
log.info(s"targetPath -> $targetPath")
val objectType = parser.get("objectType")
log.info(s"objectType -> $objectType")
implicit val scholixEncoder :Encoder[Scholix]= Encoders.kryo[Scholix]
implicit val summaryEncoder :Encoder[ScholixSummary]= Encoders.kryo[ScholixSummary]
val mapper = new ObjectMapper
objectType.toLowerCase match {
case "scholix" =>
log.info("Serialize Scholix")
val d: Dataset[Scholix] = spark.read.load(sourcePath).as[Scholix]
d.map(s => mapper.writeValueAsString(s))(Encoders.STRING).rdd.repartition(6000).saveAsTextFile(targetPath, classOf[GzipCodec])
case "summary" =>
log.info("Serialize Summary")
val d: Dataset[ScholixSummary] = spark.read.load(sourcePath).as[ScholixSummary]
d.map(s => mapper.writeValueAsString(s))(Encoders.STRING).rdd.repartition(1000).saveAsTextFile(targetPath, classOf[GzipCodec])
}
}
}

@ -0,0 +1,6 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source Path", "paramRequired": true},
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the raw graph", "paramRequired": true},
{"paramName":"o", "paramLongName":"objectType", "paramDescription": "should be scholix or Summary", "paramRequired": true}
]

@ -117,6 +117,68 @@
<arg>--relationPath</arg><arg>${targetPath}/resolved/resolvedRelation</arg>
</spark>
<ok to="DropJSONPath"/>
<error to="Kill"/>
</action>
<action name="DropJSONPath">
<fs>
<delete path='${targetPath}/index'/>
<mkdir path='${targetPath}/index/'/>
</fs>
<ok to="SerializeScholix"/>
<error to="Kill"/>
</action>
<action name="SerializeScholix">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Serialize scholix to JSON</name>
<class>eu.dnetlib.dhp.sx.graph.SparkConvertObjectToJson</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.shuffle.partitions=6000
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--master</arg><arg>yarn</arg>
<arg>--sourcePath</arg><arg>${targetPath}/provision/scholix/scholix</arg>
<arg>--targetPath</arg><arg>${targetPath}/index/scholix_json</arg>
<arg>--objectType</arg><arg>scholix</arg>
</spark>
<ok to="SerializeSummary"/>
<error to="Kill"/>
</action>
<action name="SerializeSummary">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Serialize summary to JSON</name>
<class>eu.dnetlib.dhp.sx.graph.SparkConvertObjectToJson</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.shuffle.partitions=6000
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--master</arg><arg>yarn</arg>
<arg>--sourcePath</arg><arg>${targetPath}/provision/summaries_filtered</arg>
<arg>--targetPath</arg><arg>${targetPath}/index/summaries_json</arg>
<arg>--objectType</arg><arg>summary</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>

Loading…
Cancel
Save