1
0
Fork 0

- fixed bug on download pmc Article

- removed unused line of code in SparkCreateActionset
This commit is contained in:
Sandro La Bruzzo 2021-10-12 11:47:49 +02:00
parent 5606014b17
commit 511da98d0c
3 changed files with 8 additions and 12 deletions

View File

@ -60,14 +60,10 @@ object SparkCreateActionset {
val entities: Dataset[(String, Result)] = spark.read.load(s"$sourcePath/entities/*").as[Result].map(p => (p.getId, p))(Encoders.tuple(Encoders.STRING, resultEncoders))
entities.filter(r => r.isInstanceOf[Result]).map(r => r.asInstanceOf[Result])
entities
.joinWith(idRelation, entities("_1").equalTo(idRelation("value")))
.map(p => p._1._2)
.write.mode(SaveMode.Append).save(s"$workingDirFolder/actionSetOaf")
}
}

View File

@ -114,11 +114,7 @@ object SparkCreateBaselineDataFrame {
val hdfsWritePath: Path = new Path(s"$baselinePath/${u._1}")
val fsDataOutputStream: FSDataOutputStream = fs.create(hdfsWritePath, true)
val i = downloadBaselinePart(u._2)
val buffer = Array.fill[Byte](1024)(0)
while (i.read(buffer) > 0) {
fsDataOutputStream.write(buffer)
}
i.close()
IOUtils.copy(i, fsDataOutputStream)
println(s"Downloaded ${u._2} into $baselinePath/${u._1}")
fsDataOutputStream.close()
}
@ -182,7 +178,7 @@ object SparkCreateBaselineDataFrame {
downloadBaseLineUpdate(s"$workingPath/baseline", hdfsServerUri)
val k: RDD[(String, String)] = sc.wholeTextFiles(s"$workingPath/baseline_ftp", 2000)
val k: RDD[(String, String)] = sc.wholeTextFiles(s"$workingPath/baseline", 2000)
val ds: Dataset[PMArticle] = spark.createDataset(k.filter(i => i._1.endsWith(".gz")).flatMap(i => {
val xml = new XMLEventReader(Source.fromBytes(i._2.getBytes()))
new PMParser(xml)

View File

@ -8,6 +8,10 @@
<name>isLookupUrl</name>
<description>The IS lookUp service endopoint</description>
</property>
<property>
<name>targetPath</name>
<description>The target path</description>
</property>
</parameters>
<start to="ConvertDataset"/>
@ -22,7 +26,7 @@
<mode>cluster</mode>
<name>Convert Baseline to OAF Dataset</name>
<class>eu.dnetllib.dhp.sx.bio.ebi.SparkCreateBaselineDataFrame</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
@ -34,7 +38,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--workingPath</arg><arg>${baselineWorkingPath}</arg>
<arg>--targetPath</arg><arg>${baselineWorkingPath}/transformed</arg>
<arg>--targetPath</arg><arg>${targetPath}</arg>
<arg>--master</arg><arg>yarn</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--hdfsServerUri</arg><arg>${nameNode}</arg>