convert_hive_to_spark_actions #1

Merged
antonis.lempesis merged 20 commits from convert_hive_to_spark_actions into beta 2024-09-23 13:53:29 +02:00
3 changed files with 64 additions and 40 deletions
Showing only changes of commit 6f2ebb2a52 - Show all commits

View File

@ -10,7 +10,7 @@ SET harvested='true'
WHERE datasource_tmp.id IN (SELECT DISTINCT d.id
FROM ${stats_db_name}.datasource_tmp d,
${stats_db_name}.result_datasources rd
WHERE d.id = rd.datasource); /*EOS*/
WHERE d.id = rd.datasource); -- /*EOS*/
-- Project temporary table update and final project table creation with final updates that can not be applied to ORC tables
UPDATE ${stats_db_name}.project_tmp
@ -19,9 +19,9 @@ WHERE project_tmp.id IN (SELECT pr.id
FROM ${stats_db_name}.project_results pr,
${stats_db_name}.result r
WHERE pr.result = r.id
AND r.type = 'publication'); /*EOS*/
AND r.type = 'publication'); -- /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.project purge; /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.project purge; -- /*EOS*/
CREATE TABLE ${stats_db_name}.project stored as parquet as
SELECT p.id,
@ -64,7 +64,7 @@ FROM ${stats_db_name}.project_tmp p
AND r.type = 'publication'
AND datediff(to_date(r.date), to_date(pp.enddate)) > 0
GROUP BY pp.id) AS prr2
ON prr2.id = p.id; /*EOS*/
ON prr2.id = p.id; -- /*EOS*/
UPDATE ${stats_db_name}.publication_tmp
SET delayed = 'yes'
@ -74,7 +74,7 @@ WHERE publication_tmp.id IN (SELECT distinct r.id
${stats_db_name}.project_tmp p
WHERE r.id = pr.result
AND pr.id = p.id
AND to_date(r.date) - to_date(p.enddate) > 0); /*EOS*/
AND to_date(r.date) - to_date(p.enddate) > 0); -- /*EOS*/
UPDATE ${stats_db_name}.dataset_tmp
SET delayed = 'yes'
@ -84,7 +84,7 @@ WHERE dataset_tmp.id IN (SELECT distinct r.id
${stats_db_name}.project_tmp p
WHERE r.id = pr.result
AND pr.id = p.id
AND to_date(r.date) - to_date(p.enddate) > 0); /*EOS*/
AND to_date(r.date) - to_date(p.enddate) > 0); -- /*EOS*/
UPDATE ${stats_db_name}.software_tmp
SET delayed = 'yes'
@ -94,7 +94,7 @@ WHERE software_tmp.id IN (SELECT distinct r.id
${stats_db_name}.project_tmp p
WHERE r.id = pr.result
AND pr.id = p.id
AND to_date(r.date) - to_date(p.enddate) > 0); /*EOS*/
AND to_date(r.date) - to_date(p.enddate) > 0); -- /*EOS*/
UPDATE ${stats_db_name}.otherresearchproduct_tmp
SET delayed = 'yes'
@ -104,7 +104,7 @@ WHERE otherresearchproduct_tmp.id IN (SELECT distinct r.id
${stats_db_name}.project_tmp p
WHERE r.id = pr.result
AND pr.id = p.id
AND to_date(r.date) - to_date(p.enddate) > 0); /*EOS*/
AND to_date(r.date) - to_date(p.enddate) > 0); -- /*EOS*/
CREATE OR REPLACE VIEW ${stats_db_name}.project_results_publication AS
SELECT result_projects.id AS result,
@ -117,4 +117,4 @@ FROM ${stats_db_name}.result_projects,
${stats_db_name}.project
WHERE result_projects.id = result.id
AND result.type = 'publication'
AND project.id = result_projects.project; /*EOS*/
AND project.id = result_projects.project; -- /*EOS*/

View File

