forked from antonis.lempesis/dnet-hadoop
updated CreateInputGraph because ggenerics don't work on Spark Dataset
This commit is contained in:
parent
cd17e19044
commit
0ea576745f
|
@ -34,6 +34,15 @@ object SparkCreateInputGraph {
|
||||||
)
|
)
|
||||||
|
|
||||||
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
|
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
|
||||||
|
implicit val publicationEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication])
|
||||||
|
implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset])
|
||||||
|
implicit val softwareEncoder: Encoder[Software] = Encoders.kryo(classOf[Software])
|
||||||
|
implicit val orpEncoder: Encoder[OtherResearchProduct] = Encoders.kryo(classOf[OtherResearchProduct])
|
||||||
|
implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
val sourcePath = parser.get("sourcePath")
|
val sourcePath = parser.get("sourcePath")
|
||||||
log.info(s"sourcePath -> $sourcePath")
|
log.info(s"sourcePath -> $sourcePath")
|
||||||
|
@ -43,9 +52,21 @@ object SparkCreateInputGraph {
|
||||||
|
|
||||||
val oafDs:Dataset[Oaf] = spark.read.load(s"$sourcePath/*").as[Oaf]
|
val oafDs:Dataset[Oaf] = spark.read.load(s"$sourcePath/*").as[Oaf]
|
||||||
|
|
||||||
resultObject.foreach(r => extractEntities(oafDs,s"$targetPath/extracted/${r._1}", r._2, log))
|
|
||||||
|
|
||||||
extractEntities(oafDs,s"$targetPath/extracted/relation", classOf[Relation], log)
|
log.info("Extract Publication")
|
||||||
|
oafDs.filter(o => o.isInstanceOf[Publication]).map(p => p.asInstanceOf[Publication]).write.mode(SaveMode.Overwrite).save(s"$targetPath/extracted/publication")
|
||||||
|
|
||||||
|
log.info("Extract dataset")
|
||||||
|
oafDs.filter(o => o.isInstanceOf[OafDataset]).map(p => p.asInstanceOf[OafDataset]).write.mode(SaveMode.Overwrite).save(s"$targetPath/extracted/dataset")
|
||||||
|
|
||||||
|
log.info("Extract software")
|
||||||
|
oafDs.filter(o => o.isInstanceOf[Software]).map(p => p.asInstanceOf[Software]).write.mode(SaveMode.Overwrite).save(s"$targetPath/extracted/software")
|
||||||
|
|
||||||
|
log.info("Extract otherResearchProduct")
|
||||||
|
oafDs.filter(o => o.isInstanceOf[OtherResearchProduct]).map(p => p.asInstanceOf[OtherResearchProduct]).write.mode(SaveMode.Overwrite).save(s"$targetPath/extracted/otherResearchProduct")
|
||||||
|
|
||||||
|
log.info("Extract Relation")
|
||||||
|
oafDs.filter(o => o.isInstanceOf[Relation]).map(p => p.asInstanceOf[Relation]).write.mode(SaveMode.Overwrite).save(s"$targetPath/extracted/relation")
|
||||||
|
|
||||||
resultObject.foreach { r =>
|
resultObject.foreach { r =>
|
||||||
log.info(s"Make ${r._1} unique")
|
log.info(s"Make ${r._1} unique")
|
||||||
|
|
|
@ -331,7 +331,7 @@ object ScholixUtils {
|
||||||
s.setDate(dt.distinct.asJava)
|
s.setDate(dt.distinct.asJava)
|
||||||
}
|
}
|
||||||
if (r.getDescription!= null && !r.getDescription.isEmpty) {
|
if (r.getDescription!= null && !r.getDescription.isEmpty) {
|
||||||
val d = r.getDescription.asScala.find(f => f.getValue!=null)
|
val d = r.getDescription.asScala.find(f => f!= null && f.getValue!=null)
|
||||||
if (d.isDefined)
|
if (d.isDefined)
|
||||||
s.setDescription(d.get.getValue)
|
s.setDescription(d.get.getValue)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue