mergin with branch beta

This commit is contained in:
Miriam Baglioni 2023-10-19 09:04:35 +02:00
commit f1b898c6b4
16 changed files with 222 additions and 219 deletions

128
README.md
View File

@ -1,2 +1,128 @@
# dnet-hadoop # dnet-hadoop
Dnet-hadoop is the project that defined all the OOZIE workflows for the OpenAIRE Graph construction, processing, provisioning.
Dnet-hadoop is the project that defined all the [OOZIE workflows](https://oozie.apache.org/) for the OpenAIRE Graph construction, processing, provisioning.
How to build, package and run oozie workflows
====================
Oozie-installer is a utility allowing building, uploading and running oozie workflows. In practice, it creates a `*.tar.gz`
package that contains resources that define a workflow and some helper scripts.
This module is automatically executed when running:
`mvn package -Poozie-package -Dworkflow.source.dir=classpath/to/parent/directory/of/oozie_app`
on module having set:
```
<parent>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-workflows</artifactId>
</parent>
```
in `pom.xml` file. `oozie-package` profile initializes oozie workflow packaging, `workflow.source.dir` property points to
a workflow (notice: this is not a relative path but a classpath to directory usually holding `oozie_app` subdirectory).
The outcome of this packaging is `oozie-package.tar.gz` file containing inside all the resources required to run Oozie workflow:
- jar packages
- workflow definitions
- job properties
- maintenance scripts
Required properties
====================
In order to include proper workflow within package, `workflow.source.dir` property has to be set. It could be provided
by setting `-Dworkflow.source.dir=some/job/dir` maven parameter.
In oder to define full set of cluster environment properties one should create `~/.dhp/application.properties` file with
the following properties:
- `dhp.hadoop.frontend.user.name` - your user name on hadoop cluster and frontend machine
- `dhp.hadoop.frontend.host.name` - frontend host name
- `dhp.hadoop.frontend.temp.dir` - frontend directory for temporary files
- `dhp.hadoop.frontend.port.ssh` - frontend machine ssh port
- `oozieServiceLoc` - oozie service location required by run_workflow.sh script executing oozie job
- `nameNode` - name node address
- `jobTracker` - job tracker address
- `oozie.execution.log.file.location` - location of file that will be created when executing oozie job, it contains output
produced by `run_workflow.sh` script (needed to obtain oozie job id)
- `maven.executable` - mvn command location, requires parameterization due to a different setup of CI cluster
- `sparkDriverMemory` - amount of memory assigned to spark jobs driver
- `sparkExecutorMemory` - amount of memory assigned to spark jobs executors
- `sparkExecutorCores` - number of cores assigned to spark jobs executors
All values will be overriden with the ones from `job.properties` and eventually `job-override.properties` stored in module's
main folder.
When overriding properties from `job.properties`, `job-override.properties` file can be created in main module directory
(the one containing `pom.xml` file) and define all new properties which will override existing properties.
One can provide those properties one by one as command line `-D` arguments.
Properties overriding order is the following:
1. `pom.xml` defined properties (located in the project root dir)
2. `~/.dhp/application.properties` defined properties
3. `${workflow.source.dir}/job.properties`
4. `job-override.properties` (located in the project root dir)
5. `maven -Dparam=value`
where the maven `-Dparam` property is overriding all the other ones.
Workflow definition requirements
====================
`workflow.source.dir` property should point to the following directory structure:
[${workflow.source.dir}]
|
|-job.properties (optional)
|
\-[oozie_app]
|
\-workflow.xml
This property can be set using maven `-D` switch.
`[oozie_app]` is the default directory name however it can be set to any value as soon as `oozieAppDir` property is
provided with directory name as value.
Sub-workflows are supported as well and sub-workflow directories should be nested within `[oozie_app]` directory.
Creating oozie installer step-by-step
=====================================
Automated oozie-installer steps are the following:
1. creating jar packages: `*.jar` and `*tests.jar` along with copying all dependencies in `target/dependencies`
2. reading properties from maven, `~/.dhp/application.properties`, `job.properties`, `job-override.properties`
3. invoking priming mechanism linking resources from import.txt file (currently resolving subworkflow resources)
4. assembling shell scripts for preparing Hadoop filesystem, uploading Oozie application and starting workflow
5. copying whole `${workflow.source.dir}` content to `target/${oozie.package.file.name}`
6. generating updated `job.properties` file in `target/${oozie.package.file.name}` based on maven,
`~/.dhp/application.properties`, `job.properties` and `job-override.properties`
7. creating `lib` directory (or multiple directories for sub-workflows for each nested directory) and copying jar packages
created at step (1) to each one of them
8. bundling whole `${oozie.package.file.name}` directory into single tar.gz package
Uploading oozie package and running workflow on cluster
=======================================================
In order to simplify deployment and execution process two dedicated profiles were introduced:
- `deploy`
- `run`
to be used along with `oozie-package` profile e.g. by providing `-Poozie-package,deploy,run` maven parameters.
The `deploy` profile supplements packaging process with:
1) uploading oozie-package via scp to `/home/${user.name}/oozie-packages` directory on `${dhp.hadoop.frontend.host.name}` machine
2) extracting uploaded package
3) uploading oozie content to hadoop cluster HDFS location defined in `oozie.wf.application.path` property (generated dynamically by maven build process, based on `${dhp.hadoop.frontend.user.name}` and `workflow.source.dir` properties)
The `run` profile introduces:
1) executing oozie application uploaded to HDFS cluster using `deploy` command. Triggers `run_workflow.sh` script providing runtime properties defined in `job.properties` file.
Notice: ssh access to frontend machine has to be configured on system level and it is preferable to set key-based authentication in order to simplify remote operations.

