forked from D-Net/dnet-hadoop
removed wrong code
This commit is contained in:
parent cc0f2b11fb
commit dfcf78cf24
@@ -1,21 +1,17 @@
 package eu.dnetlib.dhp.sx.ebi
 
-import com.esotericsoftware.kryo.Kryo
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
 import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
 import eu.dnetlib.dhp.schema.oaf.Result
+import eu.dnetlib.dhp.sx.ebi.model._
+import eu.dnetlib.dhp.utils.ISLookupClientFactory
 import org.apache.commons.io.IOUtils
 import org.apache.spark.SparkConf
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
-import eu.dnetlib.dhp.sx.ebi.model.{PMArticle, PMAuthor, PMJournal, PMParser, PubMedToOaf}
-import eu.dnetlib.dhp.utils.ISLookupClientFactory
 import org.apache.spark.sql.expressions.Aggregator
-import org.objenesis.strategy.StdInstantiatorStrategy
+import org.apache.spark.sql._
 import org.slf4j.{Logger, LoggerFactory}
 
-import scala.collection.JavaConverters._
-
 import scala.io.Source
 import scala.xml.pull.XMLEventReader
 
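The pipeline that survives this commit (see the second hunk below) still refers to Dataset, Encoders, SaveMode and SparkSession; after the cleanup those names resolve through the org.apache.spark.sql._ wildcard instead of the dropped selective import. A minimal sketch of why the code still compiles, assuming only the public Spark SQL API (the object name and path below are illustrative, not part of the commit):

    import org.apache.spark.sql._ // provides Dataset, Encoders, SaveMode, SparkSession, ...

    object WildcardImportCheck {
      def roundTrip(spark: SparkSession, path: String): Unit = {
        // the same names the removed selective import used to supply
        val ds: Dataset[String] = spark.createDataset(Seq("a", "b"))(Encoders.STRING)
        ds.write.mode(SaveMode.Overwrite).text(path)
      }
    }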
@@ -86,23 +82,5 @@ object SparkCreateBaselineDataFrame {
       .map(a => PubMedToOaf.convert(a, vocabularies)).as[Result]
       .filter(p => p!= null)
       .write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_oaf")
-
-
-    def extract_values(a : PMArticle):(String, String) = {
-      val l:String = a.getPublicationTypes.asScala.map(p => p.getValue).mkString(",")
-
-      (a.getPmid, l)
-    }
-
-
-    val ks:Dataset[(String,String)] =spark.read.load("/data/scholix/baseline_dataset").as[PMArticle].map(a => extract_values(a))(Encoders.tuple(Encoders.STRING,Encoders.STRING))
-
-    val ids:Dataset[String] = spark.read.load("/tmp/missing_pubmed").as[String]
-
-    ks.joinWith(ids, ks("_1").equalTo(ids("value")), "inner").map(k => k._1._2).distinct.show()
-
-
-
-
   }
 }
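For context, the deleted block at the end of the object was a one-off diagnostic: it projected every baseline PMArticle to a (pmid, publication-types) pair, joined the pairs against a list of PubMed identifiers missing from the converted output, and printed the distinct publication types involved. A self-contained sketch of that check, assuming a kryo Encoder for PMArticle (the implicit below and the object name are part of the sketch, not of the original file):

    import eu.dnetlib.dhp.sx.ebi.model.PMArticle
    import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

    import scala.collection.JavaConverters._

    object MissingPubmedCheck {

      def run(spark: SparkSession): Unit = {
        import spark.implicits._

        // PMArticle is a Java bean, so the sketch assumes a kryo-based encoder
        implicit val articleEncoder: Encoder[PMArticle] = Encoders.kryo(classOf[PMArticle])

        // (pmid, comma-separated publication types) for every baseline article
        val ks: Dataset[(String, String)] = spark.read
          .load("/data/scholix/baseline_dataset")
          .as[PMArticle]
          .map(a => (a.getPmid, a.getPublicationTypes.asScala.map(_.getValue).mkString(",")))(
            Encoders.tuple(Encoders.STRING, Encoders.STRING))

        // identifiers missing from the converted output, stored in a column named "value"
        val ids: Dataset[String] = spark.read.load("/tmp/missing_pubmed").as[String]

        // publication types that occur among the missing identifiers
        ks.joinWith(ids, ks("_1").equalTo(ids("value")), "inner")
          .map(k => k._1._2)
          .distinct()
          .show()
      }
    }

Joining on ks("_1") works because a Dataset of pairs exposes its tuple fields as columns _1 and _2. The trailing distinct-show was plainly a debugging aid, which is why the commit drops the whole block from the production job.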