From 0646d0d0645341020ee12c284e0872e6e450cc11 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Thu, 2 May 2024 15:15:03 +0200 Subject: [PATCH 1/5] Updated main sparkApplication to avoid to require master variable --- .../eu/dnetlib/dhp/application/SparkScalaApplication.scala | 7 ++++--- .../eu/dnetlib/dhp/sx/create_scholix_dump_params.json | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/dhp-common/src/main/scala/eu/dnetlib/dhp/application/SparkScalaApplication.scala b/dhp-common/src/main/scala/eu/dnetlib/dhp/application/SparkScalaApplication.scala index a14c25837..526bbd295 100644 --- a/dhp-common/src/main/scala/eu/dnetlib/dhp/application/SparkScalaApplication.scala +++ b/dhp-common/src/main/scala/eu/dnetlib/dhp/application/SparkScalaApplication.scala @@ -65,12 +65,13 @@ abstract class AbstractScalaApplication( val conf: SparkConf = new SparkConf() val master = parser.get("master") log.info(s"Creating Spark session: Master: $master") - SparkSession + val b = SparkSession .builder() .config(conf) .appName(getClass.getSimpleName) - .master(master) - .getOrCreate() + if (master != null) + b.master(master) + b.getOrCreate() } def reportTotalSize(targetPath: String, outputBasePath: String): Unit = { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/create_scholix_dump_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/create_scholix_dump_params.json index fead58ab1..53fe95895 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/create_scholix_dump_params.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/create_scholix_dump_params.json @@ -1,5 +1,5 @@ [ - {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, + {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": false}, {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source Path", "paramRequired": true}, {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the scholix dump", "paramRequired": true} ] \ No newline at end of file From a860c57bbc2c6ae788c91c103873dc942e7ff473 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Thu, 2 May 2024 15:16:00 +0200 Subject: [PATCH 2/5] updated .gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 14cd4d345..6fafc7055 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,4 @@ spark-warehouse /**/.factorypath /**/.scalafmt.conf /.java-version +/dhp-shade-package/dependency-reduced-pom.xml From db358ad0d2ffb63cd7215ec89e693274982b78e1 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Thu, 2 May 2024 15:25:57 +0200 Subject: [PATCH 3/5] code formatted --- .../eu/dnetlib/pace/common/PaceCommonUtils.java | 15 ++++++++------- .../main/java/eu/dnetlib/pace/model/Person.java | 11 ++++++----- .../java/eu/dnetlib/pace/util/Capitalise.java | 3 ++- .../pace/common/AbstractPaceFunctions.java | 13 +++++++------ 4 files changed, 23 insertions(+), 19 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/pace/common/PaceCommonUtils.java b/dhp-common/src/main/java/eu/dnetlib/pace/common/PaceCommonUtils.java index a279271b5..61fbc2470 100644 --- a/dhp-common/src/main/java/eu/dnetlib/pace/common/PaceCommonUtils.java +++ b/dhp-common/src/main/java/eu/dnetlib/pace/common/PaceCommonUtils.java @@ -1,19 +1,20 @@ package eu.dnetlib.pace.common; -import com.google.common.base.Splitter; -import com.google.common.collect.Iterables; -import com.google.common.collect.Sets; -import com.ibm.icu.text.Transliterator; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; - import java.nio.charset.StandardCharsets; import java.text.Normalizer; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; + +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; +import com.ibm.icu.text.Transliterator; + /** * Set of common functions for the framework * diff --git a/dhp-common/src/main/java/eu/dnetlib/pace/model/Person.java b/dhp-common/src/main/java/eu/dnetlib/pace/model/Person.java index c95c9d823..6a1957183 100644 --- a/dhp-common/src/main/java/eu/dnetlib/pace/model/Person.java +++ b/dhp-common/src/main/java/eu/dnetlib/pace/model/Person.java @@ -1,20 +1,21 @@ package eu.dnetlib.pace.model; +import java.nio.charset.Charset; +import java.text.Normalizer; +import java.util.List; +import java.util.Set; + import com.google.common.base.Joiner; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.hash.Hashing; + import eu.dnetlib.pace.common.PaceCommonUtils; import eu.dnetlib.pace.util.Capitalise; import eu.dnetlib.pace.util.DotAbbreviations; -import java.nio.charset.Charset; -import java.text.Normalizer; -import java.util.List; -import java.util.Set; - public class Person { private static final String UTF8 = "UTF-8"; diff --git a/dhp-common/src/main/java/eu/dnetlib/pace/util/Capitalise.java b/dhp-common/src/main/java/eu/dnetlib/pace/util/Capitalise.java index 015386423..671320c71 100644 --- a/dhp-common/src/main/java/eu/dnetlib/pace/util/Capitalise.java +++ b/dhp-common/src/main/java/eu/dnetlib/pace/util/Capitalise.java @@ -1,9 +1,10 @@ package eu.dnetlib.pace.util; -import com.google.common.base.Function; import org.apache.commons.lang3.text.WordUtils; +import com.google.common.base.Function; + public class Capitalise implements Function { private final char[] DELIM = { diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/common/AbstractPaceFunctions.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/common/AbstractPaceFunctions.java index 6bfb8b3f4..b055077d8 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/common/AbstractPaceFunctions.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/common/AbstractPaceFunctions.java @@ -1,12 +1,6 @@ package eu.dnetlib.pace.common; -import com.google.common.base.Joiner; -import com.google.common.collect.Sets; -import com.ibm.icu.text.Transliterator; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; - import java.io.IOException; import java.io.StringWriter; import java.nio.charset.StandardCharsets; @@ -15,6 +9,13 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; + +import com.google.common.base.Joiner; +import com.google.common.collect.Sets; +import com.ibm.icu.text.Transliterator; + /** * Set of common functions for the framework * From 6efab4d88e7ce481896e5569e1801daf81c96777 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Thu, 16 May 2024 16:19:18 +0200 Subject: [PATCH 4/5] fixed scholexplorer bug --- .../dhp/sx/graph/scholix/ScholixUtils.scala | 2 +- dhp-shade-package/pom.xml | 150 +++++++++--------- .../dhp/sx/graph/ScholexplorerUtils.scala | 15 +- .../graph/SparkCreateScholexplorerDump.scala | 23 ++- .../graph/scholix/ScholixGenerationTest.scala | 17 +- 5 files changed, 112 insertions(+), 95 deletions(-) diff --git a/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala b/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala index f256ca1a1..72a17777e 100644 --- a/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala +++ b/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala @@ -389,7 +389,7 @@ object ScholixUtils extends Serializable { if (persistentIdentifiers.isEmpty) return null s.setLocalIdentifier(persistentIdentifiers.asJava) - s.setTypology(r.getResulttype.getClassid) +// s.setTypology(r.getResulttype.getClassid) s.setSubType(r.getInstance().get(0).getInstancetype.getClassname) diff --git a/dhp-shade-package/pom.xml b/dhp-shade-package/pom.xml index 128a57116..fd9c04066 100644 --- a/dhp-shade-package/pom.xml +++ b/dhp-shade-package/pom.xml @@ -31,86 +31,86 @@ dhp-actionmanager ${project.version} - - eu.dnetlib.dhp - dhp-aggregation - ${project.version} - - - eu.dnetlib.dhp - dhp-blacklist - ${project.version} - - - eu.dnetlib.dhp - dhp-broker-events - ${project.version} - - - eu.dnetlib.dhp - dhp-dedup-openaire - ${project.version} - - - eu.dnetlib.dhp - dhp-enrichment - ${project.version} - + + + + + + + + + + + + + + + + + + + + + + + + + eu.dnetlib.dhp dhp-graph-mapper ${project.version} - - eu.dnetlib.dhp - dhp-graph-provision - ${project.version} - - - eu.dnetlib.dhp - dhp-impact-indicators - ${project.version} - - - eu.dnetlib.dhp - dhp-stats-actionsets - ${project.version} - - - eu.dnetlib.dhp - dhp-stats-hist-snaps - ${project.version} - - - eu.dnetlib.dhp - dhp-stats-monitor-irish - ${project.version} - - - eu.dnetlib.dhp - dhp-stats-promote - ${project.version} - - - eu.dnetlib.dhp - dhp-stats-update - ${project.version} - - - eu.dnetlib.dhp - dhp-swh - ${project.version} - - - eu.dnetlib.dhp - dhp-usage-raw-data-update - ${project.version} - - - eu.dnetlib.dhp - dhp-usage-stats-build - ${project.version} - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/ScholexplorerUtils.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/ScholexplorerUtils.scala index 95564d523..f62f271e3 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/ScholexplorerUtils.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/ScholexplorerUtils.scala @@ -1,14 +1,8 @@ package eu.dnetlib.dhp.sx.graph +import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.schema.oaf.{KeyValue, Result, StructuredProperty} -import eu.dnetlib.dhp.schema.sx.scholix.{ - Scholix, - ScholixCollectedFrom, - ScholixEntityId, - ScholixIdentifier, - ScholixRelationship, - ScholixResource -} +import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixCollectedFrom, ScholixEntityId, ScholixIdentifier, ScholixRelationship, ScholixResource} import org.json4s import org.json4s.DefaultFormats import org.json4s.jackson.JsonMethods.parse @@ -28,6 +22,7 @@ case class RelKeyValue(key: String, value: String) {} object ScholexplorerUtils { val OPENAIRE_IDENTIFIER_SCHEMA: String = "OpenAIRE Identifier" + val mapper= new ObjectMapper() case class RelationVocabulary(original: String, inverse: String) {} @@ -242,7 +237,7 @@ object ScholexplorerUtils { s } - def updateTarget(s: Scholix, t: ScholixResource): Scholix = { + def updateTarget(s: Scholix, t: ScholixResource): String = { s.setTarget(t) val spublishers: Seq[ScholixEntityId] = @@ -251,6 +246,6 @@ object ScholexplorerUtils { if (t.getPublisher != null && !t.getPublisher.isEmpty) t.getPublisher.asScala else List() val mergedPublishers = spublishers.union(tpublishers).distinct.take(10).toList s.setPublisher(mergedPublishers.asJava) - s + mapper.writeValueAsString(s) } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateScholexplorerDump.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateScholexplorerDump.scala index 1211dcc78..32aa68665 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateScholexplorerDump.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateScholexplorerDump.scala @@ -11,7 +11,7 @@ import eu.dnetlib.dhp.schema.oaf.{ Dataset => OafDataset } import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixResource} -import org.apache.spark.sql.functions.{col, concat, expr, md5} +import org.apache.spark.sql.functions.{col, concat, expr, first, md5} import org.apache.spark.sql.types.StructType import org.apache.spark.sql._ import org.slf4j.{Logger, LoggerFactory} @@ -89,7 +89,13 @@ class SparkCreateScholexplorerDump(propertyPath: String, args: Array[String], lo .withColumn("cf", expr("transform(collectedfrom, x -> struct(x.key, x.value))")) .drop("collectedfrom") .withColumnRenamed("cf", "collectedfrom") - .distinct() + .groupBy(col("id")) + .agg( + first("source").alias("source"), + first("target").alias("target"), + first("relClass").alias("relClass"), + first("collectedfrom").alias("collectedfrom") + ) bidRel.write.mode(SaveMode.Overwrite).save(s"$otuputPath/relation") @@ -97,27 +103,32 @@ class SparkCreateScholexplorerDump(propertyPath: String, args: Array[String], lo def generateScholix(outputPath: String, spark: SparkSession): Unit = { implicit val scholixResourceEncoder: Encoder[ScholixResource] = Encoders.bean(classOf[ScholixResource]) - implicit val scholixEncoder: Encoder[Scholix] = Encoders.bean(classOf[Scholix]) + implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo(classOf[Scholix]) import spark.implicits._ val relations = spark.read.load(s"$outputPath/relation").as[RelationInfo] val resource = spark.read.load(s"$outputPath/resource").as[ScholixResource] + + val scholix_one_verse = relations .joinWith(resource, relations("source") === resource("dnetIdentifier"), "inner") .map(res => ScholexplorerUtils.generateScholix(res._1, res._2)) + .map(s=> (s.getIdentifier, s))(Encoders.tuple(Encoders.STRING, Encoders.kryo(classOf[Scholix]))) + val resourceTarget = relations .joinWith(resource, relations("target") === resource("dnetIdentifier"), "inner") .map(res => (res._1.id, res._2))(Encoders.tuple(Encoders.STRING, Encoders.kryo(classOf[ScholixResource]))) + scholix_one_verse - .joinWith(resourceTarget, scholix_one_verse("identifier") === resourceTarget("_1"), "inner") - .map(k => ScholexplorerUtils.updateTarget(k._1, k._2._2)) + .joinWith(resourceTarget, scholix_one_verse("_1") === resourceTarget("_1"), "inner") + .map(k => ScholexplorerUtils.updateTarget(k._1._2, k._2._2)) .write .mode(SaveMode.Overwrite) .option("compression", "gzip") - .json(s"$outputPath/scholix") + .text(s"$outputPath/scholix") } } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixGenerationTest.scala b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixGenerationTest.scala index 0a2872cb4..67d40dcf1 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixGenerationTest.scala +++ b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixGenerationTest.scala @@ -1,17 +1,28 @@ package eu.dnetlib.dhp.sx.graph.scholix +import eu.dnetlib.dhp.schema.sx.scholix.ScholixResource import eu.dnetlib.dhp.sx.graph.SparkCreateScholexplorerDump -import org.apache.spark.sql.SparkSession +import org.apache.spark.SparkConf +import org.apache.spark.sql.{Encoder, Encoders, SparkSession} import org.junit.jupiter.api.Test +import org.objenesis.strategy.StdInstantiatorStrategy class ScholixGenerationTest { @Test def generateScholix(): Unit = { + val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate() val app = new SparkCreateScholexplorerDump(null, null, null) -// app.generateScholixResource("/home/sandro/Downloads/scholix_sample/", "/home/sandro/Downloads/scholix/", spark) -// app.generateBidirectionalRelations("/home/sandro/Downloads/scholix_sample/", "/home/sandro/Downloads/scholix/", spark) +// app.generateScholixResource("/home/sandro/Downloads/scholix_sample/", "/home/sandro/Downloads/scholix/", spark) +// app.generateBidirectionalRelations( +// "/home/sandro/Downloads/scholix_sample/", +// "/home/sandro/Downloads/scholix/", +// spark +// ) app.generateScholix("/home/sandro/Downloads/scholix/", spark) + + + } } From a87f9ea64317dff7afac5045a4c64bb9c8a26954 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Fri, 17 May 2024 14:16:43 +0200 Subject: [PATCH 5/5] fixed scholexplorer bug --- .../eu/dnetlib/dhp/sx/graph/ScholexplorerUtils.scala | 11 +++++++++-- .../dhp/sx/graph/SparkCreateScholexplorerDump.scala | 6 +----- .../dhp/sx/graph/scholix/ScholixGenerationTest.scala | 2 -- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/ScholexplorerUtils.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/ScholexplorerUtils.scala index f62f271e3..d171d96d9 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/ScholexplorerUtils.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/ScholexplorerUtils.scala @@ -2,7 +2,14 @@ package eu.dnetlib.dhp.sx.graph import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.schema.oaf.{KeyValue, Result, StructuredProperty} -import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixCollectedFrom, ScholixEntityId, ScholixIdentifier, ScholixRelationship, ScholixResource} +import eu.dnetlib.dhp.schema.sx.scholix.{ + Scholix, + ScholixCollectedFrom, + ScholixEntityId, + ScholixIdentifier, + ScholixRelationship, + ScholixResource +} import org.json4s import org.json4s.DefaultFormats import org.json4s.jackson.JsonMethods.parse @@ -22,7 +29,7 @@ case class RelKeyValue(key: String, value: String) {} object ScholexplorerUtils { val OPENAIRE_IDENTIFIER_SCHEMA: String = "OpenAIRE Identifier" - val mapper= new ObjectMapper() + val mapper = new ObjectMapper() case class RelationVocabulary(original: String, inverse: String) {} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateScholexplorerDump.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateScholexplorerDump.scala index 32aa68665..dd420ab95 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateScholexplorerDump.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateScholexplorerDump.scala @@ -109,19 +109,15 @@ class SparkCreateScholexplorerDump(propertyPath: String, args: Array[String], lo val relations = spark.read.load(s"$outputPath/relation").as[RelationInfo] val resource = spark.read.load(s"$outputPath/resource").as[ScholixResource] - - val scholix_one_verse = relations .joinWith(resource, relations("source") === resource("dnetIdentifier"), "inner") .map(res => ScholexplorerUtils.generateScholix(res._1, res._2)) - .map(s=> (s.getIdentifier, s))(Encoders.tuple(Encoders.STRING, Encoders.kryo(classOf[Scholix]))) - + .map(s => (s.getIdentifier, s))(Encoders.tuple(Encoders.STRING, Encoders.kryo(classOf[Scholix]))) val resourceTarget = relations .joinWith(resource, relations("target") === resource("dnetIdentifier"), "inner") .map(res => (res._1.id, res._2))(Encoders.tuple(Encoders.STRING, Encoders.kryo(classOf[ScholixResource]))) - scholix_one_verse .joinWith(resourceTarget, scholix_one_verse("_1") === resourceTarget("_1"), "inner") .map(k => ScholexplorerUtils.updateTarget(k._1._2, k._2._2)) diff --git a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixGenerationTest.scala b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixGenerationTest.scala index 67d40dcf1..204fe9794 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixGenerationTest.scala +++ b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixGenerationTest.scala @@ -22,7 +22,5 @@ class ScholixGenerationTest { // ) app.generateScholix("/home/sandro/Downloads/scholix/", spark) - - } }