View File

@ -71,7 +71,7 @@ public class GroupEntitiesSparkJob {
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); HdfsSupport.remove(checkpointPath, spark.sparkContext().hadoopConfiguration());
groupEntities(spark, graphInputPath, checkpointPath, outputPath, filterInvisible); groupEntities(spark, graphInputPath, checkpointPath, outputPath, filterInvisible);
}); });
} }

View File

@ -509,12 +509,19 @@ public class GraphCleaningFunctions extends CleaningFunctions {
// from the script from Dimitris // from the script from Dimitris
if ("0000".equals(i.getRefereed().getClassid())) { if ("0000".equals(i.getRefereed().getClassid())) {
final boolean isFromCrossref = ModelConstants.CROSSREF_ID final boolean isFromCrossref = Optional
.equals(i.getCollectedfrom().getKey()); .ofNullable(i.getCollectedfrom())
final boolean hasDoi = i .map(KeyValue::getKey)
.getPid() .map(id -> id.equals(ModelConstants.CROSSREF_ID))
.orElse(false);
final boolean hasDoi = Optional
.ofNullable(i.getPid())
.map(
pid -> pid
.stream() .stream()
.anyMatch(pid -> PidType.doi.toString().equals(pid.getQualifier().getClassid())); .anyMatch(
p -> PidType.doi.toString().equals(p.getQualifier().getClassid())))
.orElse(false);
final boolean isPeerReviewedType = PEER_REVIEWED_TYPES final boolean isPeerReviewedType = PEER_REVIEWED_TYPES
.contains(i.getInstancetype().getClassname()); .contains(i.getInstancetype().getClassname());
final boolean noOtherLitType = r final boolean noOtherLitType = r

View File

@ -0,0 +1,72 @@
# Action Management Framework
This module implements the oozie workflow for the integration of pre-built contents into the OpenAIRE Graph.
Such contents can be
* brand new, non-existing records to be introduced as nodes of the graph
* updates (or enrichment) for records that does exist in the graph (e.g. a new subject term for a publication)
* relations among existing nodes
The actionset contents are organised into logical containers, each of them can contain multiple versions contents and is characterised by
* a name
* an identifier
* the paths on HDFS where each version of the contents is stored
Each version is then characterised by
* the creation date
* the last update date
* the indication where it is the latest one or it is an expired version, candidate for garbage collection
## ActionSet serialization
Each actionset version contains records compliant to the graph internal data model, i.e. subclasses of `eu.dnetlib.dhp.schema.oaf.Oaf`,
defined in the external schemas module
```
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>${dhp-schemas.artifact}</artifactId>
<version>${dhp-schemas.version}</version>
</dependency>
```
When the actionset contains a relationship, the model class to use is `eu.dnetlib.dhp.schema.oaf.Relation`, otherwise
when the actionset contains an entity, it is a `eu.dnetlib.dhp.schema.oaf.OafEntity` or one of its subclasses
`Datasource`, `Organization`, `Project`, `Result` (or one of its subclasses `Publication`, `Dataset`, etc...).
Then, each OpenAIRE Graph model class instance must be wrapped using the class `eu.dnetlib.dhp.schema.action.AtomicAction`, a generic
container that defines two attributes
* `T payload` the OpenAIRE Graph class instance containing the data;
* `Class<T> clazz` must contain the class whose instance is contained in the payload.
Each AtomicAction can be then serialised in JSON format using `com.fasterxml.jackson.databind.ObjectMapper` from
```
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${dhp.jackson.version}</version>
</dependency>
```
Then, the JSON serialization must be stored as a GZip compressed sequence file (`org.apache.hadoop.mapred.SequenceFileOutputFormat`).
As such, it contains a set of tuples, a key and a value defined as `org.apache.hadoop.io.Text` where
* the `key` must be set to the class canonical name contained in the `AtomicAction`;
* the `value` must be set to the AtomicAction JSON serialization.
The following snippet provides an example of how create an actionset version of Relation records:
```
rels // JavaRDD<Relation>
.map(relation -> new AtomicAction<Relation>(Relation.class, relation))
.mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
```

View File

@ -7,6 +7,7 @@ import java.util.Optional;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
@ -77,13 +78,12 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
log.info("Number of Openorgs Merge Relations collected: {}", mergeRelsRDD.count()); log.info("Number of Openorgs Merge Relations collected: {}", mergeRelsRDD.count());
spark final Dataset<Relation> relations = spark
.createDataset( .createDataset(
mergeRelsRDD.rdd(), mergeRelsRDD.rdd(),
Encoders.bean(Relation.class)) Encoders.bean(Relation.class));
.write()
.mode(SaveMode.Append) saveParquet(relations, outputPath, SaveMode.Append);
.parquet(outputPath);
} }
private boolean isMergeRel(Relation rel) { private boolean isMergeRel(Relation rel) {

View File

@ -67,12 +67,7 @@ public class SparkCopyRelationsNoOpenorgs extends AbstractSparkAction {
log.debug("Number of non-Openorgs relations collected: {}", simRels.count()); log.debug("Number of non-Openorgs relations collected: {}", simRels.count());
} }
spark save(spark.createDataset(simRels.rdd(), Encoders.bean(Relation.class)), outputPath, SaveMode.Overwrite);
.createDataset(simRels.rdd(), Encoders.bean(Relation.class))
.write()
.mode(SaveMode.Overwrite)
.json(outputPath);
} }
} }

