From 0ea576745f265e4d44460463580dd48a0da16c6a Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Fri, 9 Jul 2021 10:29:24 +0200 Subject: [PATCH] updated CreateInputGraph because generics don't work on Spark Dataset --- .../dhp/sx/graph/SparkCreateInputGraph.scala | 25 +++++++++++++++++-- .../dhp/sx/graph/scholix/ScholixUtils.scala | 2 +- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala index 5605ad001..a37dd2132 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala @@ -34,6 +34,15 @@ object SparkCreateInputGraph { ) implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf]) + implicit val publicationEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication]) + implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset]) + implicit val softwareEncoder: Encoder[Software] = Encoders.kryo(classOf[Software]) + implicit val orpEncoder: Encoder[OtherResearchProduct] = Encoders.kryo(classOf[OtherResearchProduct]) + implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation]) + + + + val sourcePath = parser.get("sourcePath") log.info(s"sourcePath -> $sourcePath") @@ -43,9 +52,21 @@ object SparkCreateInputGraph { val oafDs:Dataset[Oaf] = spark.read.load(s"$sourcePath/*").as[Oaf] - resultObject.foreach(r => extractEntities(oafDs,s"$targetPath/extracted/${r._1}", r._2, log)) - extractEntities(oafDs,s"$targetPath/extracted/relation", classOf[Relation], log) + log.info("Extract Publication") + oafDs.filter(o => o.isInstanceOf[Publication]).map(p => p.asInstanceOf[Publication]).write.mode(SaveMode.Overwrite).save(s"$targetPath/extracted/publication") + + 
log.info("Extract dataset") + oafDs.filter(o => o.isInstanceOf[OafDataset]).map(p => p.asInstanceOf[OafDataset]).write.mode(SaveMode.Overwrite).save(s"$targetPath/extracted/dataset") + + log.info("Extract software") + oafDs.filter(o => o.isInstanceOf[Software]).map(p => p.asInstanceOf[Software]).write.mode(SaveMode.Overwrite).save(s"$targetPath/extracted/software") + + log.info("Extract otherResearchProduct") + oafDs.filter(o => o.isInstanceOf[OtherResearchProduct]).map(p => p.asInstanceOf[OtherResearchProduct]).write.mode(SaveMode.Overwrite).save(s"$targetPath/extracted/otherResearchProduct") + + log.info("Extract Relation") + oafDs.filter(o => o.isInstanceOf[Relation]).map(p => p.asInstanceOf[Relation]).write.mode(SaveMode.Overwrite).save(s"$targetPath/extracted/relation") resultObject.foreach { r => log.info(s"Make ${r._1} unique") diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala index eacab631b..6a7ee7803 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala @@ -331,7 +331,7 @@ object ScholixUtils { s.setDate(dt.distinct.asJava) } if (r.getDescription!= null && !r.getDescription.isEmpty) { - val d = r.getDescription.asScala.find(f => f.getValue!=null) + val d = r.getDescription.asScala.find(f => f!= null && f.getValue!=null) if (d.isDefined) s.setDescription(d.get.getValue) }