store graph as hive DB #31

Open
opened 4 years ago by claudio.atzori · 5 comments
Owner

Currently the OpenAIRE graph materialized across the different processing steps is stored as compressed text files containing newline-delimited JSON records.

```
dataset
	.write()
	.option("compression", "gzip")
	.mode(SaveMode.Overwrite)
	.text(path);
```

In order to analyse the data with ease, it is often necessary to make the graph content available as a database on Hive, which introduces overhead in:

  1. operations: the time needed to run yet another workflow that performs the conversion;
  2. resources: the Hive DB is yet another graph materialization, stored in a different path (and in a different format).

The purpose of this enhancement is therefore to address this limitation by making each individual graph processing step capable of storing the graph data as a Hive DB.

In order to move forward with this activity incrementally, both save modes could be supported by means of a new parameter **saveMode=json|parquet**.
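A minimal sketch of how such a dispatch could look, assuming a hypothetical `saveGraphTable` helper; the names, signature and parquet handling are illustrative, not existing code:

```
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;

public class GraphSaveSupport {

	// hypothetical utility: dispatches on the proposed saveMode parameter
	public static <T> void saveGraphTable(Dataset<T> dataset, String saveMode, String outputGraph, String table) {
		switch (saveMode) {
			case "json":
				// current behaviour: gzip-compressed, newline-delimited JSON files, outputGraph is a path
				dataset
					.toJSON()
					.write()
					.option("compression", "gzip")
					.mode(SaveMode.Overwrite)
					.text(outputGraph + "/" + table);
				break;
			case "parquet":
				// proposed behaviour: parquet-backed table in a Hive DB, outputGraph is the DB name
				// (requires a Hive-enabled SparkSession)
				dataset
					.write()
					.mode(SaveMode.Overwrite)
					.format("parquet")
					.saveAsTable(outputGraph + "." + table);
				break;
			default:
				throw new IllegalArgumentException("unsupported saveMode: " + saveMode);
		}
	}
}
```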

Hints

  • every spark node implementing a graph materialization should include a few extra parameters in the `org.apache.spark.SparkConf` and make use of `runWithSparkHiveSession` instead of `runWithSparkSession` (see the sketch after this list);
  • the method to store the graph, interpreting the **saveMode** parameter and implementing the two modes, must be defined once in a common utility class available to all the oozie workflow modules (along the lines of the sketch above). Note that, as indicated in [#29](https://code-repo.d4science.org/D-Net/dnet-hadoop/issues/29#issue-1167), the input/output parameter names should be normalized: when saving the graph as JSON files, **inputGraph** will represent a path, while when saving the graph as a Hive DB the same parameter must indicate the DB name.
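A self-contained sketch of the spark-node side of these hints; in the real modules the session handling would go through `runWithSparkHiveSession`, and the metastore URI, path and DB/table names below are placeholders:

```
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class HiveGraphStepSketch {

	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		// extra parameter a Hive-enabled step needs beyond the usual spark settings
		conf.set("hive.metastore.uris", "thrift://example-metastore:9083");

		SparkSession spark = SparkSession
			.builder()
			.config(conf)
			.enableHiveSupport() // what runWithSparkHiveSession provides over runWithSparkSession
			.getOrCreate();

		Dataset<Row> publications = spark.read().json("/path/to/graph/publication");

		publications
			.write()
			.mode(SaveMode.Overwrite)
			.saveAsTable("graph_db.publication"); // with the hive save mode the graph parameter is a DB name

		spark.stop();
	}
}
```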
claudio.atzori added the enhancement and help wanted labels 4 years ago
sandro.labruzzo was assigned by claudio.atzori 4 years ago
claudio.atzori self-assigned this 4 years ago
michele.artini was assigned by claudio.atzori 4 years ago
miriam.baglioni was assigned by claudio.atzori 4 years ago
Poster
Owner

I'm factoring out utilities to be used across different workflows in a new module `dhp-workflows-common`.

michele.debonis was assigned by claudio.atzori 4 years ago
Poster
Owner

Ongoing update: up to [f3ce97ecf9](https://code-repo.d4science.org/D-Net/dnet-hadoop/commit/f3ce97ecf9e1fb7b6d8bdaacd6f36e5dec7125d5) I introduced the following changes:

  • `eu.dnetlib.dhp.common.GraphFormat` declares the two supported formats `JSON | HIVE` (sketched below);
  • the workflows for the creation of the aggregatorGraph and for the creation of the merged graph (`aggregatorGraph`, `mergeAggregatorGraphs`, both defined in `dhp-graph-mapper`) are updated to accept both formats.

@michele.debonis can you take care of updating the deduplication workflow?
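For reference, a rough sketch of what the `GraphFormat` enum amounts to based on the description above; the `from` helper and the comments are assumptions, not the actual code:

```
public enum GraphFormat {

	JSON, // graph stored as gzip-compressed, newline-delimited JSON files under a path
	HIVE; // graph stored as tables of a Hive DB, the graph parameter holds the DB name

	public static GraphFormat from(String s) {
		return GraphFormat.valueOf(s.toUpperCase());
	}
}
```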

przemyslaw.jacewicz was assigned by claudio.atzori 4 years ago
Poster
Owner

@przemyslaw.jacewicz can you take over the adaptation of the actionmanager promote workflow?

@all The idea is to progressively move from the current JSON-based encoding and path-based parameters to the HIVE-based encoding and DB-name-based parameters.

Collaborator

Yes, I can. I'm not sure when I'll be able to start the work, though. We have some pending tasks in IIS and some planning within our team is needed.

Poster
Owner

Ongoing update: I'm using the graphCleaning workflow to experiment with the different combinations of input/output graph format, and I noticed that when the input graph is stored in Hive the memory footprint increases more than I expected; in particular, with the default settings

```
sparkDriverMemory=2G
sparkExecutorMemory=7G
sparkExecutorCores=4
```

The workflow dies while processing publications, running OOM: http://iis-cdh5-test-m1.ocean.icm.edu.pl:8088/cluster/app/application_1595483222400_5962

The actual error cause is hidden by the 2nd execution attempt, but in fact it failed with:

```
Container killed by YARN for exceeding memory limits.  8.0 GB of 8 GB physical memory used.
```

We already know that with the current graph model classes the impact on the available memory is quite high when using bean Encoders, so whenever possible we used the kryo encoders instead. However, storing a kryo encoded dataset in a Hive table, in the experiments I performed so far, results in a table with a single binary column (not too useful for data inspection/query).
Perhaps this behaviour can be changed with more investigation, but maybe there are alternatives.
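To illustrate, a minimal sketch of the schema difference between the two encoders, assuming `Publication` is one of the graph model beans; the identity `map` is only there to force the kryo encoding:

```
import eu.dnetlib.dhp.schema.oaf.Publication;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class EncoderSchemaCheck {

	public static void show(SparkSession spark, String publicationPath) {
		// bean encoder: one column per field, queryable once stored as a table
		Dataset<Publication> asBean = spark
			.read()
			.json(publicationPath)
			.as(Encoders.bean(Publication.class));
		asBean.printSchema();

		// kryo encoder: the schema collapses to a single "value: binary" column,
		// which is what ends up in the Hive table
		Dataset<Publication> asKryo = asBean
			.map((MapFunction<Publication, Publication>) p -> p, Encoders.kryo(Publication.class));
		asKryo.printSchema();
	}
}
```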

The idea for tracking data quality metrics over time is to run a batch of SQL statements over some of the JSON encoded graph materializations and store the observations in Prometheus. Running the same queries over time would create a set of time series that, we hope, would capture meaningful data quality aspects.

The materialization of the graph as a proper HIVE DB is not really a prerequisite for running SQL queries against it. In fact, SQL statements can also be executed against tempViews created from a Spark dataset. As many SQL queries would need to join different tables, we would probably need to make the entire set of graph tables available as tempViews.
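A sketch of that tempView approach; the table list follows the usual graph layout, while the example query and its field names are assumptions:

```
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class GraphTempViewQueries {

	public static Dataset<Row> run(SparkSession spark, String graphBasePath) {
		// register each JSON-encoded graph table as a temporary view
		for (String table : new String[] {
			"publication", "dataset", "software", "otherresearchproduct",
			"datasource", "organization", "project", "relation" }) {
			spark
				.read()
				.json(graphBasePath + "/" + table)
				.createOrReplaceTempView(table);
		}
		// example metric joining two views (field names are placeholders)
		return spark.sql(
			"SELECT r.relClass, COUNT(*) AS cnt " +
			"FROM relation r JOIN publication p ON p.id = r.source " +
			"GROUP BY r.relClass");
	}
}
```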

Another benefit of this approach is that the existing procedure we already use to map the graph to a proper HIVE DB would still be available and could be run when we need to deepen the analysis.

All this would need some experimentation, but before moving on I'd like to hear from the other people involved in this task :)

Reference: D-Net/dnet-hadoop#31