added params skip update to direct transform in OAF, this should be set to true in production

This commit is contained in:
Sandro La Bruzzo 2021-10-12 12:36:30 +02:00
parent 511da98d0c
commit 7387416e90
3 changed files with 22 additions and 13 deletions

View File

@ -32,7 +32,7 @@ object SparkCreateBaselineDataFrame {
val start = l.indexOf("<a href=\"") val start = l.indexOf("<a href=\"")
if (start >= 0 && end > start) if (start >= 0 && end > start)
l.substring(start + 9, (end - start)) l.substring(start + 9, end - start)
else else
"" ""
}.filter(s => s.endsWith(".gz")).filter(s => s > maxFile).map(s => (s, s"https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/$s")).toList }.filter(s => s.endsWith(".gz")).filter(s => s > maxFile).map(s => (s, s"https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/$s")).toList
@ -158,6 +158,9 @@ object SparkCreateBaselineDataFrame {
val hdfsServerUri = parser.get("hdfsServerUri") val hdfsServerUri = parser.get("hdfsServerUri")
log.info("hdfsServerUri: {}", targetPath) log.info("hdfsServerUri: {}", targetPath)
val skipUpdate = parser.get("skipUpdate")
log.info("skipUpdate: {}", skipUpdate)
val isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl) val isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl)
val vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService) val vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService)
@ -176,18 +179,17 @@ object SparkCreateBaselineDataFrame {
implicit val PMAEncoder: Encoder[PMAuthor] = Encoders.kryo(classOf[PMAuthor]) implicit val PMAEncoder: Encoder[PMAuthor] = Encoders.kryo(classOf[PMAuthor])
implicit val resultEncoder: Encoder[Result] = Encoders.kryo(classOf[Result]) implicit val resultEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
if (!"true".equalsIgnoreCase(skipUpdate)) {
downloadBaseLineUpdate(s"$workingPath/baseline", hdfsServerUri) downloadBaseLineUpdate(s"$workingPath/baseline", hdfsServerUri)
val k: RDD[(String, String)] = sc.wholeTextFiles(s"$workingPath/baseline", 2000) val k: RDD[(String, String)] = sc.wholeTextFiles(s"$workingPath/baseline", 2000)
val ds: Dataset[PMArticle] = spark.createDataset(k.filter(i => i._1.endsWith(".gz")).flatMap(i => { val ds: Dataset[PMArticle] = spark.createDataset(k.filter(i => i._1.endsWith(".gz")).flatMap(i => {
val xml = new XMLEventReader(Source.fromBytes(i._2.getBytes())) val xml = new XMLEventReader(Source.fromBytes(i._2.getBytes()))
new PMParser(xml) new PMParser(xml)
})) }))
ds.map(p => (p.getPmid, p))(Encoders.tuple(Encoders.STRING, PMEncoder)).groupByKey(_._1) ds.map(p => (p.getPmid, p))(Encoders.tuple(Encoders.STRING, PMEncoder)).groupByKey(_._1)
.agg(pmArticleAggregator.toColumn) .agg(pmArticleAggregator.toColumn)
.map(p => p._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_dataset") .map(p => p._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_dataset")
}
val exported_dataset = spark.read.load(s"$workingPath/baseline_dataset").as[PMArticle] val exported_dataset = spark.read.load(s"$workingPath/baseline_dataset").as[PMArticle]
exported_dataset exported_dataset

View File

@ -3,5 +3,6 @@
{"paramName":"i", "paramLongName":"isLookupUrl", "paramDescription": "isLookupUrl", "paramRequired": true}, {"paramName":"i", "paramLongName":"isLookupUrl", "paramDescription": "isLookupUrl", "paramRequired": true},
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}, {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true},
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the oaf path ", "paramRequired": true}, {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the oaf path ", "paramRequired": true},
{"paramName":"s", "paramLongName":"skipUpdate", "paramDescription": "skip update ", "paramRequired": false},
{"paramName":"h", "paramLongName":"hdfsServerUri", "paramDescription": "the working path ", "paramRequired": true} {"paramName":"h", "paramLongName":"hdfsServerUri", "paramDescription": "the working path ", "paramRequired": true}
] ]

View File

@ -12,6 +12,11 @@
<name>targetPath</name> <name>targetPath</name>
<description>The target path</description> <description>The target path</description>
</property> </property>
<property>
<name>skipUpdate</name>
<value>false</value>
<description>The request block size</description>
</property>
</parameters> </parameters>
<start to="ConvertDataset"/> <start to="ConvertDataset"/>
@ -42,6 +47,7 @@
<arg>--master</arg><arg>yarn</arg> <arg>--master</arg><arg>yarn</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--hdfsServerUri</arg><arg>${nameNode}</arg> <arg>--hdfsServerUri</arg><arg>${nameNode}</arg>
<arg>--skipUpdate</arg><arg>${skipUpdate}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>