View File

@ -155,7 +155,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
(FlatMapFunction<ConnectedComponent, Relation>) cc -> ccToMergeRel(cc, dedupConf), (FlatMapFunction<ConnectedComponent, Relation>) cc -> ccToMergeRel(cc, dedupConf),
Encoders.bean(Relation.class)); Encoders.bean(Relation.class));
mergeRels.write().mode(SaveMode.Overwrite).parquet(mergeRelPath); saveParquet(mergeRels, mergeRelPath, SaveMode.Overwrite);
} }
} }

View File

@ -72,11 +72,7 @@ public class SparkCreateOrgsDedupRecord extends AbstractSparkAction {
final String mergeRelsPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization"); final String mergeRelsPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization");
rootOrganization(spark, entityPath, mergeRelsPath) save(rootOrganization(spark, entityPath, mergeRelsPath), outputPath, SaveMode.Overwrite);
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
} }

View File

@ -82,8 +82,6 @@ public class SparkCreateSimRels extends AbstractSparkAction {
final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity); final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity);
removeOutputDir(spark, outputPath); removeOutputDir(spark, outputPath);
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
SparkDeduper deduper = new SparkDeduper(dedupConf); SparkDeduper deduper = new SparkDeduper(dedupConf);
Dataset<?> simRels = spark Dataset<?> simRels = spark

