From 46018dc8045eab2141d82e6e8ceae120b57c2079 Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Tue, 11 Jun 2024 10:39:48 +0200 Subject: [PATCH 1/3] Fix UnsupportedOperationException while merging two Result's contexts due to modification of an immutable collection --- .../dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java | 2 +- .../java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java index 6cb0b7218..b4a666b81 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java @@ -119,7 +119,7 @@ public class GraphCleaningFunctions extends CleaningFunctions { .getContext() .stream() .filter(c -> !StringUtils.startsWith(c.getId().toLowerCase(), contextId)) - .collect(Collectors.toList())); + .collect(Collectors.toCollection(ArrayList::new))); } return (T) res; } else { diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java index f1221add3..ac51705d2 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java @@ -432,7 +432,10 @@ public class MergeUtils { // merge datainfo for same context id merge.setContext(mergeLists(merge.getContext(), enrich.getContext(), trust, Context::getId, (r, l) -> { - r.getDataInfo().addAll(l.getDataInfo()); + ArrayList di = new ArrayList<>(); + di.addAll(r.getDataInfo()); + di.addAll(l.getDataInfo()); + r.setDataInfo(di); return r; })); From 71927ca8189dcd16e7b5bea0a72c44695ce6106e Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Tue, 11 Jun 2024 12:40:50 
+0200 Subject: [PATCH 2/3] avoid NPEs --- .../oaf/utils/MergeEntitiesComparator.java | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeEntitiesComparator.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeEntitiesComparator.java index 5792fc10f..ff6c2689a 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeEntitiesComparator.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeEntitiesComparator.java @@ -1,13 +1,9 @@ package eu.dnetlib.dhp.schema.oaf.utils; -import static eu.dnetlib.dhp.schema.common.ModelConstants.CROSSREF_ID; - import java.util.*; -import java.util.stream.Collectors; import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.Result; @@ -42,17 +38,23 @@ public class MergeEntitiesComparator implements Comparator { int res = 0; // pid authority - int cfp1 = left - .getCollectedfrom() - .stream() - .map(kv -> PID_AUTHORITIES.indexOf(kv.getKey())) - .max(Integer::compare) + int cfp1 = Optional + .ofNullable(left.getCollectedfrom()) + .map( + cf -> cf + .stream() + .map(kv -> PID_AUTHORITIES.indexOf(kv.getKey())) + .max(Integer::compare) + .orElse(-1)) .orElse(-1); - int cfp2 = right - .getCollectedfrom() - .stream() - .map(kv -> PID_AUTHORITIES.indexOf(kv.getKey())) - .max(Integer::compare) + int cfp2 = Optional + .ofNullable(right.getCollectedfrom()) + .map( + cf -> cf + .stream() + .map(kv -> PID_AUTHORITIES.indexOf(kv.getKey())) + .max(Integer::compare) + .orElse(-1)) .orElse(-1); if (cfp1 >= 0 && cfp1 > cfp2) { From c371513d43e81b0e6e1003877b994db55a736256 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Tue, 11 Jun 2024 14:21:01 +0200 Subject: [PATCH 3/3] [graph resolution] use sparkExecutorMemory to define also the memoryOverhead --- 
.../eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml index 74e792f07..916a9f2b1 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml @@ -45,6 +45,7 @@ --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} + --conf spark.executor.memoryOverhead=${sparkExecutorMemory} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.shuffle.partitions=15000 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} @@ -79,6 +80,7 @@ --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} + --conf spark.executor.memoryOverhead=${sparkExecutorMemory} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.shuffle.partitions=10000 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}