@ -5,7 +5,7 @@
-- Datasource table/view and Datasource related tables/views
------------------------------------------------------------
------------------------------------------------------------
DROP TABLE IF EXISTS ${stats_db_name}.datasource_tmp purge; /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.datasource_tmp purge; -- /*EOS*/
CREATE TABLE ${stats_db_name}.datasource_tmp
(
@ -23,6 +23,7 @@ CREATE TABLE ${stats_db_name}.datasource_tmp
issn_printed STRING,
issn_online STRING
) CLUSTERED BY (id) INTO 100 buckets stored AS orc tblproperties ('transactional' = 'true'); /*EOS*/
) CLUSTERED BY (id) INTO 100 buckets stored AS orc tblproperties ('transactional' = 'true'); -- /*EOS*/
-- Insert statement that takes into account the piwik_id of the openAIRE graph
INSERT INTO ${stats_db_name}.datasource_tmp
@ -46,16 +47,16 @@ FROM ${openaire_db_name}.datasource d1
LATERAL VIEW EXPLODE(originalid) temp AS originalidd
WHERE originalidd like "piwik:%") AS d2
ON d1.id = d2.id
WHERE d1.datainfo.deletedbyinference = FALSE and d1.datainfo.invisible=false; /*EOS*/
WHERE d1.datainfo.deletedbyinference = FALSE and d1.datainfo.invisible=false; -- /*EOS*/
-- Updating temporary table with everything that is not based on results -> This is done with the following "dual" table.
-- Creating a temporary dual table that will be removed after the following insert
DROP TABLE IF EXISTS ${stats_db_name}.dual purge; /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.dual purge; -- /*EOS*/
CREATE TABLE ${stats_db_name}.dual ( dummy CHAR(1)); /*EOS*/
CREATE TABLE ${stats_db_name}.dual ( dummy CHAR(1)); -- /*EOS*/
INSERT INTO ${stats_db_name}.dual VALUES ('X'); /*EOS*/
INSERT INTO ${stats_db_name}.dual VALUES ('X'); -- /*EOS*/
INSERT INTO ${stats_db_name}.datasource_tmp (`id`, `name`, `type`, `dateofvalidation`, `yearofvalidation`, `harvested`,
`piwik_id`, `latitude`, `longitude`, `websiteurl`, `compatibility`, `issn_printed`, `issn_online`)
@ -73,42 +74,42 @@ SELECT 'other',
null,
null
FROM ${stats_db_name}.dual
WHERE 'other' not in (SELECT id FROM ${stats_db_name}.datasource_tmp WHERE name = 'Unknown Repository'); /*EOS*/
DROP TABLE ${stats_db_name}.dual; /*EOS*/
WHERE 'other' not in (SELECT id FROM ${stats_db_name}.datasource_tmp WHERE name = 'Unknown Repository'); -- /*EOS*/
DROP TABLE ${stats_db_name}.dual; -- /*EOS*/
UPDATE ${stats_db_name}.datasource_tmp SET name='Other' WHERE name = 'Unknown Repository'; /*EOS*/
UPDATE ${stats_db_name}.datasource_tmp SET yearofvalidation=null WHERE yearofvalidation = '-1'; /*EOS*/
UPDATE ${stats_db_name}.datasource_tmp SET name='Other' WHERE name = 'Unknown Repository'; -- /*EOS*/
UPDATE ${stats_db_name}.datasource_tmp SET yearofvalidation=null WHERE yearofvalidation = '-1'; -- /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.datasource_languages purge; /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.datasource_languages purge; -- /*EOS*/
CREATE TABLE ${stats_db_name}.datasource_languages STORED AS PARQUET AS
SELECT substr(d.id, 4) AS id, langs.languages AS language
FROM ${openaire_db_name}.datasource d LATERAL VIEW explode(d.odlanguages.value) langs AS languages
where d.datainfo.deletedbyinference=false and d.datainfo.invisible=false; /*EOS*/
where d.datainfo.deletedbyinference=false and d.datainfo.invisible=false; -- /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.datasource_oids purge; /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.datasource_oids purge; -- /*EOS*/
CREATE TABLE ${stats_db_name}.datasource_oids STORED AS PARQUET AS
SELECT substr(d.id, 4) AS id, oids.ids AS oid
FROM ${openaire_db_name}.datasource d LATERAL VIEW explode(d.originalid) oids AS ids
where d.datainfo.deletedbyinference=false and d.datainfo.invisible=false; /*EOS*/
where d.datainfo.deletedbyinference=false and d.datainfo.invisible=false; -- /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.datasource_organizations purge; /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.datasource_organizations purge; -- /*EOS*/
CREATE TABLE ${stats_db_name}.datasource_organizations STORED AS PARQUET AS
SELECT substr(r.target, 4) AS id, substr(r.source, 4) AS organization
FROM ${openaire_db_name}.relation r
WHERE r.reltype = 'datasourceOrganization' and r.datainfo.deletedbyinference = false and r.source like '20|%' and r.datainfo.invisible=false; /*EOS*/
WHERE r.reltype = 'datasourceOrganization' and r.datainfo.deletedbyinference = false and r.source like '20|%' and r.datainfo.invisible=false; -- /*EOS*/
-- datasource sources:
-- where the datasource info have been collected from.
DROP TABLE IF EXISTS ${stats_db_name}.datasource_sources purge; /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.datasource_sources purge; -- /*EOS*/
create table if not exists ${stats_db_name}.datasource_sources STORED AS PARQUET AS
select substr(d.id, 4) as id, substr(cf.key, 4) as datasource
from ${openaire_db_name}.datasource d lateral view explode(d.collectedfrom) cfrom as cf
where d.datainfo.deletedbyinference = false and d.datainfo.invisible=false; /*EOS*/
where d.datainfo.deletedbyinference = false and d.datainfo.invisible=false; -- /*EOS*/
CREATE OR REPLACE VIEW ${stats_db_name}.datasource_results AS
SELECT datasource AS id, id AS result
FROM ${stats_db_name}.result_datasources; /*EOS*/
FROM ${stats_db_name}.result_datasources; -- /*EOS*/

