From 05ee7d8b0950bbb93214f7d84d7da9f089526fe3 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 12 Oct 2023 09:13:42 +0200 Subject: [PATCH 1/6] [graph cleaning] avoid NPEs --- .../oaf/utils/GraphCleaningFunctions.java | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java index 3c3e8052e..324e3dd58 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java @@ -509,12 +509,19 @@ public class GraphCleaningFunctions extends CleaningFunctions { // from the script from Dimitris if ("0000".equals(i.getRefereed().getClassid())) { - final boolean isFromCrossref = ModelConstants.CROSSREF_ID - .equals(i.getCollectedfrom().getKey()); - final boolean hasDoi = i - .getPid() - .stream() - .anyMatch(pid -> PidType.doi.toString().equals(pid.getQualifier().getClassid())); + final boolean isFromCrossref = Optional + .ofNullable(i.getCollectedfrom()) + .map(KeyValue::getKey) + .map(id -> id.equals(ModelConstants.CROSSREF_ID)) + .orElse(false); + final boolean hasDoi = Optional + .ofNullable(i.getPid()) + .map( + pid -> pid + .stream() + .anyMatch( + p -> PidType.doi.toString().equals(p.getQualifier().getClassid()))) + .orElse(false); final boolean isPeerReviewedType = PEER_REVIEWED_TYPES .contains(i.getInstancetype().getClassname()); final boolean noOtherLitType = r From dda602fff7c65341d0db9dcb9b0b5db2cf5be7ee Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 12 Oct 2023 10:05:46 +0200 Subject: [PATCH 2/6] [AMF] docs --- dhp-workflows/dhp-actionmanager/README.md | 72 +++++++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 dhp-workflows/dhp-actionmanager/README.md diff --git a/dhp-workflows/dhp-actionmanager/README.md b/dhp-workflows/dhp-actionmanager/README.md new file mode 100644 index 000000000..9899c4a98 --- /dev/null +++ b/dhp-workflows/dhp-actionmanager/README.md @@ -0,0 +1,72 @@ +# Action Management Framework + +This module implements the oozie workflow for the integration of pre-built contents into the OpenAIRE Graph. + +Such contents can be + +* brand new, non-existing records to be introduced as nodes of the graph +* updates (or enrichment) for records that does exist in the graph (e.g. a new subject term for a publication) +* relations among existing nodes + +The actionset contents are organised into logical containers, each of them can contain multiple versions contents and is characterised by + +* a name +* an identifier +* the paths on HDFS where each version of the contents is stored + +Each version is then characterised by + +* the creation date +* the last update date +* the indication where it is the latest one or it is an expired version, candidate for garbage collection + +## ActionSet serialization + +Each actionset version contains records compliant to the graph internal data model, i.e. subclasses of `eu.dnetlib.dhp.schema.oaf.Oaf`, +defined in the external schemas module + +``` + + eu.dnetlib.dhp + ${dhp-schemas.artifact} + ${dhp-schemas.version} + +``` + +When the actionset contains a relationship, the model class to use is `eu.dnetlib.dhp.schema.oaf.Relation`, otherwise +when the actionset contains an entity, it is a `eu.dnetlib.dhp.schema.oaf.OafEntity` or one of its subclasses +`Datasource`, `Organization`, `Project`, `Result` (or one of its subclasses `Publication`, `Dataset`, etc...). + +Then, each OpenAIRE Graph model class instance must be wrapped using the class `eu.dnetlib.dhp.schema.action.AtomicAction`, a generic +container that defines two attributes + +* `T payload` the OpenAIRE Graph class instance containing the data; +* `Class clazz` must contain the class whose instance is contained in the payload. + +Each AtomicAction can be then serialised in JSON format using `com.fasterxml.jackson.databind.ObjectMapper` from + +``` + + com.fasterxml.jackson.core + jackson-databind + ${dhp.jackson.version} + +``` + +Then, the JSON serialization must be stored as a GZip compressed sequence file (`org.apache.hadoop.mapred.SequenceFileOutputFormat`). +As such, it contains a set of tuples, a key and a value defined as `org.apache.hadoop.io.Text` where + +* the `key` must be set to the class canonical name contained in the `AtomicAction`; +* the `value` must be set to the AtomicAction JSON serialization. + +The following snippet provides an example of how create an actionset version of Relation records: + +``` + rels // JavaRDD + .map(relation -> new AtomicAction(Relation.class, relation)) + .mapToPair( + aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), + new Text(OBJECT_MAPPER.writeValueAsString(aa)))) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); +``` + From 76447958bb538c75872d6b5f0fef184e97b42d55 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 12 Oct 2023 12:23:20 +0200 Subject: [PATCH 3/6] cleanup & docs --- README.md | 128 +++++++++++++++++- dhp-workflows/dhp-distcp/pom.xml | 13 -- .../dhp/distcp/oozie_app/config-default.xml | 18 --- .../dnetlib/dhp/distcp/oozie_app/workflow.xml | 46 ------- dhp-workflows/docs/oozie-installer.markdown | 111 --------------- dhp-workflows/pom.xml | 1 - 6 files changed, 127 insertions(+), 190 deletions(-) delete mode 100644 dhp-workflows/dhp-distcp/pom.xml delete mode 100644 dhp-workflows/dhp-distcp/src/main/resources/eu/dnetlib/dhp/distcp/oozie_app/config-default.xml delete mode 100644 dhp-workflows/dhp-distcp/src/main/resources/eu/dnetlib/dhp/distcp/oozie_app/workflow.xml delete mode 100644 dhp-workflows/docs/oozie-installer.markdown diff --git a/README.md b/README.md index 0a0bd82ab..2c1440f44 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,128 @@ # dnet-hadoop -Dnet-hadoop is the project that defined all the OOZIE workflows for the OpenAIRE Graph construction, processing, provisioning. \ No newline at end of file + +Dnet-hadoop is the project that defined all the [OOZIE workflows](https://oozie.apache.org/) for the OpenAIRE Graph construction, processing, provisioning. + +How to build, package and run oozie workflows +==================== + +Oozie-installer is a utility allowing building, uploading and running oozie workflows. In practice, it creates a `*.tar.gz` +package that contains resources that define a workflow and some helper scripts. + +This module is automatically executed when running: + +`mvn package -Poozie-package -Dworkflow.source.dir=classpath/to/parent/directory/of/oozie_app` + +on module having set: + +``` + + eu.dnetlib.dhp + dhp-workflows + +``` + +in `pom.xml` file. `oozie-package` profile initializes oozie workflow packaging, `workflow.source.dir` property points to +a workflow (notice: this is not a relative path but a classpath to directory usually holding `oozie_app` subdirectory). + +The outcome of this packaging is `oozie-package.tar.gz` file containing inside all the resources required to run Oozie workflow: + +- jar packages +- workflow definitions +- job properties +- maintenance scripts + +Required properties +==================== + +In order to include proper workflow within package, `workflow.source.dir` property has to be set. It could be provided +by setting `-Dworkflow.source.dir=some/job/dir` maven parameter. + +In oder to define full set of cluster environment properties one should create `~/.dhp/application.properties` file with +the following properties: + +- `dhp.hadoop.frontend.user.name` - your user name on hadoop cluster and frontend machine +- `dhp.hadoop.frontend.host.name` - frontend host name +- `dhp.hadoop.frontend.temp.dir` - frontend directory for temporary files +- `dhp.hadoop.frontend.port.ssh` - frontend machine ssh port +- `oozieServiceLoc` - oozie service location required by run_workflow.sh script executing oozie job +- `nameNode` - name node address +- `jobTracker` - job tracker address +- `oozie.execution.log.file.location` - location of file that will be created when executing oozie job, it contains output +produced by `run_workflow.sh` script (needed to obtain oozie job id) +- `maven.executable` - mvn command location, requires parameterization due to a different setup of CI cluster +- `sparkDriverMemory` - amount of memory assigned to spark jobs driver +- `sparkExecutorMemory` - amount of memory assigned to spark jobs executors +- `sparkExecutorCores` - number of cores assigned to spark jobs executors + +All values will be overriden with the ones from `job.properties` and eventually `job-override.properties` stored in module's +main folder. + +When overriding properties from `job.properties`, `job-override.properties` file can be created in main module directory +(the one containing `pom.xml` file) and define all new properties which will override existing properties. +One can provide those properties one by one as command line `-D` arguments. + +Properties overriding order is the following: + +1. `pom.xml` defined properties (located in the project root dir) +2. `~/.dhp/application.properties` defined properties +3. `${workflow.source.dir}/job.properties` +4. `job-override.properties` (located in the project root dir) +5. `maven -Dparam=value` + +where the maven `-Dparam` property is overriding all the other ones. + +Workflow definition requirements +==================== + +`workflow.source.dir` property should point to the following directory structure: + + [${workflow.source.dir}] + | + |-job.properties (optional) + | + \-[oozie_app] + | + \-workflow.xml + +This property can be set using maven `-D` switch. + +`[oozie_app]` is the default directory name however it can be set to any value as soon as `oozieAppDir` property is +provided with directory name as value. + +Sub-workflows are supported as well and sub-workflow directories should be nested within `[oozie_app]` directory. + +Creating oozie installer step-by-step +===================================== + +Automated oozie-installer steps are the following: + +1. creating jar packages: `*.jar` and `*tests.jar` along with copying all dependencies in `target/dependencies` +2. reading properties from maven, `~/.dhp/application.properties`, `job.properties`, `job-override.properties` +3. invoking priming mechanism linking resources from import.txt file (currently resolving subworkflow resources) +4. assembling shell scripts for preparing Hadoop filesystem, uploading Oozie application and starting workflow +5. copying whole `${workflow.source.dir}` content to `target/${oozie.package.file.name}` +6. generating updated `job.properties` file in `target/${oozie.package.file.name}` based on maven, +`~/.dhp/application.properties`, `job.properties` and `job-override.properties` +7. creating `lib` directory (or multiple directories for sub-workflows for each nested directory) and copying jar packages +created at step (1) to each one of them +8. bundling whole `${oozie.package.file.name}` directory into single tar.gz package + +Uploading oozie package and running workflow on cluster +======================================================= + +In order to simplify deployment and execution process two dedicated profiles were introduced: + +- `deploy` +- `run` + +to be used along with `oozie-package` profile e.g. by providing `-Poozie-package,deploy,run` maven parameters. + +The `deploy` profile supplements packaging process with: +1) uploading oozie-package via scp to `/home/${user.name}/oozie-packages` directory on `${dhp.hadoop.frontend.host.name}` machine +2) extracting uploaded package +3) uploading oozie content to hadoop cluster HDFS location defined in `oozie.wf.application.path` property (generated dynamically by maven build process, based on `${dhp.hadoop.frontend.user.name}` and `workflow.source.dir` properties) + +The `run` profile introduces: +1) executing oozie application uploaded to HDFS cluster using `deploy` command. Triggers `run_workflow.sh` script providing runtime properties defined in `job.properties` file. + +Notice: ssh access to frontend machine has to be configured on system level and it is preferable to set key-based authentication in order to simplify remote operations. \ No newline at end of file diff --git a/dhp-workflows/dhp-distcp/pom.xml b/dhp-workflows/dhp-distcp/pom.xml deleted file mode 100644 index c3d3a7375..000000000 --- a/dhp-workflows/dhp-distcp/pom.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - - dhp-workflows - eu.dnetlib.dhp - 1.2.5-SNAPSHOT - - 4.0.0 - - dhp-distcp - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-distcp/src/main/resources/eu/dnetlib/dhp/distcp/oozie_app/config-default.xml b/dhp-workflows/dhp-distcp/src/main/resources/eu/dnetlib/dhp/distcp/oozie_app/config-default.xml deleted file mode 100644 index 905fb9984..000000000 --- a/dhp-workflows/dhp-distcp/src/main/resources/eu/dnetlib/dhp/distcp/oozie_app/config-default.xml +++ /dev/null @@ -1,18 +0,0 @@ - - - jobTracker - yarnRM - - - nameNode - hdfs://nameservice1 - - - sourceNN - webhdfs://namenode2.hadoop.dm.openaire.eu:50071 - - - oozie.use.system.libpath - true - - \ No newline at end of file diff --git a/dhp-workflows/dhp-distcp/src/main/resources/eu/dnetlib/dhp/distcp/oozie_app/workflow.xml b/dhp-workflows/dhp-distcp/src/main/resources/eu/dnetlib/dhp/distcp/oozie_app/workflow.xml deleted file mode 100644 index 91b97332b..000000000 --- a/dhp-workflows/dhp-distcp/src/main/resources/eu/dnetlib/dhp/distcp/oozie_app/workflow.xml +++ /dev/null @@ -1,46 +0,0 @@ - - - - sourceNN - the source name node - - - sourcePath - the source path - - - targetPath - the target path - - - hbase_dump_distcp_memory_mb - 6144 - memory for distcp action copying InfoSpace dump from remote cluster - - - hbase_dump_distcp_num_maps - 1 - maximum number of simultaneous copies of InfoSpace dump from remote location - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - -Dmapreduce.map.memory.mb=${hbase_dump_distcp_memory_mb} - -pb - -m ${hbase_dump_distcp_num_maps} - ${sourceNN}/${sourcePath} - ${nameNode}/${targetPath} - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/docs/oozie-installer.markdown b/dhp-workflows/docs/oozie-installer.markdown deleted file mode 100644 index d2de80dcc..000000000 --- a/dhp-workflows/docs/oozie-installer.markdown +++ /dev/null @@ -1,111 +0,0 @@ -General notes -==================== - -Oozie-installer is a utility allowing building, uploading and running oozie workflows. In practice, it creates a `*.tar.gz` package that contains resouces that define a workflow and some helper scripts. - -This module is automatically executed when running: - -`mvn package -Poozie-package -Dworkflow.source.dir=classpath/to/parent/directory/of/oozie_app` - -on module having set: - - - eu.dnetlib.dhp - dhp-workflows - - -in `pom.xml` file. `oozie-package` profile initializes oozie workflow packaging, `workflow.source.dir` property points to a workflow (notice: this is not a relative path but a classpath to directory usually holding `oozie_app` subdirectory). - -The outcome of this packaging is `oozie-package.tar.gz` file containing inside all the resources required to run Oozie workflow: - -- jar packages -- workflow definitions -- job properties -- maintenance scripts - -Required properties -==================== - -In order to include proper workflow within package, `workflow.source.dir` property has to be set. It could be provided by setting `-Dworkflow.source.dir=some/job/dir` maven parameter. - -In oder to define full set of cluster environment properties one should create `~/.dhp/application.properties` file with the following properties: - -- `dhp.hadoop.frontend.user.name` - your user name on hadoop cluster and frontend machine -- `dhp.hadoop.frontend.host.name` - frontend host name -- `dhp.hadoop.frontend.temp.dir` - frontend directory for temporary files -- `dhp.hadoop.frontend.port.ssh` - frontend machine ssh port -- `oozieServiceLoc` - oozie service location required by run_workflow.sh script executing oozie job -- `nameNode` - name node address -- `jobTracker` - job tracker address -- `oozie.execution.log.file.location` - location of file that will be created when executing oozie job, it contains output produced by `run_workflow.sh` script (needed to obtain oozie job id) -- `maven.executable` - mvn command location, requires parameterization due to a different setup of CI cluster -- `sparkDriverMemory` - amount of memory assigned to spark jobs driver -- `sparkExecutorMemory` - amount of memory assigned to spark jobs executors -- `sparkExecutorCores` - number of cores assigned to spark jobs executors - -All values will be overriden with the ones from `job.properties` and eventually `job-override.properties` stored in module's main folder. - -When overriding properties from `job.properties`, `job-override.properties` file can be created in main module directory (the one containing `pom.xml` file) and define all new properties which will override existing properties. One can provide those properties one by one as command line -D arguments. - -Properties overriding order is the following: - -1. `pom.xml` defined properties (located in the project root dir) -2. `~/.dhp/application.properties` defined properties -3. `${workflow.source.dir}/job.properties` -4. `job-override.properties` (located in the project root dir) -5. `maven -Dparam=value` - -where the maven `-Dparam` property is overriding all the other ones. - -Workflow definition requirements -==================== - -`workflow.source.dir` property should point to the following directory structure: - - [${workflow.source.dir}] - | - |-job.properties (optional) - | - \-[oozie_app] - | - \-workflow.xml - -This property can be set using maven `-D` switch. - -`[oozie_app]` is the default directory name however it can be set to any value as soon as `oozieAppDir` property is provided with directory name as value. - -Subworkflows are supported as well and subworkflow directories should be nested within `[oozie_app]` directory. - -Creating oozie installer step-by-step -===================================== - -Automated oozie-installer steps are the following: - -1. creating jar packages: `*.jar` and `*tests.jar` along with copying all dependancies in `target/dependencies` -2. reading properties from maven, `~/.dhp/application.properties`, `job.properties`, `job-override.properties` -3. invoking priming mechanism linking resources from import.txt file (currently resolving subworkflow resources) -4. assembling shell scripts for preparing Hadoop filesystem, uploading Oozie application and starting workflow -5. copying whole `${workflow.source.dir}` content to `target/${oozie.package.file.name}` -6. generating updated `job.properties` file in `target/${oozie.package.file.name}` based on maven, `~/.dhp/application.properties`, `job.properties` and `job-override.properties` -7. creating `lib` directory (or multiple directories for subworkflows for each nested directory) and copying jar packages created at step (1) to each one of them -8. bundling whole `${oozie.package.file.name}` directory into single tar.gz package - -Uploading oozie package and running workflow on cluster -======================================================= - -In order to simplify deployment and execution process two dedicated profiles were introduced: - -- `deploy` -- `run` - -to be used along with `oozie-package` profile e.g. by providing `-Poozie-package,deploy,run` maven parameters. - -`deploy` profile supplements packaging process with: -1) uploading oozie-package via scp to `/home/${user.name}/oozie-packages` directory on `${dhp.hadoop.frontend.host.name}` machine -2) extracting uploaded package -3) uploading oozie content to hadoop cluster HDFS location defined in `oozie.wf.application.path` property (generated dynamically by maven build process, based on `${dhp.hadoop.frontend.user.name}` and `workflow.source.dir` properties) - -`run` profile introduces: -1) executing oozie application uploaded to HDFS cluster using `deploy` command. Triggers `run_workflow.sh` script providing runtime properties defined in `job.properties` file. - -Notice: ssh access to frontend machine has to be configured on system level and it is preferable to set key-based authentication in order to simplify remote operations. \ No newline at end of file diff --git a/dhp-workflows/pom.xml b/dhp-workflows/pom.xml index 64f5f2d26..369c71b5b 100644 --- a/dhp-workflows/pom.xml +++ b/dhp-workflows/pom.xml @@ -25,7 +25,6 @@ dhp-workflow-profiles dhp-aggregation - dhp-distcp dhp-actionmanager dhp-graph-mapper dhp-dedup-openaire From 6cf64d5d8b3b9826bce76d94f0548ee96eff2736 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 13 Oct 2023 10:09:26 +0200 Subject: [PATCH 4/6] [SWH] renamed 'Software Heritage Identifier' to 'Software Hash Identifier' --- .../src/main/java/eu/dnetlib/dhp/swh/utils/SWHConstants.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/utils/SWHConstants.java b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/utils/SWHConstants.java index eae839cfd..2a0403044 100644 --- a/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/utils/SWHConstants.java +++ b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/utils/SWHConstants.java @@ -12,7 +12,7 @@ public class SWHConstants { public static final String SWHID = "swhid"; - public static final String SWHID_CLASSNAME = "Software Heritage Identifier"; + public static final String SWHID_CLASSNAME = "Software Hash Identifier"; public static final String SWH_ID = "10|openaire____::dbfd07503aaa1ed31beed7dec942f3f4"; From 03670bb9ce8609a17277e2d6ab6e53190cc8fe7e Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 16 Oct 2023 10:55:47 +0200 Subject: [PATCH 5/6] [dedup] use common saveParquet and save methods to ensure outputs are compressed --- .../dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java | 10 +++++----- .../dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java | 7 +------ .../eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java | 2 +- .../dhp/oa/dedup/SparkCreateOrgsDedupRecord.java | 6 +----- .../eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java | 2 -- .../eu/dnetlib/dhp/oa/dedup/SparkWhitelistSimRels.java | 2 -- 6 files changed, 8 insertions(+), 21 deletions(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java index 9d0f61007..eca2193af 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java @@ -7,6 +7,7 @@ import java.util.Optional; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; @@ -77,13 +78,12 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction { log.info("Number of Openorgs Merge Relations collected: {}", mergeRelsRDD.count()); - spark + final Dataset relations = spark .createDataset( mergeRelsRDD.rdd(), - Encoders.bean(Relation.class)) - .write() - .mode(SaveMode.Append) - .parquet(outputPath); + Encoders.bean(Relation.class)); + + saveParquet(relations, outputPath, SaveMode.Append); } private boolean isMergeRel(Relation rel) { diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java index 62cbb5bff..e10f41c82 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java @@ -67,12 +67,7 @@ public class SparkCopyRelationsNoOpenorgs extends AbstractSparkAction { log.debug("Number of non-Openorgs relations collected: {}", simRels.count()); } - spark - .createDataset(simRels.rdd(), Encoders.bean(Relation.class)) - .write() - .mode(SaveMode.Overwrite) - .json(outputPath); - + save(spark.createDataset(simRels.rdd(), Encoders.bean(Relation.class)), outputPath, SaveMode.Overwrite); } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java index 2f551b244..babbaaabd 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java @@ -155,7 +155,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction { (FlatMapFunction) cc -> ccToMergeRel(cc, dedupConf), Encoders.bean(Relation.class)); - mergeRels.write().mode(SaveMode.Overwrite).parquet(mergeRelPath); + saveParquet(mergeRels, mergeRelPath, SaveMode.Overwrite); } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateOrgsDedupRecord.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateOrgsDedupRecord.java index 8e5e9fd69..25e394f25 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateOrgsDedupRecord.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateOrgsDedupRecord.java @@ -72,11 +72,7 @@ public class SparkCreateOrgsDedupRecord extends AbstractSparkAction { final String mergeRelsPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization"); - rootOrganization(spark, entityPath, mergeRelsPath) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath); + save(rootOrganization(spark, entityPath, mergeRelsPath), outputPath, SaveMode.Overwrite); } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java index 5b3cc3111..5f54c34df 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java @@ -82,8 +82,6 @@ public class SparkCreateSimRels extends AbstractSparkAction { final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity); removeOutputDir(spark, outputPath); - JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - SparkDeduper deduper = new SparkDeduper(dedupConf); Dataset simRels = spark diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkWhitelistSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkWhitelistSimRels.java index 94a09ed05..65ad0c327 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkWhitelistSimRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkWhitelistSimRels.java @@ -67,8 +67,6 @@ public class SparkWhitelistSimRels extends AbstractSparkAction { log.info("workingPath: '{}'", workingPath); log.info("whiteListPath: '{}'", whiteListPath); - JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - // file format: source####target Dataset whiteListRels = spark .read() From 0e44b037a52558e20bbe418a5d313fc7fd8e966f Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Tue, 17 Oct 2023 07:54:01 +0200 Subject: [PATCH 6/6] FIX: GroupEntitiesSparkJob deletes whole graph outputPath instead of its temporary folder --- .../java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java index 99981bf6a..f5c8eea19 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java @@ -71,7 +71,7 @@ public class GroupEntitiesSparkJob { conf, isSparkSessionManaged, spark -> { - HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); + HdfsSupport.remove(checkpointPath, spark.sparkContext().hadoopConfiguration()); groupEntities(spark, graphInputPath, checkpointPath, outputPath, filterInvisible); }); }