
updated baseline workflow

commit 8a034e46e1
parent 0799ac9fb6
Author: Sandro La Bruzzo
Date:   2021-07-08 11:11:41 +02:00

3 changed files with 16 additions and 3 deletions

File: SparkCreateBaselineDataFrame.scala

@@ -45,6 +45,11 @@ object SparkCreateBaselineDataFrame {
     parser.parseArgument(args)
     val isLookupUrl: String = parser.get("isLookupUrl")
     log.info("isLookupUrl: {}", isLookupUrl)
+    val workingPath = parser.get("workingPath")
+    log.info("workingPath: {}", workingPath)
+    val targetPath = parser.get("targetPath")
+    log.info("targetPath: {}", targetPath)
     val isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl)
     val vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService)
@@ -59,7 +64,7 @@ object SparkCreateBaselineDataFrame {
     val sc = spark.sparkContext
-    val workingPath = parser.get("workingPath")
     implicit val PMEncoder: Encoder[PMArticle] = Encoders.kryo(classOf[PMArticle])
     implicit val PMJEncoder: Encoder[PMJournal] = Encoders.kryo(classOf[PMJournal])
@@ -81,6 +86,8 @@ object SparkCreateBaselineDataFrame {
     exported_dataset
       .map(a => PubMedToOaf.convert(a, vocabularies)).as[Result]
       .filter(p => p != null)
-      .write.mode(SaveMode.Overwrite).save(s"$workingPath/oaf/baseline_oaf")
+      .write.mode(SaveMode.Overwrite).save(targetPath)
+    //s"$workingPath/oaf/baseline_oaf"
   }
 }
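The net effect of the Scala changes: both workingPath and targetPath are now resolved from the parsed arguments at the top of the job, and the output is written to an explicit targetPath instead of a location derived from workingPath. A minimal, self-contained sketch of that pattern in plain Spark (the argument handling and the conversion step are hypothetical stand-ins for the dnet-hadoop parser and PubMedToOaf, not the project's code):

import org.apache.spark.sql.{SaveMode, SparkSession}

// Sketch of the commit's pattern: resolve every path from the job
// arguments up front, then write to an explicit target path.
object BaselineWriteSketch {
  def main(args: Array[String]): Unit = {
    // hypothetical stand-ins for parser.get("workingPath") / parser.get("targetPath")
    val workingPath = args(0)
    val targetPath = args(1)

    val spark = SparkSession.builder()
      .appName("baseline-write-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // stand-in for the exported_dataset -> PubMedToOaf.convert(...) pipeline
    val converted = spark.read.textFile(s"$workingPath/baseline")
      .map(_.trim)
      .filter(r => r != null && r.nonEmpty)

    // same Overwrite semantics as the diff; the output location is now a
    // parameter rather than the hard-coded s"$workingPath/oaf/baseline_oaf"
    converted.write.mode(SaveMode.Overwrite).save(targetPath)
    spark.stop()
  }
}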

File: baseline job argument definitions (JSON)

@ -1,5 +1,6 @@
[ [
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"i", "paramLongName":"isLookupUrl","paramDescription": "isLookupUrl", "paramRequired": true}, {"paramName":"i", "paramLongName":"isLookupUrl","paramDescription": "isLookupUrl", "paramRequired": true},
{"paramName":"w", "paramLongName":"workingPath","paramDescription": "the path of the sequencial file to read", "paramRequired": true} {"paramName":"w", "paramLongName":"workingPath","paramDescription": "the path of the sequencial file to read", "paramRequired": true},
{"paramName":"t", "paramLongName":"targetPath","paramDescription": "the oaf path ", "paramRequired": true}
] ]
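Each JSON entry maps a short flag to a long flag, so this change makes -t/--targetPath a new mandatory argument alongside -w/--workingPath. A hedged sketch of how such a definition is typically loaded and consumed; the resource path and parser construction here are assumptions, while parseArgument and get match the calls visible in the Scala diff above:

import eu.dnetlib.dhp.application.ArgumentApplicationParser
import org.apache.commons.io.IOUtils

// Hypothetical resource path; construction follows the usual dnet-hadoop
// pattern rather than anything shown in this commit.
val parser = new ArgumentApplicationParser(
  IOUtils.toString(getClass.getResourceAsStream("/hypothetical/baseline_params.json")))
parser.parseArgument(args) // e.g. --master yarn -i <url> -w /work/path -t /target/path
val targetPath = parser.get("targetPath") // "paramRequired": true, so a missing -t should fail at parse time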

File: Oozie workflow.xml for the baseline job

@@ -4,6 +4,10 @@
         <name>baselineWorkingPath</name>
         <description>the Baseline Working Path</description>
     </property>
+    <property>
+        <name>targetPath</name>
+        <description>the Target Path</description>
+    </property>
     <property>
         <name>isLookupUrl</name>
         <description>The IS lookUp service endopoint</description>
@@ -34,6 +38,7 @@
             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
         </spark-opts>
         <arg>--workingPath</arg><arg>${baselineWorkingPath}</arg>
+        <arg>--targetPath</arg><arg>${targetPath}</arg>
         <arg>--master</arg><arg>yarn</arg>
         <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
     </spark>