From 21a14fcd800944d2a7fca1c70ad77726536f2b97 Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Mon, 15 Jan 2024 00:08:07 +0100 Subject: [PATCH 1/5] Reusable RunSQLSparkJob for executing SQL in Spark through Oozie Spark Actions Implements pivots table update oozie workflow --- .../eu/dnetlib/dhp/oozie/RunSQLSparkJob.java | 75 +++++++++++++++ .../dnetlib/dhp/oozie/run_sql_parameters.json | 20 ++++ .../pivothistory/oozie_app/config-default.xml | 26 +++++ .../oa/dedup/pivothistory/oozie_app/sql.sql | 62 ++++++++++++ .../dedup/pivothistory/oozie_app/workflow.xml | 95 +++++++++++++++++++ 5 files changed, 278 insertions(+) create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/oozie/RunSQLSparkJob.java create mode 100644 dhp-common/src/main/resources/eu/dnetlib/dhp/oozie/run_sql_parameters.json create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/sql.sql create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/workflow.xml diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oozie/RunSQLSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oozie/RunSQLSparkJob.java new file mode 100644 index 000000000..ef296bfc9 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oozie/RunSQLSparkJob.java @@ -0,0 +1,75 @@ + +package eu.dnetlib.dhp.oozie; + +import com.google.common.io.Resources; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import org.apache.commons.lang3.time.DurationFormatUtils; +import org.apache.commons.text.StringSubstitutor; +import org.apache.spark.SparkConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import 
static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; + +public class RunSQLSparkJob { + private static final Logger log = LoggerFactory.getLogger(RunSQLSparkJob.class); + + private final ArgumentApplicationParser parser; + + public RunSQLSparkJob(ArgumentApplicationParser parser) { + this.parser = parser; + } + + public static void main(String[] args) throws Exception { + + Map params = new HashMap<>(); + for (int i = 0; i < args.length - 1; i++) { + if (args[i].startsWith("--")) { + params.put(args[i].substring(2), args[++i]); + } + } + + /* + * String jsonConfiguration = IOUtils .toString( Objects .requireNonNull( RunSQLSparkJob.class + * .getResourceAsStream( "/eu/dnetlib/dhp/oozie/run_sql_parameters.json"))); final ArgumentApplicationParser + * parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); + */ + + Boolean isSparkSessionManaged = Optional + .ofNullable(params.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + URL url = com.google.common.io.Resources.getResource(params.get("sql")); + String raw_sql = Resources.toString(url, StandardCharsets.UTF_8); + + String sql = StringSubstitutor.replace(raw_sql, params); + log.info("sql: {}", sql); + + SparkConf conf = new SparkConf(); + conf.set("hive.metastore.uris", params.get("hiveMetastoreUris")); + + runWithSparkHiveSession( + conf, + isSparkSessionManaged, + spark -> { + for (String statement : sql.split(";\\s*/\\*\\s*EOS\\s*\\*/\\s*")) { + log.info("executing: {}", statement); + long startTime = System.currentTimeMillis(); + spark.sql(statement).show(); + log + .info( + "executed in {}", + DurationFormatUtils.formatDuration(System.currentTimeMillis() - startTime, "HH:mm:ss.S")); + } + }); + } + +} diff --git a/dhp-common/src/main/resources/eu/dnetlib/dhp/oozie/run_sql_parameters.json b/dhp-common/src/main/resources/eu/dnetlib/dhp/oozie/run_sql_parameters.json 
new file mode 100644 index 000000000..355f38e2f --- /dev/null +++ b/dhp-common/src/main/resources/eu/dnetlib/dhp/oozie/run_sql_parameters.json @@ -0,0 +1,20 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "hmu", + "paramLongName": "hiveMetastoreUris", + "paramDescription": "the hive metastore uris", + "paramRequired": true + }, + { + "paramName": "sql", + "paramLongName": "sql", + "paramDescription": "sql script to execute", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/config-default.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/config-default.xml new file mode 100644 index 000000000..17bb70647 --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/config-default.xml @@ -0,0 +1,26 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hiveMetastoreUris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + sparkSqlWarehouseDir + /user/hive/warehouse + + \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/sql.sql b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/sql.sql new file mode 100644 index 000000000..86dbda1c9 --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/sql.sql @@ -0,0 +1,62 @@ + +CREATE TABLE `${pivot_history_db}`.`dataset_new` STORED AS PARQUET AS +WITH pivots ( + SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM 
`${new_graph_db}`.`relation` + LEFT SEMI JOIN `${new_graph_db}`.`dataset` ON relation.source = dataset.id + LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot' +UNION + SELECT id, usedIn FROM `${pivot_history_db}`.`dataset` LATERAL VIEW EXPLODE(usages) AS usedIn +) +SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages + FROM pivots + GROUP BY id; /*EOS*/ +CREATE TABLE `${pivot_history_db}`.`publication_new` STORED AS PARQUET AS +WITH pivots ( + SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation` + LEFT SEMI JOIN `${new_graph_db}`.`publication` ON relation.source = publication.id + LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot' +UNION + SELECT id, usedIn FROM `${pivot_history_db}`.`publication` LATERAL VIEW EXPLODE(usages) AS usedIn +) +SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages + FROM pivots + GROUP BY id; /*EOS*/ +CREATE TABLE `${pivot_history_db}`.`software_new` STORED AS PARQUET AS +WITH pivots ( + SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation` + LEFT SEMI JOIN `${new_graph_db}`.`software` ON relation.source = software.id + LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot' +UNION + SELECT id, usedIn FROM `${pivot_history_db}`.`software` LATERAL VIEW EXPLODE(usages) AS usedIn +) +SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages + FROM pivots + GROUP BY id; /*EOS*/ +CREATE TABLE `${pivot_history_db}`.`otherresearchproduct_new` STORED AS PARQUET AS +WITH pivots ( + SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation` + LEFT SEMI JOIN `${new_graph_db}`.`otherresearchproduct` ON relation.source = otherresearchproduct.id + LATERAL VIEW EXPLODE(properties) 
AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot' +UNION + SELECT id, usedIn FROM `${pivot_history_db}`.`otherresearchproduct` LATERAL VIEW EXPLODE(usages) AS usedIn +) +SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages + FROM pivots + GROUP BY id; /*EOS*/ + + +DROP TABLE IF EXISTS `${pivot_history_db}`.`dataset_old`; /*EOS*/ +ALTER TABLE `${pivot_history_db}`.`dataset` RENAME TO `${pivot_history_db}`.`dataset_old`; /*EOS*/ +ALTER TABLE `${pivot_history_db}`.`dataset_new` RENAME TO `${pivot_history_db}`.`dataset`; /*EOS*/ + +DROP TABLE IF EXISTS `${pivot_history_db}`.`publication_old`; /*EOS*/ +ALTER TABLE `${pivot_history_db}`.`publication` RENAME TO `${pivot_history_db}`.`publication_old`; /*EOS*/ +ALTER TABLE `${pivot_history_db}`.`publication_new` RENAME TO `${pivot_history_db}`.`publication`; /*EOS*/ + +DROP TABLE IF EXISTS `${pivot_history_db}`.`software_old`; /*EOS*/ +ALTER TABLE `${pivot_history_db}`.`software` RENAME TO `${pivot_history_db}`.`software_old`; /*EOS*/ +ALTER TABLE `${pivot_history_db}`.`software_new` RENAME TO `${pivot_history_db}`.`software`; /*EOS*/ + +DROP TABLE IF EXISTS `${pivot_history_db}`.`otherresearchproduct_old`; /*EOS*/ +ALTER TABLE `${pivot_history_db}`.`otherresearchproduct` RENAME TO `${pivot_history_db}`.`otherresearchproduct_old`; /*EOS*/ +ALTER TABLE `${pivot_history_db}`.`otherresearchproduct_new` RENAME TO `${pivot_history_db}`.`otherresearchproduct`; /*EOS*/ diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/workflow.xml new file mode 100644 index 000000000..d562f088e --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/workflow.xml @@ -0,0 +1,95 @@ + + + + + pivot_history_db + + Pivot history DB on hive + + + new_graph_db 
+ + New graph DB on hive + + + new_graph_date + + Creation date of new graph db + + + + + hiveMetastoreUris + hive server metastore URIs + + + sparkSqlWarehouseDir + + + + sparkClusterOpts + --conf spark.network.timeout=600 --conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory + spark cluster-wide options + + + sparkResourceOpts + --executor-memory=3G --conf spark.executor.memoryOverhead=3G --executor-cores=6 --driver-memory=8G --driver-cores=4 + spark resource options + + + sparkApplicationOpts + --conf spark.sql.shuffle.partitions=3840 + spark application options + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + Upgrade Pivot History + eu.dnetlib.dhp.oozie.RunSQLSparkJob + dhp-dedup-openaire-${projectVersion}.jar + + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + ${sparkClusterOpts} + ${sparkResourceOpts} + ${sparkApplicationOpts} + + --hiveMetastoreUris${hiveMetastoreUris} + --sqleu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/sql.sql + --pivot_history_db${pivot_history_db} + --new_graph_db${new_graph_db} + --new_graph_date${new_graph_date} + + + + + + + \ No newline at end of file From 2655eea5bc3075d4a649958c61971586db25452d Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 19 Jan 2024 16:28:05 +0100 Subject: [PATCH 2/5] [orcid enrichment] drop paths before copying the non-modified contents --- .../dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git 
a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml index ce117b5e9..bbd3581c5 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml @@ -43,6 +43,17 @@ --graphPath${graphPath} --masteryarn + + + + + + + + + + + From 1c6db320f41882c34299e7c346d1c95d592d2644 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 22 Jan 2024 15:53:17 +0100 Subject: [PATCH 3/5] [graph provision] obtain context info from the context API instead from the ISLookUp service --- .../common/api/context/CategorySummary.java | 39 ++++++++++++++ .../api/context/CategorySummaryList.java | 7 +++ .../common/api/context/ConceptSummary.java | 52 +++++++++++++++++++ .../api/context/ConceptSummaryList.java | 7 +++ .../common/api/context/ContextSummary.java | 50 ++++++++++++++++++ .../api/context/ContextSummaryList.java | 7 +++ .../dhp/oa/provision/XmlConverterJob.java | 6 +-- .../dhp/oa/provision/utils/ContextMapper.java | 45 +++++++++++++++- .../dhp/oa/provision/oozie_app/workflow.xml | 6 ++- 9 files changed, 213 insertions(+), 6 deletions(-) create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/CategorySummary.java create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/CategorySummaryList.java create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ConceptSummary.java create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ConceptSummaryList.java create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ContextSummary.java create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ContextSummaryList.java diff --git 
a/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/CategorySummary.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/CategorySummary.java new file mode 100644 index 000000000..fff28dbdf --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/CategorySummary.java @@ -0,0 +1,39 @@ + +package eu.dnetlib.dhp.common.api.context; + +public class CategorySummary { + + private String id; + + private String label; + + private boolean hasConcept; + + public String getId() { + return id; + } + + public String getLabel() { + return label; + } + + public boolean isHasConcept() { + return hasConcept; + } + + public CategorySummary setId(final String id) { + this.id = id; + return this; + } + + public CategorySummary setLabel(final String label) { + this.label = label; + return this; + } + + public CategorySummary setHasConcept(final boolean hasConcept) { + this.hasConcept = hasConcept; + return this; + } + +} diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/CategorySummaryList.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/CategorySummaryList.java new file mode 100644 index 000000000..7213a945a --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/CategorySummaryList.java @@ -0,0 +1,7 @@ + +package eu.dnetlib.dhp.common.api.context; + +import java.util.ArrayList; + +public class CategorySummaryList extends ArrayList { +} diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ConceptSummary.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ConceptSummary.java new file mode 100644 index 000000000..a576f9a1e --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ConceptSummary.java @@ -0,0 +1,52 @@ + +package eu.dnetlib.dhp.common.api.context; + +import java.util.List; + +public class ConceptSummary { + + private String id; + + private String label; + + public boolean hasSubConcept; + + private List concepts; + 
+ public String getId() { + return id; + } + + public String getLabel() { + return label; + } + + public List getConcepts() { + return concepts; + } + + public ConceptSummary setId(final String id) { + this.id = id; + return this; + } + + public ConceptSummary setLabel(final String label) { + this.label = label; + return this; + } + + public boolean isHasSubConcept() { + return hasSubConcept; + } + + public ConceptSummary setHasSubConcept(final boolean hasSubConcept) { + this.hasSubConcept = hasSubConcept; + return this; + } + + public ConceptSummary setConcept(final List concepts) { + this.concepts = concepts; + return this; + } + +} diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ConceptSummaryList.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ConceptSummaryList.java new file mode 100644 index 000000000..45ccd2810 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ConceptSummaryList.java @@ -0,0 +1,7 @@ + +package eu.dnetlib.dhp.common.api.context; + +import java.util.ArrayList; + +public class ConceptSummaryList extends ArrayList { +} diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ContextSummary.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ContextSummary.java new file mode 100644 index 000000000..46a0d0d5a --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ContextSummary.java @@ -0,0 +1,50 @@ + +package eu.dnetlib.dhp.common.api.context; + +public class ContextSummary { + + private String id; + + private String label; + + private String type; + + private String status; + + public String getId() { + return id; + } + + public String getLabel() { + return label; + } + + public String getType() { + return type; + } + + public String getStatus() { + return status; + } + + public ContextSummary setId(final String id) { + this.id = id; + return this; + } + + public ContextSummary setLabel(final String label) { + this.label = 
label; + return this; + } + + public ContextSummary setType(final String type) { + this.type = type; + return this; + } + + public ContextSummary setStatus(final String status) { + this.status = status; + return this; + } + +} diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ContextSummaryList.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ContextSummaryList.java new file mode 100644 index 000000000..618600007 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ContextSummaryList.java @@ -0,0 +1,7 @@ + +package eu.dnetlib.dhp.common.api.context; + +import java.util.ArrayList; + +public class ContextSummaryList extends ArrayList { +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java index 518f41120..6f43ca3f7 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java @@ -62,8 +62,8 @@ public class XmlConverterJob { final String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); - final String isLookupUrl = parser.get("isLookupUrl"); - log.info("isLookupUrl: {}", isLookupUrl); + final String contextApiBaseUrl = parser.get("contextApiBaseUrl"); + log.info("contextApiBaseUrl: {}", contextApiBaseUrl); final SparkConf conf = new SparkConf(); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); @@ -71,7 +71,7 @@ public class XmlConverterJob { runWithSparkSession(conf, isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - convertToXml(spark, inputPath, outputPath, ContextMapper.fromIS(isLookupUrl)); + convertToXml(spark, inputPath, outputPath, ContextMapper.fromAPI(contextApiBaseUrl)); }); } diff --git 
a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/ContextMapper.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/ContextMapper.java index bcaf40603..96d92fed6 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/ContextMapper.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/ContextMapper.java @@ -1,18 +1,22 @@ package eu.dnetlib.dhp.oa.provision.utils; -import java.io.Serializable; -import java.io.StringReader; +import java.io.*; +import java.net.HttpURLConnection; +import java.net.URL; import java.util.HashMap; import org.dom4j.Document; import org.dom4j.DocumentException; import org.dom4j.Node; import org.dom4j.io.SAXReader; +import org.jetbrains.annotations.NotNull; import org.xml.sax.SAXException; import com.google.common.base.Joiner; +import eu.dnetlib.dhp.common.api.context.*; +import eu.dnetlib.dhp.common.rest.DNetRestClient; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; @@ -23,6 +27,42 @@ public class ContextMapper extends HashMap implements Serial private static final String XQUERY = "for $x in //RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='ContextDSResourceType']//*[name()='context' or name()='category' or name()='concept'] return "; + public static ContextMapper fromAPI(final String baseURL) throws Exception { + + final ContextMapper contextMapper = new ContextMapper(); + + for (ContextSummary ctx : DNetRestClient.doGET(baseURL + "/contexts", ContextSummaryList.class)) { + + contextMapper.put(ctx.getId(), new ContextDef(ctx.getId(), ctx.getLabel(), "context", ctx.getType())); + + for (CategorySummary cat : DNetRestClient + .doGET(baseURL + "/context/" + ctx.getId(), CategorySummaryList.class)) { + contextMapper.put(cat.getId(), new ContextDef(cat.getId(), cat.getLabel(), 
"category", "")); + if (cat.isHasConcept()) { + for (ConceptSummary c : DNetRestClient + .doGET(baseURL + "/context/category/" + cat.getId(), ConceptSummaryList.class)) { + contextMapper.put(c.getId(), new ContextDef(c.getId(), c.getLabel(), "concept", "")); + if (c.isHasSubConcept()) { + for (ConceptSummary cs : c.getConcepts()) { + contextMapper.put(cs.getId(), new ContextDef(cs.getId(), cs.getLabel(), "concept", "")); + if (cs.isHasSubConcept()) { + for (ConceptSummary css : cs.getConcepts()) { + contextMapper + .put( + css.getId(), + new ContextDef(css.getId(), css.getLabel(), "concept", "")); + } + } + } + } + } + } + } + } + return contextMapper; + } + + @Deprecated public static ContextMapper fromIS(final String isLookupUrl) throws DocumentException, ISLookUpException, SAXException { ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl); @@ -32,6 +72,7 @@ public class ContextMapper extends HashMap implements Serial return fromXml(sb.toString()); } + @Deprecated public static ContextMapper fromXml(final String xml) throws DocumentException, SAXException { final ContextMapper contextMapper = new ContextMapper(); diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml index 2e7b11dde..9eab960f0 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml @@ -9,6 +9,10 @@ isLookupUrl URL for the isLookup service + + contextApiBaseUrl + context API URL + relPartitions number or partitions for the relations Dataset @@ -589,7 +593,7 @@ --inputPath${workingDir}/join_entities --outputPath${workingDir}/xml - --isLookupUrl${isLookupUrl} + --contextApiBaseUrl${contextApiBaseUrl} From 
6fd25cf549e3892d3d1f114848367ea00dd84399 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Tue, 23 Jan 2024 08:47:12 +0100 Subject: [PATCH 4/5] code formatting --- .../eu/dnetlib/dhp/oozie/RunSQLSparkJob.java | 18 +- .../dhp/oa/dedup/DedupRecordFactory.java | 284 +++++++++--------- .../dhp/oa/dedup/SparkCreateMergeRels.java | 5 +- 3 files changed, 158 insertions(+), 149 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oozie/RunSQLSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oozie/RunSQLSparkJob.java index ef296bfc9..027bf0735 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/oozie/RunSQLSparkJob.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oozie/RunSQLSparkJob.java @@ -1,13 +1,7 @@ package eu.dnetlib.dhp.oozie; -import com.google.common.io.Resources; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import org.apache.commons.lang3.time.DurationFormatUtils; -import org.apache.commons.text.StringSubstitutor; -import org.apache.spark.SparkConf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; import java.net.URL; import java.nio.charset.StandardCharsets; @@ -15,7 +9,15 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; +import org.apache.commons.lang3.time.DurationFormatUtils; +import org.apache.commons.text.StringSubstitutor; +import org.apache.spark.SparkConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.io.Resources; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; public class RunSQLSparkJob { private static final Logger log = LoggerFactory.getLogger(RunSQLSparkJob.class); diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java 
b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java index 4c12d1dc6..eddfba309 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java @@ -1,6 +1,16 @@ package eu.dnetlib.dhp.oa.dedup; +import java.util.*; +import java.util.stream.Stream; + +import org.apache.commons.beanutils.BeanUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.ReduceFunction; +import org.apache.spark.sql.*; + import eu.dnetlib.dhp.oa.dedup.model.Identifier; import eu.dnetlib.dhp.oa.merge.AuthorMerger; import eu.dnetlib.dhp.schema.common.ModelSupport; @@ -8,180 +18,176 @@ import eu.dnetlib.dhp.schema.oaf.Author; import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.Result; -import org.apache.commons.beanutils.BeanUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.api.java.function.ReduceFunction; -import org.apache.spark.sql.*; import scala.Tuple2; import scala.Tuple3; import scala.collection.JavaConversions; -import java.util.*; -import java.util.stream.Stream; - public class DedupRecordFactory { - public static final class DedupRecordReduceState { - public final String dedupId; + public static final class DedupRecordReduceState { + public final String dedupId; - public final ArrayList aliases = new ArrayList<>(); + public final ArrayList aliases = new ArrayList<>(); - public final HashSet acceptanceDate = new HashSet<>(); + public final HashSet acceptanceDate = new HashSet<>(); - public OafEntity entity; + public OafEntity entity; - 
public DedupRecordReduceState(String dedupId, String id, OafEntity entity) { - this.dedupId = dedupId; - this.entity = entity; - if (entity == null) { - aliases.add(id); - } else { - if (Result.class.isAssignableFrom(entity.getClass())) { - Result result = (Result) entity; - if (result.getDateofacceptance() != null && StringUtils.isNotBlank(result.getDateofacceptance().getValue())) { - acceptanceDate.add(result.getDateofacceptance().getValue()); - } - } - } - } + public DedupRecordReduceState(String dedupId, String id, OafEntity entity) { + this.dedupId = dedupId; + this.entity = entity; + if (entity == null) { + aliases.add(id); + } else { + if (Result.class.isAssignableFrom(entity.getClass())) { + Result result = (Result) entity; + if (result.getDateofacceptance() != null + && StringUtils.isNotBlank(result.getDateofacceptance().getValue())) { + acceptanceDate.add(result.getDateofacceptance().getValue()); + } + } + } + } - public String getDedupId() { - return dedupId; - } - } - private static final int MAX_ACCEPTANCE_DATE = 20; + public String getDedupId() { + return dedupId; + } + } - private DedupRecordFactory() { - } + private static final int MAX_ACCEPTANCE_DATE = 20; - public static Dataset createDedupRecord( - final SparkSession spark, - final DataInfo dataInfo, - final String mergeRelsInputPath, - final String entitiesInputPath, - final Class clazz) { + private DedupRecordFactory() { + } - final long ts = System.currentTimeMillis(); - final Encoder beanEncoder = Encoders.bean(clazz); - final Encoder kryoEncoder = Encoders.kryo(clazz); + public static Dataset createDedupRecord( + final SparkSession spark, + final DataInfo dataInfo, + final String mergeRelsInputPath, + final String entitiesInputPath, + final Class clazz) { - // - Dataset entities = spark - .read() - .schema(Encoders.bean(clazz).schema()) - .json(entitiesInputPath) - .as(beanEncoder) - .map( - (MapFunction>) entity -> { - return new Tuple2<>(entity.getId(), entity); - }, - 
Encoders.tuple(Encoders.STRING(), kryoEncoder)) - .selectExpr("_1 AS id", "_2 AS kryoObject"); + final long ts = System.currentTimeMillis(); + final Encoder beanEncoder = Encoders.bean(clazz); + final Encoder kryoEncoder = Encoders.kryo(clazz); - // : source is the dedup_id, target is the id of the mergedIn - Dataset mergeRels = spark - .read() - .load(mergeRelsInputPath) - .where("relClass == 'merges'") - .selectExpr("source as dedupId", "target as id"); + // + Dataset entities = spark + .read() + .schema(Encoders.bean(clazz).schema()) + .json(entitiesInputPath) + .as(beanEncoder) + .map( + (MapFunction>) entity -> { + return new Tuple2<>(entity.getId(), entity); + }, + Encoders.tuple(Encoders.STRING(), kryoEncoder)) + .selectExpr("_1 AS id", "_2 AS kryoObject"); - return mergeRels - .join(entities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left") - .select("dedupId", "id", "kryoObject") - .as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder)) - .map((MapFunction, DedupRecordReduceState>) t -> new DedupRecordReduceState(t._1(), t._2(), t._3()), Encoders.kryo(DedupRecordReduceState.class)) - .groupByKey((MapFunction) DedupRecordReduceState::getDedupId, Encoders.STRING()) - .reduceGroups( - (ReduceFunction) (t1, t2) -> { - if (t1.entity == null) { - t2.aliases.addAll(t1.aliases); - return t2; - } - if (t1.acceptanceDate.size() < MAX_ACCEPTANCE_DATE) { - t1.acceptanceDate.addAll(t2.acceptanceDate); - } - t1.aliases.addAll(t2.aliases); - t1.entity = reduceEntity(t1.entity, t2.entity); + // : source is the dedup_id, target is the id of the mergedIn + Dataset mergeRels = spark + .read() + .load(mergeRelsInputPath) + .where("relClass == 'merges'") + .selectExpr("source as dedupId", "target as id"); - return t1; - } - ) - .flatMap - ((FlatMapFunction, OafEntity>) t -> { - String dedupId = t._1(); - DedupRecordReduceState agg = t._2(); + return mergeRels + .join(entities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), 
"left") + .select("dedupId", "id", "kryoObject") + .as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder)) + .map( + (MapFunction, DedupRecordReduceState>) t -> new DedupRecordReduceState( + t._1(), t._2(), t._3()), + Encoders.kryo(DedupRecordReduceState.class)) + .groupByKey( + (MapFunction) DedupRecordReduceState::getDedupId, Encoders.STRING()) + .reduceGroups( + (ReduceFunction) (t1, t2) -> { + if (t1.entity == null) { + t2.aliases.addAll(t1.aliases); + return t2; + } + if (t1.acceptanceDate.size() < MAX_ACCEPTANCE_DATE) { + t1.acceptanceDate.addAll(t2.acceptanceDate); + } + t1.aliases.addAll(t2.aliases); + t1.entity = reduceEntity(t1.entity, t2.entity); - if (agg.acceptanceDate.size() >= MAX_ACCEPTANCE_DATE) { - return Collections.emptyIterator(); - } + return t1; + }) + .flatMap((FlatMapFunction, OafEntity>) t -> { + String dedupId = t._1(); + DedupRecordReduceState agg = t._2(); - return Stream.concat(Stream.of(agg.getDedupId()), agg.aliases.stream()) - .map(id -> { - try { - OafEntity res = (OafEntity) BeanUtils.cloneBean(agg.entity); - res.setId(id); - res.setDataInfo(dataInfo); - res.setLastupdatetimestamp(ts); - return res; - } catch (Exception e) { - throw new RuntimeException(e); - } - }).iterator(); - }, beanEncoder); - } + if (agg.acceptanceDate.size() >= MAX_ACCEPTANCE_DATE) { + return Collections.emptyIterator(); + } - private static OafEntity reduceEntity(OafEntity entity, OafEntity duplicate) { + return Stream + .concat(Stream.of(agg.getDedupId()), agg.aliases.stream()) + .map(id -> { + try { + OafEntity res = (OafEntity) BeanUtils.cloneBean(agg.entity); + res.setId(id); + res.setDataInfo(dataInfo); + res.setLastupdatetimestamp(ts); + return res; + } catch (Exception e) { + throw new RuntimeException(e); + } + }) + .iterator(); + }, beanEncoder); + } + + private static OafEntity reduceEntity(OafEntity entity, OafEntity duplicate) { if (duplicate == null) { return entity; } + int compare = new IdentifierComparator<>() + 
.compare(Identifier.newInstance(entity), Identifier.newInstance(duplicate)); - int compare = new IdentifierComparator<>() - .compare(Identifier.newInstance(entity), Identifier.newInstance(duplicate)); - - if (compare > 0) { + if (compare > 0) { OafEntity swap = duplicate; - duplicate = entity; - entity = swap; - } + duplicate = entity; + entity = swap; + } - entity.mergeFrom(duplicate); + entity.mergeFrom(duplicate); - if (ModelSupport.isSubClass(duplicate, Result.class)) { - Result re = (Result) entity; - Result rd = (Result) duplicate; + if (ModelSupport.isSubClass(duplicate, Result.class)) { + Result re = (Result) entity; + Result rd = (Result) duplicate; - List> authors = new ArrayList<>(); - if (re.getAuthor() != null) { - authors.add(re.getAuthor()); - } - if (rd.getAuthor() != null) { - authors.add(rd.getAuthor()); - } + List> authors = new ArrayList<>(); + if (re.getAuthor() != null) { + authors.add(re.getAuthor()); + } + if (rd.getAuthor() != null) { + authors.add(rd.getAuthor()); + } - re.setAuthor(AuthorMerger.merge(authors)); - } + re.setAuthor(AuthorMerger.merge(authors)); + } - return entity; - } + return entity; + } - public static T entityMerger( - String id, Iterator> entities, long ts, DataInfo dataInfo, Class clazz) { - T base = entities.next()._2(); + public static T entityMerger( + String id, Iterator> entities, long ts, DataInfo dataInfo, Class clazz) { + T base = entities.next()._2(); - while (entities.hasNext()) { - T duplicate = entities.next()._2(); - if (duplicate != null) - base = (T) reduceEntity(base, duplicate); - } + while (entities.hasNext()) { + T duplicate = entities.next()._2(); + if (duplicate != null) + base = (T) reduceEntity(base, duplicate); + } - base.setId(id); - base.setDataInfo(dataInfo); - base.setLastupdatetimestamp(ts); + base.setId(id); + base.setDataInfo(dataInfo); + base.setLastupdatetimestamp(ts); - return base; - } + return base; + } } diff --git 
a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java index 191870d3b..59626c141 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java @@ -242,13 +242,14 @@ public class SparkCreateMergeRels extends AbstractSparkAction { // this was a pivot in a previous graph but it has been merged into a new group with different // pivot - if (!r.isNullAt(r.fieldIndex("lastUsage")) && !pivot.equals(id) && !dedupId.equals(pivotDedupId)) { + if (!r.isNullAt(r.fieldIndex("lastUsage")) && !pivot.equals(id) + && !dedupId.equals(pivotDedupId)) { // materialize the previous dedup record as a merge relation with the new one res.add(new Tuple3<>(dedupId, pivotDedupId, null)); } // add merge relations - if (cut <=0 || r.getAs("position") <= cut) { + if (cut <= 0 || r. 
getAs("position") <= cut) { res.add(new Tuple3<>(id, pivotDedupId, pivot)); } From f87f3a6483d1ea18945cd8055a3b97a4973b682a Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Tue, 23 Jan 2024 08:54:37 +0100 Subject: [PATCH 5/5] [graph provision] updated param specification for the XML converter job --- .../dhp/oa/provision/input_params_xml_converter.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json index eda6154d7..653a69ed1 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json @@ -12,9 +12,9 @@ "paramRequired": true }, { - "paramName": "ilu", - "paramLongName": "isLookupUrl", - "paramDescription": "URL of the isLookUp Service", + "paramName": "cau", + "paramLongName": "contextApiBaseUrl", + "paramDescription": "URL of the context API", "paramRequired": true } ]