View File

@ -308,7 +308,7 @@
<error to="Kill"/>
</action>
<action name="Step8">
<!-- <action name="Step8">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
@ -316,18 +316,29 @@
<class>eu.dnetlib.dhp.oozie.RunSQLSparkJob</class>
<jar>dhp-stats-update-${projectVersion}.jar</jar>
<spark-opts>
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
&#45;&#45;conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
${sparkClusterOpts}
${sparkResourceOpts}
${sparkApplicationOpts}
</spark-opts>
<arg>--hiveMetastoreUris</arg><arg>${hive_metastore_uris}</arg>
<arg>--sql</arg><arg>eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step8.sql</arg>
<arg>--stats_db_name</arg><arg>${stats_db_name}</arg>
<arg>--openaire_db_name</arg><arg>${openaire_db_name}</arg>
<arg>&#45;&#45;hiveMetastoreUris</arg><arg>${hive_metastore_uris}</arg>
<arg>&#45;&#45;sql</arg><arg>eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step8.sql</arg>
<arg>&#45;&#45;stats_db_name</arg><arg>${stats_db_name}</arg>
<arg>&#45;&#45;openaire_db_name</arg><arg>${openaire_db_name}</arg>
</spark>
<ok to="Step9"/>
<error to="Kill"/>
</action>-->
<action name="Step8">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<jdbc-url>${hive_jdbc_url}</jdbc-url>
<script>scripts/step8.sql</script>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
</hive2>
<ok to="Step9"/>
<error to="Kill"/>
</action>
<action name="Step9">
@ -375,7 +386,7 @@
<error to="Kill"/>
</action>
<action name="Step11">
<!-- <action name="Step11">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
@ -383,19 +394,31 @@
<class>eu.dnetlib.dhp.oozie.RunSQLSparkJob</class>
<jar>dhp-stats-update-${projectVersion}.jar</jar>
<spark-opts>
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
&#45;&#45;conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
${sparkClusterOpts}
${sparkResourceOpts}
${sparkApplicationOpts}
</spark-opts>
<arg>--hiveMetastoreUris</arg><arg>${hive_metastore_uris}</arg>
<arg>--sql</arg><arg>eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step11.sql</arg>
<arg>--stats_db_name</arg><arg>${stats_db_name}</arg>
<arg>--openaire_db_name</arg><arg>${openaire_db_name}</arg>
<arg>--external_stats_db_name</arg><arg>${external_stats_db_name}</arg>
<arg>&#45;&#45;hiveMetastoreUris</arg><arg>${hive_metastore_uris}</arg>
<arg>&#45;&#45;sql</arg><arg>eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step11.sql</arg>
<arg>&#45;&#45;stats_db_name</arg><arg>${stats_db_name}</arg>
<arg>&#45;&#45;openaire_db_name</arg><arg>${openaire_db_name}</arg>
<arg>&#45;&#45;external_stats_db_name</arg><arg>${external_stats_db_name}</arg>
</spark>
<ok to="Step12"/>
<error to="Kill"/>
</action>-->
<action name="Step11">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<jdbc-url>${hive_jdbc_url}</jdbc-url>
<script>scripts/step11.sql</script>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
<param>external_stats_db_name=${external_stats_db_name}</param>
</hive2>
<ok to="Step12"/>
<error to="Kill"/>
</action>
<action name="Step12">