From 613ec5ffceebb11740fc7ec29a406cbf7490ac14 Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Thu, 21 Sep 2023 14:23:37 +0200 Subject: [PATCH] Add profiles for different spark versions: spark-24, spark-34, spark-35 --- .../eu/dnetlib/dhp/common/PacePerson.java | 2 +- dhp-pace-core/pom.xml | 88 ++++++++++++++++- .../eu/dnetlib/pace/model/SparkModel.scala | 9 +- .../eu/dnetlib/pace/util/DiffPatchMatch.java | 1 - .../dnetlib/pace/util/SparkCompatUtils.scala | 12 +++ .../dnetlib/pace/util/SparkCompatUtils.scala | 12 +++ .../ebi/SparkCreateBaselineDataFrame.scala | 5 +- .../createunresolvedentities/ProduceTest.java | 5 +- .../opencitations/ReadCOCITest.java | 4 +- dhp-workflows/dhp-graph-provision/pom.xml | 42 ++++++++- .../dnetlib/dhp/swh/PrepareSWHActionsets.java | 3 +- pom.xml | 94 ++++++++++++++++--- 12 files changed, 245 insertions(+), 32 deletions(-) create mode 100644 dhp-pace-core/src/main/spark-2/eu/dnetlib/pace/util/SparkCompatUtils.scala create mode 100644 dhp-pace-core/src/main/spark-35/eu/dnetlib/pace/util/SparkCompatUtils.scala diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/PacePerson.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/PacePerson.java index fac9a7565..fbf586f8c 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/PacePerson.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/PacePerson.java @@ -38,7 +38,7 @@ public class PacePerson { PacePerson.class .getResourceAsStream( "/eu/dnetlib/dhp/common/name_particles.txt"))); - } catch (IOException e) { + } catch (Exception e) { throw new RuntimeException(e); } } diff --git a/dhp-pace-core/pom.xml b/dhp-pace-core/pom.xml index a6d2538f2..6449b7ec8 100644 --- a/dhp-pace-core/pom.xml +++ b/dhp-pace-core/pom.xml @@ -24,7 +24,7 @@ scala-compile-first - initialize + process-resources add-source compile @@ -95,4 +95,90 @@ + + + spark-24 + + true + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 3.4.0 + + + generate-sources + + add-source + + + + src/main/spark-2 + + + + + + + + + + + spark-34 + + + + + org.codehaus.mojo + build-helper-maven-plugin + 3.4.0 + + + generate-sources + + add-source + + + + src/main/spark-2 + + + + + + + + + + + spark-35 + + + + + org.codehaus.mojo + build-helper-maven-plugin + 3.4.0 + + + generate-sources + + add-source + + + + src/main/spark-35 + + + + + + + + + + diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkModel.scala b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkModel.scala index aa997c6e9..63322738f 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkModel.scala +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkModel.scala @@ -2,11 +2,10 @@ package eu.dnetlib.pace.model import com.jayway.jsonpath.{Configuration, JsonPath} import eu.dnetlib.pace.config.{DedupConfig, Type} -import eu.dnetlib.pace.util.MapDocumentUtil -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import eu.dnetlib.pace.util.{MapDocumentUtil, SparkCompatUtils} +import org.apache.spark.sql.{Dataset, Row} import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} -import org.apache.spark.sql.{Dataset, Row} import java.util.regex.Pattern import scala.collection.JavaConverters._ @@ -48,8 +47,8 @@ case class SparkModel(conf: DedupConfig) { val orderingFieldPosition: Int = schema.fieldIndex(orderingFieldName) - val parseJsonDataset: (Dataset[String] => Dataset[Row]) = df => { - df.map(r => rowFromJson(r))(RowEncoder(schema)) + val parseJsonDataset: (Dataset[String] => Dataset[Row]) = df => { + df.map(r => rowFromJson(r))(SparkCompatUtils.encoderFor(schema)) } def rowFromJson(json: String): Row = { diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/DiffPatchMatch.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/DiffPatchMatch.java index 154bac62c..ac37c5e5a 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/DiffPatchMatch.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/DiffPatchMatch.java @@ -18,7 +18,6 @@ package eu.dnetlib.pace.util; * See the License for the specific language governing permissions and * limitations under the License. */ - /* * Diff Match and Patch * Copyright 2018 The diff-match-patch Authors. diff --git a/dhp-pace-core/src/main/spark-2/eu/dnetlib/pace/util/SparkCompatUtils.scala b/dhp-pace-core/src/main/spark-2/eu/dnetlib/pace/util/SparkCompatUtils.scala new file mode 100644 index 000000000..a426703d6 --- /dev/null +++ b/dhp-pace-core/src/main/spark-2/eu/dnetlib/pace/util/SparkCompatUtils.scala @@ -0,0 +1,12 @@ +package eu.dnetlib.pace.util + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} +import org.apache.spark.sql.types.StructType + +object SparkCompatUtils { + + def encoderFor(schema: StructType): ExpressionEncoder[Row] = { + RowEncoder(schema) + } +} \ No newline at end of file diff --git a/dhp-pace-core/src/main/spark-35/eu/dnetlib/pace/util/SparkCompatUtils.scala b/dhp-pace-core/src/main/spark-35/eu/dnetlib/pace/util/SparkCompatUtils.scala new file mode 100644 index 000000000..cbc454ae2 --- /dev/null +++ b/dhp-pace-core/src/main/spark-35/eu/dnetlib/pace/util/SparkCompatUtils.scala @@ -0,0 +1,12 @@ +package eu.dnetlib.pace.util + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.types.StructType + +object SparkCompatUtils { + + def encoderFor(schema: StructType): ExpressionEncoder[Row] = { + ExpressionEncoder(schema) + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala index 6f5b7110f..11d087583 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala @@ -155,7 +155,8 @@ object SparkCreateBaselineDataFrame { IOUtils.toString( SparkEBILinksToOaf.getClass.getResourceAsStream( "/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json" - ),Charset.defaultCharset() + ), + Charset.defaultCharset() ) ) parser.parseArgument(args) @@ -198,7 +199,7 @@ object SparkCreateBaselineDataFrame { val ds: Dataset[PMArticle] = spark.createDataset( k.filter(i => i._1.endsWith(".gz")) .flatMap(i => { - val xml =inputFactory.createXMLEventReader(new ByteArrayInputStream(i._2.getBytes())) + val xml = inputFactory.createXMLEventReader(new ByteArrayInputStream(i._2.getBytes())) new PMParser(xml) }) ) diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java index ce116688a..0a4dfc00b 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java @@ -15,10 +15,7 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCITest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCITest.java index 3b416caf2..ebde0ed0c 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCITest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCITest.java @@ -119,7 +119,9 @@ public class ReadCOCITest { workingDir.toString() + "/COCI", "-outputPath", workingDir.toString() + "/COCI_json/", - "-inputFile", "input1;input2;input3;input4;input5" + "-inputFile", "input1;input2;input3;input4;input5", + "-format", + "COCI" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); diff --git a/dhp-workflows/dhp-graph-provision/pom.xml b/dhp-workflows/dhp-graph-provision/pom.xml index 60c925227..4b4e6c1c4 100644 --- a/dhp-workflows/dhp-graph-provision/pom.xml +++ b/dhp-workflows/dhp-graph-provision/pom.xml @@ -162,6 +162,18 @@ antlr4-runtime org.antlr + + woodstox-core + com.fasterxml.woodstox + + + log4j + * + + + org.apache.logging.log4j + * + @@ -210,7 +222,7 @@ - scala-2.11 + spark-24 true @@ -240,7 +252,7 @@ - scala-2.12 + spark-34 @@ -266,6 +278,32 @@ + + spark-35 + + + + + org.codehaus.mojo + build-helper-maven-plugin + 3.4.0 + + + generate-sources + + add-source + + + + src/main/sparksolr-4 + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/PrepareSWHActionsets.java b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/PrepareSWHActionsets.java index 2691d4b7e..230a077f7 100644 --- a/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/PrepareSWHActionsets.java +++ b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/PrepareSWHActionsets.java @@ -17,6 +17,7 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; import org.apache.spark.sql.Dataset; @@ -117,7 +118,7 @@ public class PrepareSWHActionsets { .map( (MapFunction) t -> OBJECT_MAPPER.readValue(t, Software.class), Encoders.bean(Software.class)) - .filter(t -> t.getCodeRepositoryUrl() != null) + .filter((FilterFunction) t -> t.getCodeRepositoryUrl() != null) .select(col("id"), col("codeRepositoryUrl.value").as("repoUrl")); } diff --git a/pom.xml b/pom.xml index 1480af2a6..8c6bcd3d1 100644 --- a/pom.xml +++ b/pom.xml @@ -174,7 +174,7 @@ eu.dnetlib.dhp - ${dhp-schemas.artifact} + dhp-schemas ${dhp-schemas.version} @@ -233,6 +233,13 @@ provided + + org.slf4j + slf4j-log4j12 + ${org.slf4j.version} + provided + + org.slf4j jcl-over-slf4j @@ -240,6 +247,28 @@ provided + + org.apache.logging.log4j + log4j-slf4j2-impl + ${log4j.version} + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + + + + org.apache.logging.log4j + log4j-1.2-api + ${log4j.version} + + org.apache.commons commons-lang3 @@ -381,7 +410,7 @@ org.apache.zookeeper zookeeper - 3.4.11 + ${zookeeper.version} @@ -713,6 +742,7 @@ 3.0.0-M4 true + false @@ -782,7 +812,7 @@ net.revelc.code impsort-maven-plugin - 1.4.1 + 1.6.2 java.,javax.,org.,com. java,* @@ -918,8 +948,6 @@ 4.0.1 - dhp-schemas - 4.1.2 [2.6.1] 1.20 @@ -932,7 +960,7 @@ 1.1.3 1.7 1.0.7 - [3.17.1] + 4.17.2 cdh5.9.2 3.5 11.0.2 @@ -945,6 +973,7 @@ [6.0.5] [3.1.6] 2.2.2 + 1.2.17 3.19.0-GA 3.5.3 4.13.0 @@ -960,12 +989,13 @@ 3.6.0 0.0.7 [2.12,3.0) + 3.4.6 - scala-2.12 + spark-34 2.12 2.12.18 @@ -988,25 +1018,60 @@ 14.0.1 8.11.0 4.0.4 - 3.4.2.openaire-SNAPSHOT + 3.4.2.openaire 2.14.2 3.12.0 + 2.19.0 3.7.0-M11 3.25.0-GA 4.10.0 2.0.6 0.10.2 - + 3.6.3 - java17 + spark-35 + + 2.12 + 2.12.18 + 1.3.0 + + + 4.8.1 + + + 1.23.0 + 1.8 + 1.10.0 + 1.9.4 + 1.16.0 + 3.2.2 + 2.13.0 + 1.1.3 + 1.7 + + 14.0.1 + 8.11.0 + 4.0.4 + 3.5.1.openaire-SNAPSHOT + 2.15.2 + 3.12.0 + 2.20.0 + 3.7.0-M11 + 3.25.0-GA + 4.10.0 + 2.0.7 + 0.10.2 + 3.6.3 + + + + + java11 - 17 + [11 @@ -1031,6 +1096,7 @@ --add-opens=java.base/sun.util.calendar=ALL-UNNAMED true + false