From 21a14fcd800944d2a7fca1c70ad77726536f2b97 Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Mon, 15 Jan 2024 00:08:07 +0100 Subject: [PATCH 01/11] Reusable RunSQLSparkJob for executing SQL in Spark through Oozie Spark Actions Implements pivots table update oozie workflow --- .../eu/dnetlib/dhp/oozie/RunSQLSparkJob.java | 75 +++++++++++++++ .../dnetlib/dhp/oozie/run_sql_parameters.json | 20 ++++ .../pivothistory/oozie_app/config-default.xml | 26 +++++ .../oa/dedup/pivothistory/oozie_app/sql.sql | 62 ++++++++++++ .../dedup/pivothistory/oozie_app/workflow.xml | 95 +++++++++++++++++++ 5 files changed, 278 insertions(+) create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/oozie/RunSQLSparkJob.java create mode 100644 dhp-common/src/main/resources/eu/dnetlib/dhp/oozie/run_sql_parameters.json create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/sql.sql create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/workflow.xml diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oozie/RunSQLSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oozie/RunSQLSparkJob.java new file mode 100644 index 0000000000..ef296bfc90 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oozie/RunSQLSparkJob.java @@ -0,0 +1,75 @@ + +package eu.dnetlib.dhp.oozie; + +import com.google.common.io.Resources; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import org.apache.commons.lang3.time.DurationFormatUtils; +import org.apache.commons.text.StringSubstitutor; +import org.apache.spark.SparkConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; + +public class RunSQLSparkJob { + private static final Logger log = LoggerFactory.getLogger(RunSQLSparkJob.class); + + private final ArgumentApplicationParser parser; + + public RunSQLSparkJob(ArgumentApplicationParser parser) { + this.parser = parser; + } + + public static void main(String[] args) throws Exception { + + Map params = new HashMap<>(); + for (int i = 0; i < args.length - 1; i++) { + if (args[i].startsWith("--")) { + params.put(args[i].substring(2), args[++i]); + } + } + + /* + * String jsonConfiguration = IOUtils .toString( Objects .requireNonNull( RunSQLSparkJob.class + * .getResourceAsStream( "/eu/dnetlib/dhp/oozie/run_sql_parameters.json"))); final ArgumentApplicationParser + * parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); + */ + + Boolean isSparkSessionManaged = Optional + .ofNullable(params.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + URL url = com.google.common.io.Resources.getResource(params.get("sql")); + String raw_sql = Resources.toString(url, StandardCharsets.UTF_8); + + String sql = StringSubstitutor.replace(raw_sql, params); + log.info("sql: {}", sql); + + SparkConf conf = new SparkConf(); + conf.set("hive.metastore.uris", params.get("hiveMetastoreUris")); + + runWithSparkHiveSession( + conf, + isSparkSessionManaged, + spark -> { + for (String statement : sql.split(";\\s*/\\*\\s*EOS\\s*\\*/\\s*")) { + log.info("executing: {}", statement); + long startTime = System.currentTimeMillis(); + spark.sql(statement).show(); + log + .info( + "executed in {}", + DurationFormatUtils.formatDuration(System.currentTimeMillis() - startTime, "HH:mm:ss.S")); + } + }); + } + +} diff --git a/dhp-common/src/main/resources/eu/dnetlib/dhp/oozie/run_sql_parameters.json b/dhp-common/src/main/resources/eu/dnetlib/dhp/oozie/run_sql_parameters.json new file mode 100644 index 0000000000..355f38e2fc --- /dev/null +++ b/dhp-common/src/main/resources/eu/dnetlib/dhp/oozie/run_sql_parameters.json @@ -0,0 +1,20 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "hmu", + "paramLongName": "hiveMetastoreUris", + "paramDescription": "the hive metastore uris", + "paramRequired": true + }, + { + "paramName": "sql", + "paramLongName": "sql", + "paramDescription": "sql script to execute", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/config-default.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/config-default.xml new file mode 100644 index 0000000000..17bb706477 --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/config-default.xml @@ -0,0 +1,26 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hiveMetastoreUris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + sparkSqlWarehouseDir + /user/hive/warehouse + + \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/sql.sql b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/sql.sql new file mode 100644 index 0000000000..86dbda1c97 --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/sql.sql @@ -0,0 +1,62 @@ + +CREATE TABLE `${pivot_history_db}`.`dataset_new` STORED AS PARQUET AS +WITH pivots ( + SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation` + LEFT SEMI JOIN `${new_graph_db}`.`dataset` ON relation.source = dataset.id + LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot' +UNION + SELECT id, usedIn FROM `${pivot_history_db}`.`dataset` LATERAL VIEW EXPLODE(usages) AS usedIn +) +SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages + FROM pivots + GROUP BY id; /*EOS*/ +CREATE TABLE `${pivot_history_db}`.`publication_new` STORED AS PARQUET AS +WITH pivots ( + SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation` + LEFT SEMI JOIN `${new_graph_db}`.`publication` ON relation.source = publication.id + LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot' +UNION + SELECT id, usedIn FROM `${pivot_history_db}`.`publication` LATERAL VIEW EXPLODE(usages) AS usedIn +) +SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages + FROM pivots + GROUP BY id; /*EOS*/ +CREATE TABLE `${pivot_history_db}`.`software_new` STORED AS PARQUET AS +WITH pivots ( + SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation` + LEFT SEMI JOIN `${new_graph_db}`.`software` ON relation.source = software.id + LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot' +UNION + SELECT id, usedIn FROM `${pivot_history_db}`.`software` LATERAL VIEW EXPLODE(usages) AS usedIn +) +SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages + FROM pivots + GROUP BY id; /*EOS*/ +CREATE TABLE `${pivot_history_db}`.`otherresearchproduct_new` STORED AS PARQUET AS +WITH pivots ( + SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation` + LEFT SEMI JOIN `${new_graph_db}`.`otherresearchproduct` ON relation.source = otherresearchproduct.id + LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot' +UNION + SELECT id, usedIn FROM `${pivot_history_db}`.`otherresearchproduct` LATERAL VIEW EXPLODE(usages) AS usedIn +) +SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages + FROM pivots + GROUP BY id; /*EOS*/ + + +DROP TABLE IF EXISTS `${pivot_history_db}`.`dataset_old`; /*EOS*/ +ALTER TABLE `${pivot_history_db}`.`dataset` RENAME TO `${pivot_history_db}`.`dataset_old`; /*EOS*/ +ALTER TABLE `${pivot_history_db}`.`dataset_new` RENAME TO `${pivot_history_db}`.`dataset`; /*EOS*/ + +DROP TABLE IF EXISTS `${pivot_history_db}`.`publication_old`; /*EOS*/ +ALTER TABLE `${pivot_history_db}`.`publication` RENAME TO `${pivot_history_db}`.`publication_old`; /*EOS*/ +ALTER TABLE `${pivot_history_db}`.`publication_new` RENAME TO `${pivot_history_db}`.`publication`; /*EOS*/ + +DROP TABLE IF EXISTS `${pivot_history_db}`.`software_old`; /*EOS*/ +ALTER TABLE `${pivot_history_db}`.`software` RENAME TO `${pivot_history_db}`.`software_old`; /*EOS*/ +ALTER TABLE `${pivot_history_db}`.`software_new` RENAME TO `${pivot_history_db}`.`software`; /*EOS*/ + +DROP TABLE IF EXISTS `${pivot_history_db}`.`otherresearchproduct_old`; /*EOS*/ +ALTER TABLE `${pivot_history_db}`.`otherresearchproduct` RENAME TO `${pivot_history_db}`.`otherresearchproduct_old`; /*EOS*/ +ALTER TABLE `${pivot_history_db}`.`otherresearchproduct_new` RENAME TO `${pivot_history_db}`.`otherresearchproduct`; /*EOS*/ diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/workflow.xml new file mode 100644 index 0000000000..d562f088e9 --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/workflow.xml @@ -0,0 +1,95 @@ + + + + + pivot_history_db + + Pivot history DB on hive + + + new_graph_db + + New graph DB on hive + + + new_graph_date + + Creation date of new graph db + + + + + hiveMetastoreUris + hive server metastore URIs + + + sparkSqlWarehouseDir + + + + sparkClusterOpts + --conf spark.network.timeout=600 --conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory + spark cluster-wide options + + + sparkResourceOpts + --executor-memory=3G --conf spark.executor.memoryOverhead=3G --executor-cores=6 --driver-memory=8G --driver-cores=4 + spark resource options + + + sparkApplicationOpts + --conf spark.sql.shuffle.partitions=3840 + spark resource options + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + Upgrade Pivot History + eu.dnetlib.dhp.oozie.RunSQLSparkJob + dhp-dedup-openaire-${projectVersion}.jar + + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + ${sparkClusterOpts} + ${sparkResourceOpts} + ${sparkApplicationOpts} + + --hiveMetastoreUris${hiveMetastoreUris} + --sqleu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/sql.sql + --pivot_history_db${pivot_history_db} + --new_graph_db${new_graph_db} + --new_graph_date${new_graph_date} + + + + + + + \ No newline at end of file From 1c6db320f41882c34299e7c346d1c95d592d2644 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 22 Jan 2024 15:53:17 +0100 Subject: [PATCH 02/11] [graph provision] obtain context info from the context API instead from the ISLookUp service --- .../common/api/context/CategorySummary.java | 39 ++++++++++++++ .../api/context/CategorySummaryList.java | 7 +++ .../common/api/context/ConceptSummary.java | 52 +++++++++++++++++++ .../api/context/ConceptSummaryList.java | 7 +++ .../common/api/context/ContextSummary.java | 50 ++++++++++++++++++ .../api/context/ContextSummaryList.java | 7 +++ .../dhp/oa/provision/XmlConverterJob.java | 6 +-- .../dhp/oa/provision/utils/ContextMapper.java | 45 +++++++++++++++- .../dhp/oa/provision/oozie_app/workflow.xml | 6 ++- 9 files changed, 213 insertions(+), 6 deletions(-) create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/CategorySummary.java create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/CategorySummaryList.java create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ConceptSummary.java create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ConceptSummaryList.java create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ContextSummary.java create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ContextSummaryList.java diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/CategorySummary.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/CategorySummary.java new file mode 100644 index 0000000000..fff28dbdfd --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/CategorySummary.java @@ -0,0 +1,39 @@ + +package eu.dnetlib.dhp.common.api.context; + +public class CategorySummary { + + private String id; + + private String label; + + private boolean hasConcept; + + public String getId() { + return id; + } + + public String getLabel() { + return label; + } + + public boolean isHasConcept() { + return hasConcept; + } + + public CategorySummary setId(final String id) { + this.id = id; + return this; + } + + public CategorySummary setLabel(final String label) { + this.label = label; + return this; + } + + public CategorySummary setHasConcept(final boolean hasConcept) { + this.hasConcept = hasConcept; + return this; + } + +} diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/CategorySummaryList.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/CategorySummaryList.java new file mode 100644 index 0000000000..7213a945a6 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/CategorySummaryList.java @@ -0,0 +1,7 @@ + +package eu.dnetlib.dhp.common.api.context; + +import java.util.ArrayList; + +public class CategorySummaryList extends ArrayList { +} diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ConceptSummary.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ConceptSummary.java new file mode 100644 index 0000000000..a576f9a1e1 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ConceptSummary.java @@ -0,0 +1,52 @@ + +package eu.dnetlib.dhp.common.api.context; + +import java.util.List; + +public class ConceptSummary { + + private String id; + + private String label; + + public boolean hasSubConcept; + + private List concepts; + + public String getId() { + return id; + } + + public String getLabel() { + return label; + } + + public List getConcepts() { + return concepts; + } + + public ConceptSummary setId(final String id) { + this.id = id; + return this; + } + + public ConceptSummary setLabel(final String label) { + this.label = label; + return this; + } + + public boolean isHasSubConcept() { + return hasSubConcept; + } + + public ConceptSummary setHasSubConcept(final boolean hasSubConcept) { + this.hasSubConcept = hasSubConcept; + return this; + } + + public ConceptSummary setConcept(final List concepts) { + this.concepts = concepts; + return this; + } + +} diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ConceptSummaryList.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ConceptSummaryList.java new file mode 100644 index 0000000000..45ccd28109 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ConceptSummaryList.java @@ -0,0 +1,7 @@ + +package eu.dnetlib.dhp.common.api.context; + +import java.util.ArrayList; + +public class ConceptSummaryList extends ArrayList { +} diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ContextSummary.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ContextSummary.java new file mode 100644 index 0000000000..46a0d0d5ad --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ContextSummary.java @@ -0,0 +1,50 @@ + +package eu.dnetlib.dhp.common.api.context; + +public class ContextSummary { + + private String id; + + private String label; + + private String type; + + private String status; + + public String getId() { + return id; + } + + public String getLabel() { + return label; + } + + public String getType() { + return type; + } + + public String getStatus() { + return status; + } + + public ContextSummary setId(final String id) { + this.id = id; + return this; + } + + public ContextSummary setLabel(final String label) { + this.label = label; + return this; + } + + public ContextSummary setType(final String type) { + this.type = type; + return this; + } + + public ContextSummary setStatus(final String status) { + this.status = status; + return this; + } + +} diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ContextSummaryList.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ContextSummaryList.java new file mode 100644 index 0000000000..6186000077 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/context/ContextSummaryList.java @@ -0,0 +1,7 @@ + +package eu.dnetlib.dhp.common.api.context; + +import java.util.ArrayList; + +public class ContextSummaryList extends ArrayList { +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java index 518f411204..6f43ca3f72 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java @@ -62,8 +62,8 @@ public class XmlConverterJob { final String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); - final String isLookupUrl = parser.get("isLookupUrl"); - log.info("isLookupUrl: {}", isLookupUrl); + final String contextApiBaseUrl = parser.get("contextApiBaseUrl"); + log.info("contextApiBaseUrl: {}", contextApiBaseUrl); final SparkConf conf = new SparkConf(); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); @@ -71,7 +71,7 @@ public class XmlConverterJob { runWithSparkSession(conf, isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - convertToXml(spark, inputPath, outputPath, ContextMapper.fromIS(isLookupUrl)); + convertToXml(spark, inputPath, outputPath, ContextMapper.fromAPI(contextApiBaseUrl)); }); } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/ContextMapper.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/ContextMapper.java index bcaf406039..96d92fed6e 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/ContextMapper.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/ContextMapper.java @@ -1,18 +1,22 @@ package eu.dnetlib.dhp.oa.provision.utils; -import java.io.Serializable; -import java.io.StringReader; +import java.io.*; +import java.net.HttpURLConnection; +import java.net.URL; import java.util.HashMap; import org.dom4j.Document; import org.dom4j.DocumentException; import org.dom4j.Node; import org.dom4j.io.SAXReader; +import org.jetbrains.annotations.NotNull; import org.xml.sax.SAXException; import com.google.common.base.Joiner; +import eu.dnetlib.dhp.common.api.context.*; +import eu.dnetlib.dhp.common.rest.DNetRestClient; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; @@ -23,6 +27,42 @@ public class ContextMapper extends HashMap implements Serial private static final String XQUERY = "for $x in //RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='ContextDSResourceType']//*[name()='context' or name()='category' or name()='concept'] return "; + public static ContextMapper fromAPI(final String baseURL) throws Exception { + + final ContextMapper contextMapper = new ContextMapper(); + + for (ContextSummary ctx : DNetRestClient.doGET(baseURL + "/contexts", ContextSummaryList.class)) { + + contextMapper.put(ctx.getId(), new ContextDef(ctx.getId(), ctx.getLabel(), "context", ctx.getType())); + + for (CategorySummary cat : DNetRestClient + .doGET(baseURL + "/context/" + ctx.getId(), CategorySummaryList.class)) { + contextMapper.put(cat.getId(), new ContextDef(cat.getId(), cat.getLabel(), "category", "")); + if (cat.isHasConcept()) { + for (ConceptSummary c : DNetRestClient + .doGET(baseURL + "/context/category/" + cat.getId(), ConceptSummaryList.class)) { + contextMapper.put(c.getId(), new ContextDef(c.getId(), c.getLabel(), "concept", "")); + if (c.isHasSubConcept()) { + for (ConceptSummary cs : c.getConcepts()) { + contextMapper.put(cs.getId(), new ContextDef(cs.getId(), cs.getLabel(), "concept", "")); + if (cs.isHasSubConcept()) { + for (ConceptSummary css : cs.getConcepts()) { + contextMapper + .put( + css.getId(), + new ContextDef(css.getId(), css.getLabel(), "concept", "")); + } + } + } + } + } + } + } + } + return contextMapper; + } + + @Deprecated public static ContextMapper fromIS(final String isLookupUrl) throws DocumentException, ISLookUpException, SAXException { ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl); @@ -32,6 +72,7 @@ public class ContextMapper extends HashMap implements Serial return fromXml(sb.toString()); } + @Deprecated public static ContextMapper fromXml(final String xml) throws DocumentException, SAXException { final ContextMapper contextMapper = new ContextMapper(); diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml index 2e7b11ddee..9eab960f07 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml @@ -9,6 +9,10 @@ isLookupUrl URL for the isLookup service + + contextApiBaseUrl + context API URL + relPartitions number or partitions for the relations Dataset @@ -589,7 +593,7 @@ --inputPath${workingDir}/join_entities --outputPath${workingDir}/xml - --isLookupUrl${isLookupUrl} + --contextApiBaseUrl${contextApiBaseUrl} From 6fd25cf549e3892d3d1f114848367ea00dd84399 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Tue, 23 Jan 2024 08:47:12 +0100 Subject: [PATCH 03/11] code formatting --- .../eu/dnetlib/dhp/oozie/RunSQLSparkJob.java | 18 +- .../dhp/oa/dedup/DedupRecordFactory.java | 284 +++++++++--------- .../dhp/oa/dedup/SparkCreateMergeRels.java | 5 +- 3 files changed, 158 insertions(+), 149 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oozie/RunSQLSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oozie/RunSQLSparkJob.java index ef296bfc90..027bf0735d 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/oozie/RunSQLSparkJob.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oozie/RunSQLSparkJob.java @@ -1,13 +1,7 @@ package eu.dnetlib.dhp.oozie; -import com.google.common.io.Resources; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import org.apache.commons.lang3.time.DurationFormatUtils; -import org.apache.commons.text.StringSubstitutor; -import org.apache.spark.SparkConf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; import java.net.URL; import java.nio.charset.StandardCharsets; @@ -15,7 +9,15 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; +import org.apache.commons.lang3.time.DurationFormatUtils; +import org.apache.commons.text.StringSubstitutor; +import org.apache.spark.SparkConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.io.Resources; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; public class RunSQLSparkJob { private static final Logger log = LoggerFactory.getLogger(RunSQLSparkJob.class); diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java index 4c12d1dc65..eddfba309d 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java @@ -1,6 +1,16 @@ package eu.dnetlib.dhp.oa.dedup; +import java.util.*; +import java.util.stream.Stream; + +import org.apache.commons.beanutils.BeanUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.ReduceFunction; +import org.apache.spark.sql.*; + import eu.dnetlib.dhp.oa.dedup.model.Identifier; import eu.dnetlib.dhp.oa.merge.AuthorMerger; import eu.dnetlib.dhp.schema.common.ModelSupport; @@ -8,180 +18,176 @@ import eu.dnetlib.dhp.schema.oaf.Author; import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.Result; -import org.apache.commons.beanutils.BeanUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.api.java.function.ReduceFunction; -import org.apache.spark.sql.*; import scala.Tuple2; import scala.Tuple3; import scala.collection.JavaConversions; -import java.util.*; -import java.util.stream.Stream; - public class DedupRecordFactory { - public static final class DedupRecordReduceState { - public final String dedupId; + public static final class DedupRecordReduceState { + public final String dedupId; - public final ArrayList aliases = new ArrayList<>(); + public final ArrayList aliases = new ArrayList<>(); - public final HashSet acceptanceDate = new HashSet<>(); + public final HashSet acceptanceDate = new HashSet<>(); - public OafEntity entity; + public OafEntity entity; - public DedupRecordReduceState(String dedupId, String id, OafEntity entity) { - this.dedupId = dedupId; - this.entity = entity; - if (entity == null) { - aliases.add(id); - } else { - if (Result.class.isAssignableFrom(entity.getClass())) { - Result result = (Result) entity; - if (result.getDateofacceptance() != null && StringUtils.isNotBlank(result.getDateofacceptance().getValue())) { - acceptanceDate.add(result.getDateofacceptance().getValue()); - } - } - } - } + public DedupRecordReduceState(String dedupId, String id, OafEntity entity) { + this.dedupId = dedupId; + this.entity = entity; + if (entity == null) { + aliases.add(id); + } else { + if (Result.class.isAssignableFrom(entity.getClass())) { + Result result = (Result) entity; + if (result.getDateofacceptance() != null + && StringUtils.isNotBlank(result.getDateofacceptance().getValue())) { + acceptanceDate.add(result.getDateofacceptance().getValue()); + } + } + } + } - public String getDedupId() { - return dedupId; - } - } - private static final int MAX_ACCEPTANCE_DATE = 20; + public String getDedupId() { + return dedupId; + } + } - private DedupRecordFactory() { - } + private static final int MAX_ACCEPTANCE_DATE = 20; - public static Dataset createDedupRecord( - final SparkSession spark, - final DataInfo dataInfo, - final String mergeRelsInputPath, - final String entitiesInputPath, - final Class clazz) { + private DedupRecordFactory() { + } - final long ts = System.currentTimeMillis(); - final Encoder beanEncoder = Encoders.bean(clazz); - final Encoder kryoEncoder = Encoders.kryo(clazz); + public static Dataset createDedupRecord( + final SparkSession spark, + final DataInfo dataInfo, + final String mergeRelsInputPath, + final String entitiesInputPath, + final Class clazz) { - // - Dataset entities = spark - .read() - .schema(Encoders.bean(clazz).schema()) - .json(entitiesInputPath) - .as(beanEncoder) - .map( - (MapFunction>) entity -> { - return new Tuple2<>(entity.getId(), entity); - }, - Encoders.tuple(Encoders.STRING(), kryoEncoder)) - .selectExpr("_1 AS id", "_2 AS kryoObject"); + final long ts = System.currentTimeMillis(); + final Encoder beanEncoder = Encoders.bean(clazz); + final Encoder kryoEncoder = Encoders.kryo(clazz); - // : source is the dedup_id, target is the id of the mergedIn - Dataset mergeRels = spark - .read() - .load(mergeRelsInputPath) - .where("relClass == 'merges'") - .selectExpr("source as dedupId", "target as id"); + // + Dataset entities = spark + .read() + .schema(Encoders.bean(clazz).schema()) + .json(entitiesInputPath) + .as(beanEncoder) + .map( + (MapFunction>) entity -> { + return new Tuple2<>(entity.getId(), entity); + }, + Encoders.tuple(Encoders.STRING(), kryoEncoder)) + .selectExpr("_1 AS id", "_2 AS kryoObject"); - return mergeRels - .join(entities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left") - .select("dedupId", "id", "kryoObject") - .as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder)) - .map((MapFunction, DedupRecordReduceState>) t -> new DedupRecordReduceState(t._1(), t._2(), t._3()), Encoders.kryo(DedupRecordReduceState.class)) - .groupByKey((MapFunction) DedupRecordReduceState::getDedupId, Encoders.STRING()) - .reduceGroups( - (ReduceFunction) (t1, t2) -> { - if (t1.entity == null) { - t2.aliases.addAll(t1.aliases); - return t2; - } - if (t1.acceptanceDate.size() < MAX_ACCEPTANCE_DATE) { - t1.acceptanceDate.addAll(t2.acceptanceDate); - } - t1.aliases.addAll(t2.aliases); - t1.entity = reduceEntity(t1.entity, t2.entity); + // : source is the dedup_id, target is the id of the mergedIn + Dataset mergeRels = spark + .read() + .load(mergeRelsInputPath) + .where("relClass == 'merges'") + .selectExpr("source as dedupId", "target as id"); - return t1; - } - ) - .flatMap - ((FlatMapFunction, OafEntity>) t -> { - String dedupId = t._1(); - DedupRecordReduceState agg = t._2(); + return mergeRels + .join(entities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left") + .select("dedupId", "id", "kryoObject") + .as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder)) + .map( + (MapFunction, DedupRecordReduceState>) t -> new DedupRecordReduceState( + t._1(), t._2(), t._3()), + Encoders.kryo(DedupRecordReduceState.class)) + .groupByKey( + (MapFunction) DedupRecordReduceState::getDedupId, Encoders.STRING()) + .reduceGroups( + (ReduceFunction) (t1, t2) -> { + if (t1.entity == null) { + t2.aliases.addAll(t1.aliases); + return t2; + } + if (t1.acceptanceDate.size() < MAX_ACCEPTANCE_DATE) { + t1.acceptanceDate.addAll(t2.acceptanceDate); + } + t1.aliases.addAll(t2.aliases); + t1.entity = reduceEntity(t1.entity, t2.entity); - if (agg.acceptanceDate.size() >= MAX_ACCEPTANCE_DATE) { - return Collections.emptyIterator(); - } + return t1; + }) + .flatMap((FlatMapFunction, OafEntity>) t -> { + String dedupId = t._1(); + DedupRecordReduceState agg = t._2(); - return Stream.concat(Stream.of(agg.getDedupId()), agg.aliases.stream()) - .map(id -> { - try { - OafEntity res = (OafEntity) BeanUtils.cloneBean(agg.entity); - res.setId(id); - res.setDataInfo(dataInfo); - res.setLastupdatetimestamp(ts); - return res; - } catch (Exception e) { - throw new RuntimeException(e); - } - }).iterator(); - }, beanEncoder); - } + if (agg.acceptanceDate.size() >= MAX_ACCEPTANCE_DATE) { + return Collections.emptyIterator(); + } - private static OafEntity reduceEntity(OafEntity entity, OafEntity duplicate) { + return Stream + .concat(Stream.of(agg.getDedupId()), agg.aliases.stream()) + .map(id -> { + try { + OafEntity res = (OafEntity) BeanUtils.cloneBean(agg.entity); + res.setId(id); + res.setDataInfo(dataInfo); + res.setLastupdatetimestamp(ts); + return res; + } catch (Exception e) { + throw new RuntimeException(e); + } + }) + .iterator(); + }, beanEncoder); + } + + private static OafEntity reduceEntity(OafEntity entity, OafEntity duplicate) { if (duplicate == null) { return entity; } + int compare = new IdentifierComparator<>() + .compare(Identifier.newInstance(entity), Identifier.newInstance(duplicate)); - int compare = new IdentifierComparator<>() - .compare(Identifier.newInstance(entity), Identifier.newInstance(duplicate)); - - if (compare > 0) { + if (compare > 0) { OafEntity swap = duplicate; - duplicate = entity; - entity = swap; - } + duplicate = entity; + entity = swap; + } - entity.mergeFrom(duplicate); + entity.mergeFrom(duplicate); - if (ModelSupport.isSubClass(duplicate, Result.class)) { - Result re = (Result) entity; - Result rd = (Result) duplicate; + if (ModelSupport.isSubClass(duplicate, Result.class)) { + Result re = (Result) entity; + Result rd = (Result) duplicate; - List> authors = new ArrayList<>(); - if (re.getAuthor() != null) { - authors.add(re.getAuthor()); - } - if (rd.getAuthor() != null) { - authors.add(rd.getAuthor()); - } + List> authors = new ArrayList<>(); + if (re.getAuthor() != null) { + authors.add(re.getAuthor()); + } + if (rd.getAuthor() != null) { + authors.add(rd.getAuthor()); + } - re.setAuthor(AuthorMerger.merge(authors)); - } + re.setAuthor(AuthorMerger.merge(authors)); + } - return entity; - } + return entity; + } - public static T entityMerger( - String id, Iterator> entities, long ts, DataInfo dataInfo, Class clazz) { - T base = entities.next()._2(); + public static T entityMerger( + String id, Iterator> entities, long ts, DataInfo dataInfo, Class clazz) { + T base = entities.next()._2(); - while (entities.hasNext()) { - T duplicate = entities.next()._2(); - if (duplicate != null) - base = (T) reduceEntity(base, duplicate); - } + while (entities.hasNext()) { + T duplicate = entities.next()._2(); + if (duplicate != null) + base = (T) reduceEntity(base, duplicate); + } - base.setId(id); - base.setDataInfo(dataInfo); - base.setLastupdatetimestamp(ts); + base.setId(id); + base.setDataInfo(dataInfo); + base.setLastupdatetimestamp(ts); - return base; - } + return base; + } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java index 191870d3b0..59626c1414 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java @@ -242,13 +242,14 @@ public class SparkCreateMergeRels extends AbstractSparkAction { // this was a pivot in a previous graph but it has been merged into a new group with different // pivot - if (!r.isNullAt(r.fieldIndex("lastUsage")) && !pivot.equals(id) && !dedupId.equals(pivotDedupId)) { + if (!r.isNullAt(r.fieldIndex("lastUsage")) && !pivot.equals(id) + && !dedupId.equals(pivotDedupId)) { // materialize the previous dedup record as a merge relation with the new one res.add(new Tuple3<>(dedupId, pivotDedupId, null)); } // add merge relations - if (cut <=0 || r.getAs("position") <= cut) { + if (cut <= 0 || r. getAs("position") <= cut) { res.add(new Tuple3<>(id, pivotDedupId, pivot)); } From f87f3a6483d1ea18945cd8055a3b97a4973b682a Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Tue, 23 Jan 2024 08:54:37 +0100 Subject: [PATCH 04/11] [graph provision] updated param specification for the XML converter job --- .../dhp/oa/provision/input_params_xml_converter.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json index eda6154d7e..653a69ed11 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json @@ -12,9 +12,9 @@ "paramRequired": true }, { - "paramName": "ilu", - "paramLongName": "isLookupUrl", - "paramDescription": "URL of the isLookUp Service", + "paramName": "cau", + "paramLongName": "contextApiBaseUrl", + "paramDescription": "URL of the context API", "paramRequired": true } ] From 3e96777cc4ce5896a0c1f1af5b5adf00546fec04 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Tue, 23 Jan 2024 15:21:03 +0100 Subject: [PATCH 05/11] [collection] increased logging from the oai-pmh metadata collection process --- .../dhp/common/collection/HttpConnector2.java | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/HttpConnector2.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/HttpConnector2.java index 905457bcd0..08cc3ec595 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/HttpConnector2.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/HttpConnector2.java @@ -8,10 +8,13 @@ import java.io.InputStream; import java.net.*; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.math.NumberUtils; +import org.apache.commons.lang3.time.DateUtils; import org.apache.http.HttpHeaders; +import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,6 +101,7 @@ public class HttpConnector2 { InputStream input = null; + long start = System.currentTimeMillis(); try { if (getClientParams().getRequestDelay() > 0) { backoffAndSleep(getClientParams().getRequestDelay()); @@ -115,9 +119,8 @@ public class HttpConnector2 { urlConn.addRequestProperty(headerEntry.getKey(), headerEntry.getValue()); } } - if (log.isDebugEnabled()) { - logHeaderFields(urlConn); - } + + logHeaderFields(urlConn); int retryAfter = obtainRetryAfter(urlConn.getHeaderFields()); String rateLimit = urlConn.getHeaderField(Constants.HTTPHEADER_IETF_DRAFT_RATELIMIT_LIMIT); @@ -167,12 +170,14 @@ public class HttpConnector2 { .warn( "{} - waiting and repeating request after default delay of {} sec.", requestUrl, getClientParams().getRetryDelay()); - backoffAndSleep(retryNumber * getClientParams().getRetryDelay() * 1000); + backoffAndSleep(retryNumber * getClientParams().getRetryDelay()); } report.put(REPORT_PREFIX + urlConn.getResponseCode(), requestUrl); urlConn.disconnect(); return attemptDownload(requestUrl, retryNumber + 1, report); default: + log.error("gor error {} from URL: {}", urlConn.getResponseCode(), urlConn.getURL()); + log.error("response message: {}", urlConn.getResponseMessage()); report .put( REPORT_PREFIX + urlConn.getResponseCode(), @@ -196,16 +201,21 @@ public class HttpConnector2 { report.put(e.getClass().getName(), e.getMessage()); backoffAndSleep(getClientParams().getRetryDelay() * retryNumber * 1000); return attemptDownload(requestUrl, retryNumber + 1, report); + } finally { + log + .info( + "request time elapsed: {}sec", + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - start)); } } private void logHeaderFields(final HttpURLConnection urlConn) throws IOException { - log.debug("StatusCode: {}", urlConn.getResponseMessage()); + log.info("StatusCode: {}", urlConn.getResponseMessage()); for (Map.Entry> e : urlConn.getHeaderFields().entrySet()) { if (e.getKey() != null) { for (String v : e.getValue()) { - log.debug(" key: {} - value: {}", e.getKey(), v); + log.info(" key: {} - value: {}", e.getKey(), v); } } } From 9b13c22e5d9a6d916f53be71400456712397ebaf Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Tue, 23 Jan 2024 15:36:08 +0100 Subject: [PATCH 06/11] [graph provision] retrieve all the context information by adding all=true to the requests issued to thr API --- .../eu/dnetlib/dhp/oa/provision/utils/ContextMapper.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/ContextMapper.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/ContextMapper.java index 96d92fed6e..083dbe988f 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/ContextMapper.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/ContextMapper.java @@ -31,16 +31,19 @@ public class ContextMapper extends HashMap implements Serial final ContextMapper contextMapper = new ContextMapper(); - for (ContextSummary ctx : DNetRestClient.doGET(baseURL + "/contexts", ContextSummaryList.class)) { + for (ContextSummary ctx : DNetRestClient + .doGET(String.format("%s/contexts", baseURL), ContextSummaryList.class)) { contextMapper.put(ctx.getId(), new ContextDef(ctx.getId(), ctx.getLabel(), "context", ctx.getType())); for (CategorySummary cat : DNetRestClient - .doGET(baseURL + "/context/" + ctx.getId(), CategorySummaryList.class)) { + .doGET(String.format("%s/context/%s?all=true", baseURL, ctx.getId()), CategorySummaryList.class)) { contextMapper.put(cat.getId(), new ContextDef(cat.getId(), cat.getLabel(), "category", "")); if (cat.isHasConcept()) { for (ConceptSummary c : DNetRestClient - .doGET(baseURL + "/context/category/" + cat.getId(), ConceptSummaryList.class)) { + .doGET( + String.format("%s/context/category/%s?all=true", baseURL, cat.getId()), + ConceptSummaryList.class)) { contextMapper.put(c.getId(), new ContextDef(c.getId(), c.getLabel(), "concept", "")); if (c.isHasSubConcept()) { for (ConceptSummary cs : c.getConcepts()) { From 2c1e6849f0a43c28c58907829eb6a3f060f48f2c Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 24 Jan 2024 10:36:41 +0100 Subject: [PATCH 07/11] added code of conduct and contributing files --- CODE_OF_CONDUCT.md | 43 +++++++++++++++++++++++++++++++++++++++++++ CONTRIBUTING.md | 9 +++++++++ LICENSE => LICENSE.md | 0 README.md | 5 +++++ 4 files changed, 57 insertions(+) create mode 100644 CODE_OF_CONDUCT.md create mode 100644 CONTRIBUTING.md rename LICENSE => LICENSE.md (100%) diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md new file mode 100644 index 0000000000..aff151f945 --- /dev/null +++ b/CODE_OF_CONDUCT.md @@ -0,0 +1,43 @@ +# Contributor Code of Conduct + +Openness, transparency and our community-driven participatory approach guide us in our day-to-day interactions and decision-making. Our open source projects are no exception. Trust, respect, collaboration and transparency are core values we believe should live and breathe within our projects. Our community welcomes participants from around the world with different experiences, unique perspectives, and great ideas to share. + +## Our Pledge + +In the interest of fostering an open and welcoming environment, we as contributors and maintainers pledge to making participation in our project and our community a harassment-free experience for everyone, regardless of age, body size, disability, ethnicity, sex characteristics, gender identity and expression, level of experience, education, socio-economic status, nationality, personal appearance, race, religion, or sexual identity and orientation. + +## Our Standards + +Examples of behavior that contributes to creating a positive environment include: + +- Using welcoming and inclusive language +- Being respectful of differing viewpoints and experiences +- Gracefully accepting constructive criticism +- Attempting collaboration before conflict +- Focusing on what is best for the community +- Showing empathy towards other community members + +Examples of unacceptable behavior by participants include: + +- Violence, threats of violence, or inciting others to commit self-harm +- The use of sexualized language or imagery and unwelcome sexual attention or advances +- Trolling, intentionally spreading misinformation, insulting/derogatory comments, and personal or political attacks +- Public or private harassment +- Publishing others' private information, such as a physical or electronic address, without explicit permission +- Abuse of the reporting process to intentionally harass or exclude others +- Advocating for, or encouraging, any of the above behavior +- Other conduct which could reasonably be considered inappropriate in a professional setting + +## Our Responsibilities + +Project maintainers are responsible for clarifying the standards of acceptable behavior and are expected to take appropriate and fair corrective action in response to any instances of unacceptable behavior. + +Project maintainers have the right and responsibility to remove, edit, or reject comments, commits, code, wiki edits, issues, and other contributions that are not aligned to this Code of Conduct, or to ban temporarily or permanently any contributor for other behaviors that they deem inappropriate, threatening, offensive, or harmful. + +## Scope + +This Code of Conduct applies both within project spaces and in public spaces when an individual is representing the project or its community. Examples of representing a project or community include using an official project e-mail address, posting via an official social media account, or acting as an appointed representative at an online or offline event. Representation of a project may be further defined and clarified by project maintainers. + +## Attribution + +This Code of Conduct is adapted from the [Contributor Covenant](https://www.contributor-covenant.org/), [version 1.4](https://www.contributor-covenant.org/version/1/4/code-of-conduct.html). \ No newline at end of file diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000000..6d83ebbcca --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,9 @@ +# Contributing to D-Net Hadoop + +:+1::tada: First off, thanks for taking the time to contribute! :tada::+1: + +This project and everyone participating in it is governed by our [Code of Conduct](CODE_OF_CONDUCT.md). By participating, you are expected to uphold this code. Please report unacceptable behavior to [dnet-team@isti.cnr.it](mailto:dnet-team@isti.cnr.it). + +The following is a set of guidelines for contributing to this project and its packages. These are mostly guidelines, not rules. Use your best judgment, and feel free to propose changes to this document in a pull request. + +All contributions are welcome, all contributions will be considered to be contributed under the [project license](#LICENSE.md). diff --git a/LICENSE b/LICENSE.md similarity index 100% rename from LICENSE rename to LICENSE.md diff --git a/README.md b/README.md index 2c1440f44e..b6575814d5 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,11 @@ Dnet-hadoop is the project that defined all the [OOZIE workflows](https://oozie.apache.org/) for the OpenAIRE Graph construction, processing, provisioning. +This project adheres to the Contributor Covenant [code of conduct](CODE_OF_CONDUCT.md). +By participating, you are expected to uphold this code. Please report unacceptable behavior to [dnet-team@isti.cnr.it](mailto:dnet-team@isti.cnr.it). + +This project is licensed under the [AGPL v3 or later version](#LICENSE.md). + How to build, package and run oozie workflows ==================== From 0c97a3a81a55cbdc24342d88d3862a89da1a6c5c Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 24 Jan 2024 10:56:33 +0100 Subject: [PATCH 08/11] minor --- CONTRIBUTING.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 6d83ebbcca..34a26f9133 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -4,6 +4,7 @@ This project and everyone participating in it is governed by our [Code of Conduct](CODE_OF_CONDUCT.md). By participating, you are expected to uphold this code. Please report unacceptable behavior to [dnet-team@isti.cnr.it](mailto:dnet-team@isti.cnr.it). -The following is a set of guidelines for contributing to this project and its packages. These are mostly guidelines, not rules. Use your best judgment, and feel free to propose changes to this document in a pull request. +The following is a set of guidelines for contributing to this project and its packages. These are mostly guidelines, not rules, which applies to this project as a while, including all its sub-modules. +Use your best judgment, and feel free to propose changes to this document in a pull request. All contributions are welcome, all contributions will be considered to be contributed under the [project license](#LICENSE.md). From 2838a9b63086493c5d845728336438d01595f56b Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 24 Jan 2024 16:07:05 +0100 Subject: [PATCH 09/11] Update 'CONTRIBUTING.md' --- CONTRIBUTING.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 34a26f9133..13a359c865 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -7,4 +7,4 @@ This project and everyone participating in it is governed by our [Code of Conduc The following is a set of guidelines for contributing to this project and its packages. These are mostly guidelines, not rules, which applies to this project as a while, including all its sub-modules. Use your best judgment, and feel free to propose changes to this document in a pull request. -All contributions are welcome, all contributions will be considered to be contributed under the [project license](#LICENSE.md). +All contributions are welcome, all contributions will be considered to be contributed under the [project license](LICENSE.md). From a7115cfa9e595c7db1b41cf9a34b0ae72a08d620 Mon Sep 17 00:00:00 2001 From: Antonis Lempesis Date: Thu, 25 Jan 2024 15:06:34 +0100 Subject: [PATCH 10/11] max mem of joins (hive.mapjoin.followby.gby.localtask.max.memory.usage) now 80%, up from 55%. --- .../eu/dnetlib/dhp/oa/graph/stats/oozie_app/workflow.xml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/workflow.xml b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/workflow.xml index cbf97944dc..f15f223209 100644 --- a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/workflow.xml @@ -78,6 +78,10 @@ hive.txn.timeout ${hive_timeout} + + hive.mapjoin.followby.gby.localtask.max.memory.usage + 0.80 + mapred.job.queue.name analytics From 9e8fc6aa88d592fc3cb354bc36894b95679b5092 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 26 Jan 2024 09:17:20 +0100 Subject: [PATCH 11/11] [collection] increased logging from the oai-pmh metadata collection process --- .../dhp/common/collection/HttpConnector2.java | 63 ++++++++++++++----- 1 file changed, 47 insertions(+), 16 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/HttpConnector2.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/HttpConnector2.java index 08cc3ec595..342d73cdc2 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/HttpConnector2.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/HttpConnector2.java @@ -97,8 +97,6 @@ public class HttpConnector2 { throw new CollectorException(msg); } - log.info("Request attempt {} [{}]", retryNumber, requestUrl); - InputStream input = null; long start = System.currentTimeMillis(); @@ -106,6 +104,9 @@ public class HttpConnector2 { if (getClientParams().getRequestDelay() > 0) { backoffAndSleep(getClientParams().getRequestDelay()); } + + log.info("Request attempt {} [{}]", retryNumber, requestUrl); + final HttpURLConnection urlConn = (HttpURLConnection) new URL(requestUrl).openConnection(); urlConn.setInstanceFollowRedirects(false); urlConn.setReadTimeout(getClientParams().getReadTimeOut() * 1000); @@ -135,9 +136,7 @@ public class HttpConnector2 { } if (is2xx(urlConn.getResponseCode())) { - input = urlConn.getInputStream(); - responseType = urlConn.getContentType(); - return input; + return getInputStream(urlConn, start); } if (is3xx(urlConn.getResponseCode())) { // REDIRECTS @@ -147,6 +146,7 @@ public class HttpConnector2 { .put( REPORT_PREFIX + urlConn.getResponseCode(), String.format("Moved to: %s", newUrl)); + logRequestTime(start); urlConn.disconnect(); if (retryAfter > 0) { backoffAndSleep(retryAfter); @@ -162,19 +162,39 @@ public class HttpConnector2 { if (retryAfter > 0) { log .warn( - "{} - waiting and repeating request after suggested retry-after {} sec.", - requestUrl, retryAfter); + "waiting and repeating request after suggested retry-after {} sec for URL {}", + retryAfter, requestUrl); backoffAndSleep(retryAfter * 1000); } else { log .warn( - "{} - waiting and repeating request after default delay of {} sec.", - requestUrl, getClientParams().getRetryDelay()); + "waiting and repeating request after default delay of {} sec for URL {}", + getClientParams().getRetryDelay(), requestUrl); backoffAndSleep(retryNumber * getClientParams().getRetryDelay()); } report.put(REPORT_PREFIX + urlConn.getResponseCode(), requestUrl); + + logRequestTime(start); + urlConn.disconnect(); + return attemptDownload(requestUrl, retryNumber + 1, report); + case 422: // UNPROCESSABLE ENTITY + report.put(REPORT_PREFIX + urlConn.getResponseCode(), requestUrl); + log.warn("waiting and repeating request after 10 sec for URL {}", requestUrl); + backoffAndSleep(10000); + urlConn.disconnect(); + logRequestTime(start); + try { + return getInputStream(urlConn, start); + } catch (IOException e) { + log + .error( + "server returned 422 and got IOException accessing the response body from URL {}", + requestUrl); + log.error("IOException:", e); + return attemptDownload(requestUrl, retryNumber + 1, report); + } default: log.error("gor error {} from URL: {}", urlConn.getResponseCode(), urlConn.getURL()); log.error("response message: {}", urlConn.getResponseMessage()); @@ -184,6 +204,8 @@ public class HttpConnector2 { String .format( "%s Error: %s", requestUrl, urlConn.getResponseMessage())); + logRequestTime(start); + urlConn.disconnect(); throw new CollectorException(urlConn.getResponseCode() + " error " + report); } } @@ -201,16 +223,25 @@ public class HttpConnector2 { report.put(e.getClass().getName(), e.getMessage()); backoffAndSleep(getClientParams().getRetryDelay() * retryNumber * 1000); return attemptDownload(requestUrl, retryNumber + 1, report); - } finally { - log - .info( - "request time elapsed: {}sec", - TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - start)); } } + private InputStream getInputStream(HttpURLConnection urlConn, long start) throws IOException { + InputStream input = urlConn.getInputStream(); + responseType = urlConn.getContentType(); + logRequestTime(start); + return input; + } + + private static void logRequestTime(long start) { + log + .info( + "request time elapsed: {}sec", + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - start)); + } + private void logHeaderFields(final HttpURLConnection urlConn) throws IOException { - log.info("StatusCode: {}", urlConn.getResponseMessage()); + log.info("Response: {} - {}", urlConn.getResponseCode(), urlConn.getResponseMessage()); for (Map.Entry> e : urlConn.getHeaderFields().entrySet()) { if (e.getKey() != null) { @@ -235,7 +266,7 @@ public class HttpConnector2 { for (String key : headerMap.keySet()) { if ((key != null) && key.equalsIgnoreCase(HttpHeaders.RETRY_AFTER) && (!headerMap.get(key).isEmpty()) && NumberUtils.isCreatable(headerMap.get(key).get(0))) { - return Integer.parseInt(headerMap.get(key).get(0)) + 10; + return Integer.parseInt(headerMap.get(key).get(0)); } } return -1;