diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveEntities.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveEntities.scala index f8ebb6800b..be217c5c3f 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveEntities.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveEntities.scala @@ -80,12 +80,11 @@ object SparkResolveEntities { implicit val resEncoder: Encoder[Result] = Encoders.kryo(classOf[Result]) import spark.implicits._ - val re: Dataset[(String, Result)] = spark.read.load(s"$workingPath/resolvedEntities").as[Result].map(r => (r.getId, r)) + val re: Dataset[(String, Result)] = spark.read.load(s"$workingPath/resolvedEntities").as[Result].map(r => (r.getId, r))(Encoders.tuple(Encoders.STRING, resEncoder)) entities.foreach { e => { - val currentEntityDataset: Dataset[(String, Result)] = spark.read.text(s"$graphBasePath/$e").as[String].map(s => deserializeObject(s, e)).map(r => (r.getId, r)) - + val currentEntityDataset: Dataset[(String, Result)] = spark.read.text(s"$graphBasePath/$e").as[String].map(s => deserializeObject(s, e)).map(r => (r.getId, r))(Encoders.tuple(Encoders.STRING, resEncoder)) currentEntityDataset.joinWith(re, currentEntityDataset("_1").equalTo(re("_1")), "left").map(k => { diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/resolution/ResolveEntitiesTest.scala b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/resolution/ResolveEntitiesTest.scala index 0d7350fdd5..c22243f944 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/resolution/ResolveEntitiesTest.scala +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/resolution/ResolveEntitiesTest.scala @@ -146,11 +146,11 @@ class ResolveEntitiesTest extends Serializable { implicit val resEncoder: Encoder[Result] = Encoders.kryo(classOf[Result]) val m = new ObjectMapper() SparkResolveEntities.resolveEntities(spark,s"$workingDir/work", s"$workingDir/updates" ) - SparkResolveEntities.generateResolvedEntities(spark,s"$workingDir/work",s"$workingDir/graph" ) + SparkResolveEntities.generateResolvedEntities(spark,s"$workingDir/work",s"$workingDir/graph", s"$workingDir/target" ) - val pubDS:Dataset[Result] = spark.read.text(s"$workingDir/work/resolvedGraph/publication").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.publication)) + val pubDS:Dataset[Result] = spark.read.text(s"$workingDir/target/publication").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.publication)) val t = pubDS.filter(p => p.getTitle!=null && p.getSubject!=null).filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE"))).count() @@ -161,21 +161,21 @@ class ResolveEntitiesTest extends Serializable { - val datDS:Dataset[Result] = spark.read.text(s"$workingDir/work/resolvedGraph/dataset").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.dataset)) + val datDS:Dataset[Result] = spark.read.text(s"$workingDir/target/dataset").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.dataset)) val td = datDS.filter(p => p.getTitle!=null && p.getSubject!=null).filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE"))).count() ct = datDS.count() et = datDS.filter(p => p.getTitle!= null && p.getTitle.asScala.forall(t => t.getValue != null && t.getValue.nonEmpty)).count() assertEquals(ct, et) - val softDS:Dataset[Result] = spark.read.text(s"$workingDir/work/resolvedGraph/software").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.software)) + val softDS:Dataset[Result] = spark.read.text(s"$workingDir/target/software").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.software)) val ts = softDS.filter(p => p.getTitle!=null && p.getSubject!=null).filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE"))).count() ct = softDS.count() et = softDS.filter(p => p.getTitle!= null && p.getTitle.asScala.forall(t => t.getValue != null && t.getValue.nonEmpty)).count() assertEquals(ct, et) - val orpDS:Dataset[Result] = spark.read.text(s"$workingDir/work/resolvedGraph/otherresearchproduct").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.otherresearchproduct)) + val orpDS:Dataset[Result] = spark.read.text(s"$workingDir/target/otherresearchproduct").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.otherresearchproduct)) val to = orpDS.filter(p => p.getTitle!=null && p.getSubject!=null).filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE"))).count()