View File

@ -67,8 +67,6 @@ public class SparkWhitelistSimRels extends AbstractSparkAction {
log.info("workingPath: '{}'", workingPath); log.info("workingPath: '{}'", workingPath);
log.info("whiteListPath: '{}'", whiteListPath); log.info("whiteListPath: '{}'", whiteListPath);
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
// file format: source####target // file format: source####target
Dataset<Row> whiteListRels = spark Dataset<Row> whiteListRels = spark
.read() .read()

View File

@ -1,13 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId>
<version>1.2.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dhp-distcp</artifactId>
</project>

View File

@ -1,18 +0,0 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>sourceNN</name>
<value>webhdfs://namenode2.hadoop.dm.openaire.eu:50071</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
</configuration>

View File

@ -1,46 +0,0 @@
<workflow-app name="distcp" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourceNN</name>
<description>the source name node</description>
</property>
<property>
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>targetPath</name>
<description>the target path</description>
</property>
<property>
<name>hbase_dump_distcp_memory_mb</name>
<value>6144</value>
<description>memory for distcp action copying InfoSpace dump from remote cluster</description>
</property>
<property>
<name>hbase_dump_distcp_num_maps</name>
<value>1</value>
<description>maximum number of simultaneous copies of InfoSpace dump from remote location</description>
</property>
</parameters>
<start to="distcp"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="distcp">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>-Dmapreduce.map.memory.mb=${hbase_dump_distcp_memory_mb}</arg>
<arg>-pb</arg>
<arg>-m ${hbase_dump_distcp_num_maps}</arg>
<arg>${sourceNN}/${sourcePath}</arg>
<arg>${nameNode}/${targetPath}</arg>
</distcp>
<ok to="End" />
<error to="Kill" />
</action>
<end name="End"/>
</workflow-app>

View File

@ -12,7 +12,7 @@ public class SWHConstants {
public static final String SWHID = "swhid"; public static final String SWHID = "swhid";
public static final String SWHID_CLASSNAME = "Software Heritage Identifier"; public static final String SWHID_CLASSNAME = "Software Hash Identifier";
public static final String SWH_ID = "10|openaire____::dbfd07503aaa1ed31beed7dec942f3f4"; public static final String SWH_ID = "10|openaire____::dbfd07503aaa1ed31beed7dec942f3f4";

View File

@ -1,111 +0,0 @@
General notes
====================
Oozie-installer is a utility allowing building, uploading and running oozie workflows. In practice, it creates a `*.tar.gz` package that contains resouces that define a workflow and some helper scripts.
This module is automatically executed when running:
`mvn package -Poozie-package -Dworkflow.source.dir=classpath/to/parent/directory/of/oozie_app`
on module having set:
<parent>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-workflows</artifactId>
</parent>
in `pom.xml` file. `oozie-package` profile initializes oozie workflow packaging, `workflow.source.dir` property points to a workflow (notice: this is not a relative path but a classpath to directory usually holding `oozie_app` subdirectory).
The outcome of this packaging is `oozie-package.tar.gz` file containing inside all the resources required to run Oozie workflow:
- jar packages
- workflow definitions
- job properties
- maintenance scripts
Required properties
====================
In order to include proper workflow within package, `workflow.source.dir` property has to be set. It could be provided by setting `-Dworkflow.source.dir=some/job/dir` maven parameter.
In oder to define full set of cluster environment properties one should create `~/.dhp/application.properties` file with the following properties:
- `dhp.hadoop.frontend.user.name` - your user name on hadoop cluster and frontend machine
- `dhp.hadoop.frontend.host.name` - frontend host name
- `dhp.hadoop.frontend.temp.dir` - frontend directory for temporary files
- `dhp.hadoop.frontend.port.ssh` - frontend machine ssh port
- `oozieServiceLoc` - oozie service location required by run_workflow.sh script executing oozie job
- `nameNode` - name node address
- `jobTracker` - job tracker address
- `oozie.execution.log.file.location` - location of file that will be created when executing oozie job, it contains output produced by `run_workflow.sh` script (needed to obtain oozie job id)
- `maven.executable` - mvn command location, requires parameterization due to a different setup of CI cluster
- `sparkDriverMemory` - amount of memory assigned to spark jobs driver
- `sparkExecutorMemory` - amount of memory assigned to spark jobs executors
- `sparkExecutorCores` - number of cores assigned to spark jobs executors
All values will be overriden with the ones from `job.properties` and eventually `job-override.properties` stored in module's main folder.
When overriding properties from `job.properties`, `job-override.properties` file can be created in main module directory (the one containing `pom.xml` file) and define all new properties which will override existing properties. One can provide those properties one by one as command line -D arguments.
Properties overriding order is the following:
1. `pom.xml` defined properties (located in the project root dir)
2. `~/.dhp/application.properties` defined properties
3. `${workflow.source.dir}/job.properties`
4. `job-override.properties` (located in the project root dir)
5. `maven -Dparam=value`
where the maven `-Dparam` property is overriding all the other ones.
Workflow definition requirements
====================
`workflow.source.dir` property should point to the following directory structure:
[${workflow.source.dir}]
|
|-job.properties (optional)
|
\-[oozie_app]
|
\-workflow.xml
This property can be set using maven `-D` switch.
`[oozie_app]` is the default directory name however it can be set to any value as soon as `oozieAppDir` property is provided with directory name as value.
Subworkflows are supported as well and subworkflow directories should be nested within `[oozie_app]` directory.
Creating oozie installer step-by-step
=====================================
Automated oozie-installer steps are the following:
1. creating jar packages: `*.jar` and `*tests.jar` along with copying all dependancies in `target/dependencies`
2. reading properties from maven, `~/.dhp/application.properties`, `job.properties`, `job-override.properties`
3. invoking priming mechanism linking resources from import.txt file (currently resolving subworkflow resources)
4. assembling shell scripts for preparing Hadoop filesystem, uploading Oozie application and starting workflow
5. copying whole `${workflow.source.dir}` content to `target/${oozie.package.file.name}`
6. generating updated `job.properties` file in `target/${oozie.package.file.name}` based on maven, `~/.dhp/application.properties`, `job.properties` and `job-override.properties`
7. creating `lib` directory (or multiple directories for subworkflows for each nested directory) and copying jar packages created at step (1) to each one of them
8. bundling whole `${oozie.package.file.name}` directory into single tar.gz package
Uploading oozie package and running workflow on cluster
=======================================================
In order to simplify deployment and execution process two dedicated profiles were introduced:
- `deploy`
- `run`
to be used along with `oozie-package` profile e.g. by providing `-Poozie-package,deploy,run` maven parameters.
`deploy` profile supplements packaging process with:
1) uploading oozie-package via scp to `/home/${user.name}/oozie-packages` directory on `${dhp.hadoop.frontend.host.name}` machine
2) extracting uploaded package
3) uploading oozie content to hadoop cluster HDFS location defined in `oozie.wf.application.path` property (generated dynamically by maven build process, based on `${dhp.hadoop.frontend.user.name}` and `workflow.source.dir` properties)
`run` profile introduces:
1) executing oozie application uploaded to HDFS cluster using `deploy` command. Triggers `run_workflow.sh` script providing runtime properties defined in `job.properties` file.
Notice: ssh access to frontend machine has to be configured on system level and it is preferable to set key-based authentication in order to simplify remote operations.

View File

@ -25,7 +25,6 @@
<modules> <modules>
<module>dhp-workflow-profiles</module> <module>dhp-workflow-profiles</module>
<module>dhp-aggregation</module> <module>dhp-aggregation</module>
<module>dhp-distcp</module>
<module>dhp-actionmanager</module> <module>dhp-actionmanager</module>
<module>dhp-graph-mapper</module> <module>dhp-graph-mapper</module>
<module>dhp-dedup-openaire</module> <module>dhp-dedup-openaire</module>