diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveEntities.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveEntities.scala
index 316b8afed..f8ebb6800 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveEntities.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveEntities.scala
@@ -14,7 +14,7 @@ import org.slf4j.{Logger, LoggerFactory}
 
 object SparkResolveEntities {
   val mapper = new ObjectMapper()
-  val entities = List(EntityType.dataset,EntityType.publication, EntityType.software, EntityType.otherresearchproduct)
+  val entities = List(EntityType.dataset, EntityType.publication, EntityType.software, EntityType.otherresearchproduct)
 
   def main(args: Array[String]): Unit = {
     val log: Logger = LoggerFactory.getLogger(getClass)
@@ -36,25 +36,19 @@ object SparkResolveEntities {
     val unresolvedPath = parser.get("unresolvedPath")
     log.info(s"unresolvedPath -> $unresolvedPath")
 
+    val targetPath = parser.get("targetPath")
+    log.info(s"targetPath -> $targetPath")
+
+
     val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
     fs.mkdirs(new Path(workingPath))
 
     resolveEntities(spark, workingPath, unresolvedPath)
-    generateResolvedEntities(spark, workingPath, graphBasePath)
-
-    // TO BE conservative we keep the original entities in the working dir
-    // and save the resolved entities on the graphBasePath
-    //In future these lines of code should be removed
-    entities.foreach {
-      e =>
-        fs.rename(new Path(s"$graphBasePath/$e"), new Path(s"$workingPath/${e}_old"))
-        fs.rename(new Path(s"$workingPath/resolvedGraph/$e"), new Path(s"$graphBasePath/$e"))
-    }
-
-}
+    generateResolvedEntities(spark, workingPath, graphBasePath, targetPath)
+  }
 
-def resolveEntities(spark: SparkSession, workingPath: String, unresolvedPath: String) = {
+  def resolveEntities(spark: SparkSession, workingPath: String, unresolvedPath: String) = {
     implicit val resEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
     import spark.implicits._
@@ -71,37 +65,43 @@ def resolveEntities(spark: SparkSession, workingPath: String, unresolvedPath: St
   }
 
-  def deserializeObject(input:String, entity:EntityType ) :Result = {
+  def deserializeObject(input: String, entity: EntityType): Result = {
 
-    entity match {
-      case EntityType.publication => mapper.readValue(input, classOf[Publication])
-      case EntityType.dataset => mapper.readValue(input, classOf[OafDataset])
-      case EntityType.software=> mapper.readValue(input, classOf[Software])
-      case EntityType.otherresearchproduct=> mapper.readValue(input, classOf[OtherResearchProduct])
-    }
+    entity match {
+      case EntityType.publication => mapper.readValue(input, classOf[Publication])
+      case EntityType.dataset => mapper.readValue(input, classOf[OafDataset])
+      case EntityType.software => mapper.readValue(input, classOf[Software])
+      case EntityType.otherresearchproduct => mapper.readValue(input, classOf[OtherResearchProduct])
+    }
   }
 
-  def generateResolvedEntities(spark:SparkSession, workingPath: String, graphBasePath:String) = {
+  def generateResolvedEntities(spark: SparkSession, workingPath: String, graphBasePath: String, targetPath:String) = {
 
     implicit val resEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
     import spark.implicits._
 
-    val re:Dataset[Result] = spark.read.load(s"$workingPath/resolvedEntities").as[Result]
+    val re: Dataset[(String, Result)] = spark.read.load(s"$workingPath/resolvedEntities").as[Result].map(r => (r.getId, r))
     entities.foreach {
-      e =>
+      e => {
+
+        val currentEntityDataset: Dataset[(String, Result)] = spark.read.text(s"$graphBasePath/$e").as[String].map(s => deserializeObject(s, e)).map(r => (r.getId, r))
+
+
+        currentEntityDataset.joinWith(re, currentEntityDataset("_1").equalTo(re("_1")), "left").map(k => {
+
+          val a = k._1
+          val b = k._2
+          if (b == null)
+            a._2
+          else {
+            a._2.mergeFrom(b._2)
+            a._2
+          }
+        }).map(r => mapper.writeValueAsString(r))(Encoders.STRING)
+          .write.mode(SaveMode.Overwrite).option("compression", "gzip").text(s"$targetPath/$e")
+      }
+
-        spark.read.text(s"$graphBasePath/$e").as[String]
-          .map(s => deserializeObject(s, e))
-          .union(re)
-          .groupByKey(_.getId)
-          .reduceGroups {
-            (x, y) =>
-              x.mergeFrom(y)
-              x
-          }.map(_._2)
-          .filter(r => r.getClass.getSimpleName.toLowerCase != "result")
-          .map(r => mapper.writeValueAsString(r))(Encoders.STRING)
-          .write.mode(SaveMode.Overwrite).option("compression", "gzip").text(s"$workingPath/resolvedGraph/$e")
     }
   }
 }
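Note (annotation, not part of the patch): the SparkResolveEntities hunk above replaces the old union + groupByKey + reduceGroups pass with a keyed left join against the resolved entities, so records with no resolved counterpart pass through unchanged. A minimal, self-contained sketch of that join-and-merge pattern follows; the Entity case class and its title-preferring merge are illustrative stand-ins for the dnet Result type and its mergeFrom, not the project's actual classes.

import org.apache.spark.sql.{Dataset, SparkSession}

object JoinMergeSketch {

  // Illustrative stand-in for eu.dnetlib.dhp.schema.oaf.Result: an id plus one mergeable field.
  case class Entity(id: String, title: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("join-merge-sketch").getOrCreate()
    import spark.implicits._

    // Graph entities keyed by id, playing the role of currentEntityDataset above.
    val current: Dataset[(String, Entity)] =
      Seq(Entity("50|a", ""), Entity("50|b", "kept")).toDS().map(e => (e.id, e))

    // Resolved entities keyed by id, playing the role of re above.
    val resolved: Dataset[(String, Entity)] =
      Seq(Entity("50|a", "resolved title")).toDS().map(e => (e.id, e))

    // Left join keeps every graph entity; merge only where a resolved match exists.
    val merged: Dataset[Entity] = current
      .joinWith(resolved, current("_1").equalTo(resolved("_1")), "left")
      .map { case (a, b) =>
        if (b == null) a._2                // no resolved counterpart: keep the record as-is
        else a._2.copy(title = b._2.title) // matched: toy merge that prefers the resolved title
      }

    merged.show(truncate = false)
    spark.stop()
  }
}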
spark.read.load(s"$workingPath/resolvedEntities").as[Result].map(r => (r.getId, r)) entities.foreach { - e => + e => { + + val currentEntityDataset: Dataset[(String, Result)] = spark.read.text(s"$graphBasePath/$e").as[String].map(s => deserializeObject(s, e)).map(r => (r.getId, r)) + + + currentEntityDataset.joinWith(re, currentEntityDataset("_1").equalTo(re("_1")), "left").map(k => { + + val a = k._1 + val b = k._2 + if (b == null) + a._2 + else { + a._2.mergeFrom(b._2) + a._2 + } + }).map(r => mapper.writeValueAsString(r))(Encoders.STRING) + .write.mode(SaveMode.Overwrite).option("compression", "gzip").text(s"$targetPath/$e") + } + - spark.read.text(s"$graphBasePath/$e").as[String] - .map(s => deserializeObject(s, e)) - .union(re) - .groupByKey(_.getId) - .reduceGroups { - (x, y) => - x.mergeFrom(y) - x - }.map(_._2) - .filter(r => r.getClass.getSimpleName.toLowerCase != "result") - .map(r => mapper.writeValueAsString(r))(Encoders.STRING) - .write.mode(SaveMode.Overwrite).option("compression", "gzip").text(s"$workingPath/resolvedGraph/$e") } } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveRelation.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveRelation.scala index cd517dd5e..a194f2694 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveRelation.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveRelation.scala @@ -35,6 +35,9 @@ object SparkResolveRelation { val workingPath = parser.get("workingPath") log.info(s"workingPath -> $workingPath") + val targetPath = parser.get("targetPath") + log.info(s"targetPath -> $targetPath") + implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation]) import spark.implicits._ @@ -80,20 +83,13 @@ object SparkResolveRelation { .mode(SaveMode.Overwrite) .save(s"$workingPath/relation_resolved") - - // TO BE conservative we keep the original relation in the working dir - // and save the relation resolved on the graphBasePath - //In future this two line of code should be removed - - fs.rename(new Path(s"$graphBasePath/relation"), new Path(s"$workingPath/relation")) - spark.read.load(s"$workingPath/relation_resolved").as[Relation] .filter(r => !r.getSource.startsWith("unresolved") && !r.getTarget.startsWith("unresolved")) .map(r => mapper.writeValueAsString(r)) .write .option("compression", "gzip") .mode(SaveMode.Overwrite) - .text(s"$graphBasePath/relation") + .text(s"$targetPath/relation") } def extractInstanceCF(input: String): List[(String, String)] = { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml index ceb13c5e8..8859295bf 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml @@ -8,6 +8,10 @@ unresolvedPath the path of the unresolved Entities + + targetPath + the target path after resolution + @@ -36,6 +40,7 @@ --masteryarn --graphBasePath${graphBasePath} --workingPath${workingDir} + --targetPath${targetPath} @@ -62,6 +67,7 @@ --graphBasePath${graphBasePath} --unresolvedPath${unresolvedPath} --workingPath${workingDir} + --targetPath${targetPath} diff --git 
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml
index ceb13c5e8..8859295bf 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml
@@ -8,6 +8,10 @@
         <property>
            <name>unresolvedPath</name>
            <description>the path of the unresolved Entities</description>
         </property>
+        <property>
+            <name>targetPath</name>
+            <description>the target path after resolution</description>
+        </property>
     </parameters>
 
@@ -36,6 +40,7 @@
             <arg>--master</arg><arg>yarn</arg>
             <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
             <arg>--workingPath</arg><arg>${workingDir}</arg>
+            <arg>--targetPath</arg><arg>${targetPath}</arg>
         </spark>
 
@@ -62,6 +67,7 @@
             <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
             <arg>--unresolvedPath</arg><arg>${unresolvedPath}</arg>
             <arg>--workingPath</arg><arg>${workingDir}</arg>
+            <arg>--targetPath</arg><arg>${targetPath}</arg>
         </spark>
 
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/resolve_entities_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/resolve_entities_params.json
index f38cc1291..67e315664 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/resolve_entities_params.json
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/resolve_entities_params.json
@@ -2,5 +2,6 @@
   {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
   {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the source Path", "paramRequired": true},
   {"paramName":"u", "paramLongName":"unresolvedPath", "paramDescription": "the source Path", "paramRequired": true},
-  {"paramName":"g", "paramLongName":"graphBasePath", "paramDescription": "the path of the raw graph", "paramRequired": true}
+  {"paramName":"g", "paramLongName":"graphBasePath", "paramDescription": "the path of the raw graph", "paramRequired": true},
+  {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the target path", "paramRequired": true}
 ]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/resolve_relations_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/resolve_relations_params.json
index 1fbe20648..66a035da5 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/resolve_relations_params.json
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/resolve_relations_params.json
@@ -1,5 +1,6 @@
 [
   {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
   {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the source Path", "paramRequired": true},
-  {"paramName":"g", "paramLongName":"graphBasePath", "paramDescription": "the path of the raw graph", "paramRequired": true}
+  {"paramName":"g", "paramLongName":"graphBasePath", "paramDescription": "the path of the raw graph", "paramRequired": true},
+  {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the target path", "paramRequired": true}
 ]
\ No newline at end of file
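Note (annotation, not part of the patch): adding targetPath touches three artifacts: the workflow property, the arg pair passed to each Spark action, and the json spec the job's argument parser validates against. A hedged sketch of that contract follows; the parse helper is illustrative, not the dnet ArgumentApplicationParser.

object ArgWiringSketch {

  // Toy stand-in for the project's argument parser: pairs up "--name value" tokens.
  def parse(args: Array[String]): Map[String, String] =
    args.grouped(2).collect { case Array(k, v) if k.startsWith("--") => k.drop(2) -> v }.toMap

  def main(args: Array[String]): Unit = {
    // The Oozie <arg> elements above arrive as a flat argv like this one.
    val argv = Array(
      "--master", "yarn",
      "--graphBasePath", "/graph",
      "--workingPath", "/work",
      "--targetPath", "/resolvedGraph"
    )
    val parsed = parse(argv)

    // Mirrors parser.get("targetPath") in both Spark jobs; with the real parser,
    // a required param missing from the json spec would reject the invocation.
    assert(parsed("targetPath") == "/resolvedGraph")
    println(parsed)
  }
}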
t.getValue.equalsIgnoreCase("FAKETITLE"))).count() + var ct = pubDS.count() + var et = pubDS.filter(p => p.getTitle!= null && p.getTitle.asScala.forall(t => t.getValue != null && t.getValue.nonEmpty)).count() + + assertEquals(ct, et) + + val datDS:Dataset[Result] = spark.read.text(s"$workingDir/work/resolvedGraph/dataset").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.dataset)) val td = datDS.filter(p => p.getTitle!=null && p.getSubject!=null).filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE"))).count() + ct = datDS.count() + et = datDS.filter(p => p.getTitle!= null && p.getTitle.asScala.forall(t => t.getValue != null && t.getValue.nonEmpty)).count() + assertEquals(ct, et) val softDS:Dataset[Result] = spark.read.text(s"$workingDir/work/resolvedGraph/software").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.software)) val ts = softDS.filter(p => p.getTitle!=null && p.getSubject!=null).filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE"))).count() + ct = softDS.count() + et = softDS.filter(p => p.getTitle!= null && p.getTitle.asScala.forall(t => t.getValue != null && t.getValue.nonEmpty)).count() + assertEquals(ct, et) val orpDS:Dataset[Result] = spark.read.text(s"$workingDir/work/resolvedGraph/otherresearchproduct").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.otherresearchproduct)) val to = orpDS.filter(p => p.getTitle!=null && p.getSubject!=null).filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE"))).count() + ct = orpDS.count() + et = orpDS.filter(p => p.getTitle!= null && p.getTitle.asScala.forall(t => t.getValue != null && t.getValue.nonEmpty)).count() + assertEquals(ct, et) + + + + + assertEquals(0, t) assertEquals(2, td) assertEquals(1, ts) @@ -178,6 +198,32 @@ class ResolveEntitiesTest extends Serializable { + @Test + def testMerge():Unit = { + + val r = new Result + r.setSubject(List(OafMapperUtils.structuredProperty(FAKE_SUBJECT, OafMapperUtils.qualifier("fos","fosCS", "fossSchema", "fossiFIgo"), null)).asJava) + + val mapper = new ObjectMapper() + + val p = mapper.readValue(Source.fromInputStream(this.getClass.getResourceAsStream(s"publication")).mkString.lines.next(), classOf[Publication]) + + + r.mergeFrom(p) + + + println(mapper.writeValueAsString(r)) + + + + + + + + + } + +