diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml index 6e656034b..ec7c14e90 100644 --- a/dhp-common/pom.xml +++ b/dhp-common/pom.xml @@ -52,6 +52,8 @@ + true + ${scala.binary.version} ${scala.version} @@ -76,11 +78,11 @@ org.apache.spark - spark-core_2.11 + spark-core_${scala.binary.version} org.apache.spark - spark-sql_2.11 + spark-sql_${scala.binary.version} @@ -159,7 +161,7 @@ eu.dnetlib.dhp - dhp-schemas + dhp-schemas_${scala.binary.version} diff --git a/dhp-pace-core/pom.xml b/dhp-pace-core/pom.xml index fac087ac7..a2525128b 100644 --- a/dhp-pace-core/pom.xml +++ b/dhp-pace-core/pom.xml @@ -20,7 +20,7 @@ net.alchim31.maven scala-maven-plugin - 4.0.1 + ${net.alchim31.maven.version} scala-compile-first @@ -39,8 +39,9 @@ + true + ${scala.binary.version} ${scala.version} - -target:jvm-1.8 @@ -98,11 +99,11 @@ org.apache.spark - spark-core_2.11 + spark-core_${scala.binary.version} org.apache.spark - spark-sql_2.11 + spark-sql_${scala.binary.version} diff --git a/dhp-workflows/dhp-actionmanager/pom.xml b/dhp-workflows/dhp-actionmanager/pom.xml index 87126920b..13ffe8b18 100644 --- a/dhp-workflows/dhp-actionmanager/pom.xml +++ b/dhp-workflows/dhp-actionmanager/pom.xml @@ -11,12 +11,12 @@ org.apache.spark - spark-core_2.11 + spark-core_${scala.binary.version} org.apache.spark - spark-sql_2.11 + spark-sql_${scala.binary.version} diff --git a/dhp-workflows/dhp-aggregation/pom.xml b/dhp-workflows/dhp-aggregation/pom.xml index 53d349d2a..acd04901d 100644 --- a/dhp-workflows/dhp-aggregation/pom.xml +++ b/dhp-workflows/dhp-aggregation/pom.xml @@ -38,6 +38,8 @@ + true + ${scala.binary.version} ${scala.version} @@ -54,11 +56,11 @@ org.apache.spark - spark-core_2.11 + spark-core_${scala.binary.version} org.apache.spark - spark-sql_2.11 + spark-sql_${scala.binary.version} diff --git a/dhp-workflows/dhp-blacklist/pom.xml b/dhp-workflows/dhp-blacklist/pom.xml index 479a9e8c6..7ecc8b35d 100644 --- a/dhp-workflows/dhp-blacklist/pom.xml +++ b/dhp-workflows/dhp-blacklist/pom.xml @@ -16,11 +16,11 @@ org.apache.spark - spark-core_2.11 + spark-core_${scala.binary.version} org.apache.spark - spark-sql_2.11 + spark-sql_${scala.binary.version} diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml index 01f1ea321..322fc7e93 100644 --- a/dhp-workflows/dhp-broker-events/pom.xml +++ b/dhp-workflows/dhp-broker-events/pom.xml @@ -18,11 +18,11 @@ org.apache.spark - spark-core_2.11 + spark-core_${scala.binary.version} org.apache.spark - spark-sql_2.11 + spark-sql_${scala.binary.version} org.elasticsearch diff --git a/dhp-workflows/dhp-dedup-openaire/pom.xml b/dhp-workflows/dhp-dedup-openaire/pom.xml index 750cc7028..1cadae460 100644 --- a/dhp-workflows/dhp-dedup-openaire/pom.xml +++ b/dhp-workflows/dhp-dedup-openaire/pom.xml @@ -13,7 +13,7 @@ net.alchim31.maven scala-maven-plugin - 4.0.1 + ${net.alchim31.maven.version} scala-compile-first @@ -32,8 +32,9 @@ + true + ${scala.binary.version} ${scala.version} - -target:jvm-1.8 @@ -58,38 +59,67 @@ eu.dnetlib.dhp dhp-common ${project.version} + + + log4j + log4j + + + annotations + org.jetbrains + + + slf4j-api + org.slf4j + + eu.dnetlib.dhp dhp-pace-core ${project.version} + + + jsr305 + com.google.code.findbugs + + + javassist + org.javassist + + + + + + org.apache.commons + commons-lang3 org.scala-lang.modules - scala-java8-compat_2.11 + scala-java8-compat_${scala.binary.version} 1.0.2 org.scala-lang.modules - scala-collection-compat_2.11 - 2.8.0 + scala-collection-compat_${scala.binary.version} + 2.11.0 org.apache.spark - spark-core_2.11 + spark-core_${scala.binary.version} org.apache.spark - spark-sql_2.11 + spark-sql_${scala.binary.version} org.apache.spark - spark-graphx_2.11 + spark-graphx_${scala.binary.version} @@ -107,12 +137,6 @@ jaxen - - com.influxdb - influxdb-client-java - 3.1.0 - - com.fasterxml.jackson.core jackson-databind diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkSimRelsAnalytics.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkSimRelsAnalytics.java new file mode 100644 index 000000000..18f8dd92d --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkSimRelsAnalytics.java @@ -0,0 +1,118 @@ + +package eu.dnetlib.dhp.oa.dedup; + +import java.io.IOException; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.*; +import org.dom4j.DocumentException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.xml.sax.SAXException; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.application.dedup.log.DedupLogModel; +import eu.dnetlib.dhp.application.dedup.log.DedupLogWriter; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import eu.dnetlib.pace.config.DedupConfig; +import eu.dnetlib.pace.model.SparkDedupConfig; + +public class SparkSimRelsAnalytics extends AbstractSparkAction { + + private static final Logger log = LoggerFactory.getLogger(SparkSimRelsAnalytics.class); + + public SparkSimRelsAnalytics(ArgumentApplicationParser parser, SparkSession spark) { + super(parser, spark); + spark.sparkContext().setLogLevel("WARN"); + } + + public static void main(String[] args) throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkSimRelsAnalytics.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json"))); + parser.parseArgument(args); + + SparkConf conf = new SparkConf(); + new SparkSimRelsAnalytics(parser, getSparkSession(conf)) + .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); + } + + @Override + public void run(ISLookUpService isLookUpService) + throws DocumentException, IOException, ISLookUpException, SAXException { + + // read oozie parameters + final String graphBasePath = parser.get("graphBasePath"); + final String isLookUpUrl = parser.get("isLookUpUrl"); + final String actionSetId = parser.get("actionSetId"); + final String workingPath = parser.get("workingPath"); + final int numPartitions = Optional + .ofNullable(parser.get("numPartitions")) + .map(Integer::valueOf) + .orElse(NUM_PARTITIONS); + + log.info("numPartitions: '{}'", numPartitions); + log.info("graphBasePath: '{}'", graphBasePath); + log.info("isLookUpUrl: '{}'", isLookUpUrl); + log.info("actionSetId: '{}'", actionSetId); + log.info("workingPath: '{}'", workingPath); + + final String dfLogPath = parser.get("dataframeLog"); + final String runTag = Optional.ofNullable(parser.get("runTAG")).orElse("UNKNOWN"); + + // for each dedup configuration + for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) { + final long start = System.currentTimeMillis(); + + final String entity = dedupConf.getWf().getEntityType(); + final String subEntity = dedupConf.getWf().getSubEntityValue(); + log.info("Creating simrels for: '{}'", subEntity); + + final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity); + removeOutputDir(spark, outputPath); + + JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + SparkDedupConfig sparkConfig = new SparkDedupConfig(dedupConf, numPartitions); + + spark.udf().register("collect_sort_slice", sparkConfig.collectSortSliceUDAF()); + + Dataset simRels = spark + .read() + .textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)) + .transform(sparkConfig.modelExtractor()) // Extract fields from input json column according to model + // definition + .transform(sparkConfig.generateClustersWithWindows()) // generate pairs according to + // filters, clusters, and model + // definition + .transform(sparkConfig.processClusters()) // process blocks and emits pairs of found + // similarities + .map( + (MapFunction) t -> DedupUtility + .createSimRel(t.getStruct(0).getString(0), t.getStruct(0).getString(1), entity), + Encoders.bean(Relation.class)); + + saveParquet(simRels, outputPath, SaveMode.Overwrite); + final long end = System.currentTimeMillis(); + if (StringUtils.isNotBlank(dfLogPath)) { + final DedupLogModel model = new DedupLogModel(runTag, dedupConf.toString(), subEntity, start, end, + end - start); + new DedupLogWriter(dfLogPath).appendLog(model, spark); + + } + + } + } + +} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/config-default.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/config-default.xml index 2e0ed9aee..862f568c9 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/config-default.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/config-default.xml @@ -13,6 +13,6 @@ oozie.action.sharelib.for.spark - spark2 + spark342 \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml index ba2270c8a..4fd1990b4 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml @@ -126,15 +126,25 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=15000 + --conf spark.sql.shuffle.partitions=5000 + --conf spark.driver.extraJavaOptions="-Xss256k" + --conf spark.executor.extraJavaOptions="-Dlog4j.configuration=spark-log4j.properties -Xss256k" + --conf spark.extraListeners= + --conf spark.sql.queryExecutionListeners= + --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.minExecutors=100 --conf spark.dynamicAllocation.shuffleTracking.enabled=true + --conf spark.network.io.preferDirectBufs=true --conf spark.memory.fraction=0.4 --conf spark.sql.adaptive.coalescePartitions.minPartitionNum=5000 + --conf spark.shuffle.useOldFetchProtocol=true --conf spark.shuffle.service.enabled=true --conf spark.eventLog.enabled=true + --conf spark.executor.heartbeatInterval=60s + --conf spark.network.timeout=640s + --conf spark.sql.legacy.allowUntypedScalaUDF=true --graphBasePath${graphBasePath} --isLookUpUrl${isLookUpUrl} --actionSetId${actionSetId} --workingPath${workingPath} - --numPartitions15000 + --numPartitions5000 - + diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java index ef7cc656c..00c9d5f27 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java @@ -95,6 +95,7 @@ public class SparkDedupTest implements Serializable { final SparkConf conf = new SparkConf(); conf.set("spark.sql.shuffle.partitions", "200"); + conf.set("spark.sql.legacy.allowUntypedScalaUDF", "true"); spark = SparkSession .builder() .appName(SparkDedupTest.class.getSimpleName()) diff --git a/dhp-workflows/dhp-doiboost/pom.xml b/dhp-workflows/dhp-doiboost/pom.xml index 37accbc4f..6e8911fba 100644 --- a/dhp-workflows/dhp-doiboost/pom.xml +++ b/dhp-workflows/dhp-doiboost/pom.xml @@ -33,6 +33,8 @@ + true + ${scala.binary.version} ${scala.version} @@ -70,12 +72,12 @@ org.apache.spark - spark-core_2.11 + spark-core_${scala.binary.version} org.apache.spark - spark-sql_2.11 + spark-sql_${scala.binary.version} diff --git a/dhp-workflows/dhp-enrichment/pom.xml b/dhp-workflows/dhp-enrichment/pom.xml index 591cad252..9698dee03 100644 --- a/dhp-workflows/dhp-enrichment/pom.xml +++ b/dhp-workflows/dhp-enrichment/pom.xml @@ -12,11 +12,11 @@ org.apache.spark - spark-core_2.11 + spark-core_${scala.binary.version} org.apache.spark - spark-sql_2.11 + spark-sql_${scala.binary.version} @@ -27,7 +27,7 @@ org.apache.spark - spark-hive_2.11 + spark-hive_${scala.binary.version} test diff --git a/dhp-workflows/dhp-graph-mapper/pom.xml b/dhp-workflows/dhp-graph-mapper/pom.xml index f579a7d2b..ef35951c0 100644 --- a/dhp-workflows/dhp-graph-mapper/pom.xml +++ b/dhp-workflows/dhp-graph-mapper/pom.xml @@ -14,7 +14,7 @@ net.alchim31.maven scala-maven-plugin - 4.0.1 + ${net.alchim31.maven.version} scala-compile-first @@ -37,6 +37,8 @@ -Xmax-classfile-name 200 + true + ${scala.binary.version} ${scala.version} @@ -64,15 +66,15 @@ org.apache.spark - spark-core_2.11 + spark-core_${scala.binary.version} org.apache.spark - spark-sql_2.11 + spark-sql_${scala.binary.version} org.apache.spark - spark-hive_2.11 + spark-hive_${scala.binary.version} test @@ -125,7 +127,7 @@ org.json4s - json4s-jackson_2.11 + json4s-jackson_${scala.binary.version} diff --git a/dhp-workflows/dhp-graph-provision/pom.xml b/dhp-workflows/dhp-graph-provision/pom.xml index 413cc8cdd..e62fcdf19 100644 --- a/dhp-workflows/dhp-graph-provision/pom.xml +++ b/dhp-workflows/dhp-graph-provision/pom.xml @@ -14,7 +14,7 @@ net.alchim31.maven scala-maven-plugin - 4.0.1 + ${net.alchim31.maven.version} scala-compile-first @@ -37,6 +37,8 @@ -Xmax-classfile-name 200 + true + ${scala.binary.version} ${scala.version} @@ -48,11 +50,11 @@ org.apache.spark - spark-core_2.11 + spark-core_${scala.binary.version} org.apache.spark - spark-sql_2.11 + spark-sql_${scala.binary.version} com.jayway.jsonpath diff --git a/dhp-workflows/dhp-stats-promote/pom.xml b/dhp-workflows/dhp-stats-promote/pom.xml index ce3e739a5..9e17a78dc 100644 --- a/dhp-workflows/dhp-stats-promote/pom.xml +++ b/dhp-workflows/dhp-stats-promote/pom.xml @@ -10,11 +10,11 @@ org.apache.spark - spark-core_2.11 + spark-core_${scala.binary.version} org.apache.spark - spark-sql_2.11 + spark-sql_${scala.binary.version} diff --git a/dhp-workflows/dhp-stats-update/pom.xml b/dhp-workflows/dhp-stats-update/pom.xml index 2bc610c42..f491b5868 100644 --- a/dhp-workflows/dhp-stats-update/pom.xml +++ b/dhp-workflows/dhp-stats-update/pom.xml @@ -10,11 +10,11 @@ org.apache.spark - spark-core_2.11 + spark-core_${scala.binary.version} org.apache.spark - spark-sql_2.11 + spark-sql_${scala.binary.version} diff --git a/dhp-workflows/dhp-usage-raw-data-update/pom.xml b/dhp-workflows/dhp-usage-raw-data-update/pom.xml index 954c8bd39..a9dbb09ae 100644 --- a/dhp-workflows/dhp-usage-raw-data-update/pom.xml +++ b/dhp-workflows/dhp-usage-raw-data-update/pom.xml @@ -46,13 +46,11 @@ org.apache.spark - spark-core_2.11 - 2.2.0 + spark-core_${scala.binary.version} org.apache.spark - spark-sql_2.11 - 2.4.5 + spark-sql_${scala.binary.version} com.googlecode.json-simple diff --git a/dhp-workflows/dhp-usage-stats-build/pom.xml b/dhp-workflows/dhp-usage-stats-build/pom.xml index 54e18580b..56aec73b7 100644 --- a/dhp-workflows/dhp-usage-stats-build/pom.xml +++ b/dhp-workflows/dhp-usage-stats-build/pom.xml @@ -46,13 +46,11 @@ org.apache.spark - spark-core_2.11 - 2.2.0 + spark-core_${scala.binary.version} org.apache.spark - spark-sql_2.11 - 2.4.5 + spark-sql_${scala.binary.version} com.googlecode.json-simple diff --git a/pom.xml b/pom.xml index 4707f7d01..976355453 100644 --- a/pom.xml +++ b/pom.xml @@ -142,7 +142,7 @@ eu.dnetlib.dhp - dhp-schemas + dhp-schemas_${scala.binary.version} ${dhp-schemas.version} @@ -171,25 +171,25 @@ org.apache.spark - spark-core_2.11 + spark-core_${scala.binary.version} ${dhp.spark.version} provided org.apache.spark - spark-sql_2.11 + spark-sql_${scala.binary.version} ${dhp.spark.version} provided org.apache.spark - spark-graphx_2.11 + spark-graphx_${scala.binary.version} ${dhp.spark.version} provided org.apache.spark - spark-hive_2.11 + spark-hive_${scala.binary.version} ${dhp.spark.version} test @@ -295,7 +295,7 @@ com.lucidworks.spark spark-solr - 3.6.0 + 4.0.2 * @@ -523,7 +523,7 @@ org.json4s - json4s-jackson_2.11 + json4s-jackson_${scala.binary.version} ${json4s.version} @@ -699,7 +699,7 @@ org.antipathy - mvn-scalafmt_2.11 + mvn-scalafmt_${scala.binary.version} 1.0.1640073709.733712b @@ -756,7 +756,7 @@ org.antipathy - mvn-scalafmt_2.11 + mvn-scalafmt_${scala.binary.version} https://code-repo.d4science.org/D-Net/dnet-hadoop/raw/branch/beta/dhp-build/dhp-code-style/src/main/resources/scalafmt/scalafmt.conf false @@ -865,17 +865,18 @@ cdh5.9.2 2.6.0-${dhp.cdh.version} 4.1.0-${dhp.cdh.version} - 2.4.0.cloudera2 - 2.9.6 + 3.4.1 + 2.14.2 3.5 true 11.0.2 - 2.11.12 + 2.12.18 + 2.12 5.6.1 3.3.3 3.4.2 [2.12,3.0) - [3.17.1] + 3.17.2-SNAPSHOT [4.0.3] [6.0.5] [3.1.6] @@ -883,13 +884,13 @@ 7.5.0 4.7.2 1.20 - 3.5.3 + 3.7.0-M11 4.13.0 1.8 4.1.2 1.8 4.5.3 - 4.0.1 + 4.8.1 2.2.2 1.1.3 3.2.1