convert_hive_to_spark_actions #1

Merged
antonis.lempesis merged 20 commits from convert_hive_to_spark_actions into beta 2024-09-23 13:53:29 +02:00
10 changed files with 248 additions and 458 deletions
Showing only changes of commit 54e11b6a43 - Show all commits

View File

@ -4,108 +4,6 @@
----------------------------------------------------------------
----------------------------------------------------------------
--Datasource temporary table updates
UPDATE ${stats_db_name}.datasource_tmp
SET harvested='true'
WHERE datasource_tmp.id IN (SELECT DISTINCT d.id
FROM ${stats_db_name}.datasource_tmp d,
${stats_db_name}.result_datasources rd
WHERE d.id = rd.datasource); -- /*EOS*/
-- Project temporary table update and final project table creation with final updates that can not be applied to ORC tables
UPDATE ${stats_db_name}.project_tmp
SET haspubs='yes'
WHERE project_tmp.id IN (SELECT pr.id
FROM ${stats_db_name}.project_results pr,
${stats_db_name}.result r
WHERE pr.result = r.id
AND r.type = 'publication'); -- /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.project purge; -- /*EOS*/
CREATE TABLE ${stats_db_name}.project stored as parquet as
SELECT p.id,
p.acronym,
p.title,
p.funder,
p.funding_lvl0,
p.funding_lvl1,
p.funding_lvl2,
p.ec39,
p.type,
p.startdate,
p.enddate,
p.start_year,
p.end_year,
p.duration,
CASE WHEN prr1.id IS NULL THEN 'no' ELSE 'yes' END AS haspubs,
CASE WHEN prr1.id IS NULL THEN 0 ELSE prr1.np END AS numpubs,
CASE WHEN prr2.id IS NULL THEN 0 ELSE prr2.daysForlastPub END AS daysforlastpub,
CASE WHEN prr2.id IS NULL THEN 0 ELSE prr2.dp END AS delayedpubs,
p.callidentifier,
p.code,
p.totalcost,
p.fundedamount,
p.currency
FROM ${stats_db_name}.project_tmp p
LEFT JOIN (SELECT pr.id, count(distinct pr.result) AS np
FROM ${stats_db_name}.project_results pr
INNER JOIN ${stats_db_name}.result r ON pr.result = r.id
WHERE r.type = 'publication'
GROUP BY pr.id) AS prr1 on prr1.id = p.id
LEFT JOIN (SELECT pp.id,
max(datediff(to_date(r.date), to_date(pp.enddate))) AS daysForlastPub,
count(distinct r.id) AS dp
FROM ${stats_db_name}.project_tmp pp,
${stats_db_name}.project_results pr,
${stats_db_name}.result r
WHERE pp.id = pr.id
AND pr.result = r.id
AND r.type = 'publication'
AND datediff(to_date(r.date), to_date(pp.enddate)) > 0
GROUP BY pp.id) AS prr2
ON prr2.id = p.id; -- /*EOS*/
UPDATE ${stats_db_name}.publication_tmp
SET delayed = 'yes'
WHERE publication_tmp.id IN (SELECT distinct r.id
FROM ${stats_db_name}.result r,
${stats_db_name}.project_results pr,
${stats_db_name}.project_tmp p
WHERE r.id = pr.result
AND pr.id = p.id
AND to_date(r.date) - to_date(p.enddate) > 0); -- /*EOS*/
UPDATE ${stats_db_name}.dataset_tmp
SET delayed = 'yes'
WHERE dataset_tmp.id IN (SELECT distinct r.id
FROM ${stats_db_name}.result r,
${stats_db_name}.project_results pr,
${stats_db_name}.project_tmp p
WHERE r.id = pr.result
AND pr.id = p.id
AND to_date(r.date) - to_date(p.enddate) > 0); -- /*EOS*/
UPDATE ${stats_db_name}.software_tmp
SET delayed = 'yes'
WHERE software_tmp.id IN (SELECT distinct r.id
FROM ${stats_db_name}.result r,
${stats_db_name}.project_results pr,
${stats_db_name}.project_tmp p
WHERE r.id = pr.result
AND pr.id = p.id
AND to_date(r.date) - to_date(p.enddate) > 0); -- /*EOS*/
UPDATE ${stats_db_name}.otherresearchproduct_tmp
SET delayed = 'yes'
WHERE otherresearchproduct_tmp.id IN (SELECT distinct r.id
FROM ${stats_db_name}.result r,
${stats_db_name}.project_results pr,
${stats_db_name}.project_tmp p
WHERE r.id = pr.result
AND pr.id = p.id
AND to_date(r.date) - to_date(p.enddate) > 0); -- /*EOS*/
CREATE OR REPLACE VIEW ${stats_db_name}.project_results_publication AS
SELECT result_projects.id AS result,
result_projects.project AS project_results,

View File

@ -1,42 +1,4 @@
------------------------------------------------------------------------------------------------------
-- Creating parquet tables from the updated temporary tables and removing unnecessary temporary tables
------------------------------------------------------------------------------------------------------
DROP TABLE IF EXISTS ${stats_db_name}.datasource purge; /*EOS*/
CREATE TABLE ${stats_db_name}.datasource stored AS parquet AS
SELECT *
FROM ${stats_db_name}.datasource_tmp; /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.publication purge; /*EOS*/
CREATE TABLE ${stats_db_name}.publication stored AS parquet AS
SELECT *
FROM ${stats_db_name}.publication_tmp; /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.dataset purge; /*EOS*/
CREATE TABLE ${stats_db_name}.dataset stored AS parquet AS
SELECT *
FROM ${stats_db_name}.dataset_tmp; /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.software purge; /*EOS*/
CREATE TABLE ${stats_db_name}.software stored AS parquet AS
SELECT *
FROM ${stats_db_name}.software_tmp; /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.otherresearchproduct purge; /*EOS*/
CREATE TABLE ${stats_db_name}.otherresearchproduct stored AS parquet AS
SELECT *
FROM ${stats_db_name}.otherresearchproduct_tmp; /*EOS*/
DROP TABLE ${stats_db_name}.project_tmp; /*EOS*/
DROP TABLE ${stats_db_name}.datasource_tmp; /*EOS*/
DROP TABLE ${stats_db_name}.publication_tmp; /*EOS*/
DROP TABLE ${stats_db_name}.dataset_tmp; /*EOS*/
DROP TABLE ${stats_db_name}.software_tmp; /*EOS*/
DROP TABLE ${stats_db_name}.otherresearchproduct_tmp; /*EOS*/
set mapred.job.queue.name=analytics; /*EOS*/
----------------------------------------------
-- Re-creating views from final parquet tables

View File

@ -1,58 +1,26 @@
set mapred.job.queue.name=analytics; /*EOS*/
-- replace the creation of the result view to include the boolean fields from the previous tables (green, gold,
-- replace the creation of the result view with a table, which will include the boolean fields from the previous tables (green, gold,
-- peer reviewed)
drop table if exists ${stats_db_name}.result_tmp; /*EOS*/
CREATE TABLE ${stats_db_name}.result_tmp (
id STRING,
title STRING,
publisher STRING,
journal STRING,
`date` STRING,
`year` INT,
bestlicence STRING,
access_mode STRING,
embargo_end_date STRING,
delayed BOOLEAN,
authors INT,
source STRING,
abstract BOOLEAN,
type STRING ,
peer_reviewed BOOLEAN,
green BOOLEAN,
gold BOOLEAN)
clustered by (id) into 100 buckets stored as orc tblproperties('transactional'='true'); /*EOS*/
insert into ${stats_db_name}.result_tmp
select r.id, r.title, r.publisher, r.journal, r.`date`, date_format(r.`date`, 'yyyy'), r.bestlicence, r.bestlicence, r.embargo_end_date, r.delayed, r.authors, r.source, r.abstract, r.type, pr.peer_reviewed, green.green, gold.gold
FROM ${stats_db_name}.publication r
LEFT OUTER JOIN ${stats_db_name}.result_peerreviewed pr on pr.id=r.id
LEFT OUTER JOIN ${stats_db_name}.result_greenoa green on green.id=r.id
LEFT OUTER JOIN ${stats_db_name}.result_gold gold on gold.id=r.id; /*EOS*/
insert into ${stats_db_name}.result_tmp
select r.id, r.title, r.publisher, r.journal, r.`date`, date_format(r.`date`, 'yyyy'), r.bestlicence, r.bestlicence, r.embargo_end_date, r.delayed, r.authors, r.source, r.abstract, r.type, pr.peer_reviewed, green.green, gold.gold
FROM ${stats_db_name}.dataset r
LEFT OUTER JOIN ${stats_db_name}.result_peerreviewed pr on pr.id=r.id
LEFT OUTER JOIN ${stats_db_name}.result_greenoa green on green.id=r.id
LEFT OUTER JOIN ${stats_db_name}.result_gold gold on gold.id=r.id; /*EOS*/
insert into ${stats_db_name}.result_tmp
select r.id, r.title, r.publisher, r.journal, r.`date`, date_format(r.`date`, 'yyyy'), r.bestlicence, r.bestlicence, r.embargo_end_date, r.delayed, r.authors, r.source, r.abstract, r.type, pr.peer_reviewed, green.green, gold.gold
FROM ${stats_db_name}.software r
LEFT OUTER JOIN ${stats_db_name}.result_peerreviewed pr on pr.id=r.id
LEFT OUTER JOIN ${stats_db_name}.result_greenoa green on green.id=r.id
LEFT OUTER JOIN ${stats_db_name}.result_gold gold on gold.id=r.id; /*EOS*/
insert into ${stats_db_name}.result_tmp
select r.id, r.title, r.publisher, r.journal, r.`date`, date_format(r.`date`, 'yyyy'), r.bestlicence, r.bestlicence, r.embargo_end_date, r.delayed, r.authors, r.source, r.abstract, r.type, pr.peer_reviewed, green.green, gold.gold
FROM ${stats_db_name}.otherresearchproduct r
LEFT OUTER JOIN ${stats_db_name}.result_peerreviewed pr on pr.id=r.id
LEFT OUTER JOIN ${stats_db_name}.result_greenoa green on green.id=r.id
LEFT OUTER JOIN ${stats_db_name}.result_gold gold on gold.id=r.id; /*EOS*/
drop table if exists ${stats_db_name}.result; /*EOS*/
drop view if exists ${stats_db_name}.result; /*EOS*/
create table ${stats_db_name}.result stored as parquet as select * from ${stats_db_name}.result_tmp; /*EOS*/
drop table ${stats_db_name}.result_tmp; /*EOS*/
drop table if exists ${stats_db_name}.result; /*EOS*/
CREATE TABLE ${stats_db_name}.result stored as parquet as
SELECT /*+ COALESCE(100) */ r.id, r.title, r.publisher, r.journal, r.`date`, DATE_FORMAT(r.`date`, 'yyyy'), r.bestlicence, r.bestlicence, r.embargo_end_date, r.delayed, r.authors, r.source, r.abstract, r.type, pr.peer_reviewed, green.green, gold.gold
FROM (
(SELECT id, title, p.publisher, journal, `date`, DATE_FORMAT(`date`, 'yyyy'), bestlicence, bestlicence, embargo_end_date, delayed, authors, source, abstract, type
FROM ${stats_db_name}.publication)
UNION ALL
(SELECT id, title, p.publisher, journal, `date`, DATE_FORMAT(`date`, 'yyyy'), bestlicence, bestlicence, embargo_end_date, delayed, authors, source, abstract, type
FROM ${stats_db_name}.dataset)
UNION ALL
(select id, title, p.publisher, journal, `date`, DATE_FORMAT(`date`, 'yyyy'), bestlicence, bestlicence, embargo_end_date, delayed, authors, source, abstract, type
FROM ${stats_db_name}.software)
UNION ALL
(select id, title, p.publisher, journal, `date`, DATE_FORMAT(`date`, 'yyyy'), bestlicence, bestlicence, embargo_end_date, delayed, authors, source, abstract, type
FROM ${stats_db_name}.otherresearchproduct)
) r
LEFT OUTER JOIN ${stats_db_name}.result_peerreviewed pr on pr.id=r.id
LEFT OUTER JOIN ${stats_db_name}.result_greenoa green on green.id=r.id
LEFT OUTER JOIN ${stats_db_name}.result_gold gold on gold.id=r.id; /*EOS*/

View File

@ -7,41 +7,41 @@ set mapred.job.queue.name=analytics; /*EOS*/
--------------------------------------------------------------
-- Publication temporary table
DROP TABLE IF EXISTS ${stats_db_name}.publication_tmp purge; /*EOS*/
CREATE TABLE ${stats_db_name}.publication_tmp
(
id STRING,
title STRING,
publisher STRING,
journal STRING,
date STRING,
year STRING,
bestlicence STRING,
embargo_end_date STRING,
delayed BOOLEAN,
authors INT,
source STRING,
abstract BOOLEAN,
type STRING
)
clustered by (id) into 100 buckets stored as orc tblproperties ('transactional' = 'true'); /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.publication purge; /*EOS*/
CREATE TABLE ${stats_db_name}.publication stored as parquet as
with pub_pr as (
select pub.id as pub_id, case when (to_date(pub.dateofacceptance.value) > to_date( pj.enddate.value)) then true else false end as delayed
from ${openaire_db_name}.publication pub
join ${openaire_db_name}.relation rel
on reltype = 'resultProject' and relclass = 'isProducedBy' and rel.source=pub.id
and rel.datainfo.deletedbyinference = false and rel.datainfo.invisible = false
join ${openaire_db_name}.project pj on pj.id=rel.target and pj.datainfo.deletedbyinference = false and pj.datainfo.invisible = false
where pub.datainfo.deletedbyinference = false and pub.datainfo.invisible = false
),
pub_delayed as (
select pub_id, max(delayed) as delayed
from pub_pr
group by pub_id
)
select /*+ COALESCE(100) */
substr(pub.id, 4) as id,
pub.title[0].value as title,
pub.publisher.value as publisher,
pub.journal.name as journal,
pub.dateofacceptance.value as date,
date_format(pub.dateofacceptance.value, 'yyyy') as year,
pub.bestaccessright.classname as bestlicence,
pub.embargoenddate.value as embargo_end_date,
coalesce(pub_delayed.delayed, false) as delayed, -- It's delayed, when the publication was published after the end of at least one of its projects.
size(pub.author) as authors,
concat_ws('\u003B', pub.source.value) as source,
case when size(pub.description) > 0 then true else false end as abstract,
'publication' as type
from ${openaire_db_name}.publication pub
left outer join pub_delayed on pub.id=pub_delayed.pub_id
where pub.datainfo.deletedbyinference = false and pub.datainfo.invisible = false; /*EOS*/
INSERT INTO ${stats_db_name}.publication_tmp
SELECT substr(p.id, 4) as id,
p.title[0].value as title,
p.publisher.value as publisher,
p.journal.name as journal,
p.dateofacceptance.value as date,
date_format(p.dateofacceptance.value, 'yyyy') as year,
p.bestaccessright.classname as bestlicence,
p.embargoenddate.value as embargo_end_date,
false as delayed,
size(p.author) as authors,
concat_ws('\u003B', p.source.value) as source,
case when size(p.description) > 0 then true else false end as abstract,
'publication' as type
from ${openaire_db_name}.publication p
where p.datainfo.deletedbyinference = false and p.datainfo.invisible=false; /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.publication_classifications purge; /*EOS*/

View File

@ -5,42 +5,41 @@
------------------------------------------------------
-- Dataset temporary table supporting updates
DROP TABLE IF EXISTS ${stats_db_name}.dataset_tmp purge; /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.dataset purge; /*EOS*/
CREATE TABLE ${stats_db_name}.dataset_tmp
(
id STRING,
title STRING,
publisher STRING,
journal STRING,
date STRING,
year STRING,
bestlicence STRING,
embargo_end_date STRING,
delayed BOOLEAN,
authors INT,
source STRING,
abstract BOOLEAN,
type STRING
CREATE TABLE ${stats_db_name}.dataset stored as parquet as
with datast_pr as (
select datast.id as datast_id, case when (to_date(datast.dateofacceptance.value) > to_date( pj.enddate.value)) then true else false end as delayed
from ${openaire_db_name}.dataset datast
join ${openaire_db_name}.relation rel
on reltype = 'resultProject' and relclass = 'isProducedBy' and rel.source=datast.id
and rel.datainfo.deletedbyinference = false and rel.datainfo.invisible = false
join ${openaire_db_name}.project pj on pj.id=rel.target and pj.datainfo.deletedbyinference = false and pj.datainfo.invisible = false
where datast.datainfo.deletedbyinference = false and datast.datainfo.invisible = false
),
datast_delayed as (
select datast_id, max(delayed) as delayed
from datast_pr
group by datast_id
)
clustered by (id) into 100 buckets stored AS orc tblproperties ('transactional' = 'true'); /*EOS*/
select /*+ COALESCE(100) */
substr(datast.id, 4) as id,
datast.title[0].value as title,
datast.publisher.value as publisher,
cast(null as string) as journal,
datast.dateofacceptance.value as date,
date_format(datast.dateofacceptance.value, 'yyyy') as year,
datast.bestaccessright.classname as bestlicence,
datast.embargoenddate.value as embargo_end_date,
coalesce(datast_delayed.delayed, false) as delayed, -- It's delayed, when the dataset was published after the end of the project.
size(datast.author) as authors,
concat_ws('\u003B', datast.source.value) as source,
case when size(datast.description) > 0 then true else false end as abstract,
'dataset' as type
from ${openaire_db_name}.dataset datast
left outer join datast_delayed on datast.id=datast_delayed.datast_id
where datast.datainfo.deletedbyinference = false and datast.datainfo.invisible = false; /*EOS*/
INSERT INTO ${stats_db_name}.dataset_tmp
SELECT substr(d.id, 4) AS id,
d.title[0].value AS title,
d.publisher.value AS publisher,
cast(null AS string) AS journal,
d.dateofacceptance.value as date,
date_format(d.dateofacceptance.value, 'yyyy') AS year,
d.bestaccessright.classname AS bestlicence,
d.embargoenddate.value AS embargo_end_date,
false AS delayed,
size(d.author) AS authors,
concat_ws('\u003B', d.source.value) AS source,
CASE WHEN SIZE(d.description) > 0 THEN TRUE ELSE FALSE end AS abstract,
'dataset' AS type
FROM ${openaire_db_name}.dataset d
WHERE d.datainfo.deletedbyinference = FALSE and d.datainfo.invisible=false; /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.dataset_citations purge; /*EOS*/

View File

@ -5,41 +5,41 @@
--------------------------------------------------------
-- Software temporary table supporting updates
DROP TABLE IF EXISTS ${stats_db_name}.software_tmp purge; /*EOS*/
CREATE TABLE ${stats_db_name}.software_tmp
(
id STRING,
title STRING,
publisher STRING,
journal STRING,
date STRING,
year STRING,
bestlicence STRING,
embargo_end_date STRING,
delayed BOOLEAN,
authors INT,
source STRING,
abstract BOOLEAN,
type STRING
)
clustered by (id) INTO 100 buckets stored AS orc tblproperties ('transactional' = 'true'); /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.software purge; /*EOS*/
CREATE TABLE ${stats_db_name}.software stored as parquet as
with soft_pr as (
select soft.id as soft_id, case when (to_date(soft.dateofacceptance.value) > to_date( pj.enddate.value)) then true else false end as delayed
from ${openaire_db_name}.software soft
join ${openaire_db_name}.relation rel
on reltype = 'resultProject' and relclass = 'isProducedBy' and rel.source=soft.id
and rel.datainfo.deletedbyinference = false and rel.datainfo.invisible = false
join ${openaire_db_name}.project pj on pj.id=rel.target and pj.datainfo.deletedbyinference = false and pj.datainfo.invisible = false
where soft.datainfo.deletedbyinference = false and soft.datainfo.invisible = false
),
soft_delayed as (
select soft_id, max(delayed) as delayed
from soft_pr
group by soft_id
)
select /*+ COALESCE(100) */
substr(soft.id, 4) as id,
soft.title[0].value as title,
soft.publisher.value as publisher,
cast(null as string) as journal,
soft.dateofacceptance.value as date,
date_format(soft.dateofacceptance.value, 'yyyy') as year,
soft.bestaccessright.classname as bestlicence,
soft.embargoenddate.value as embargo_end_date,
coalesce(soft_delayed.delayed, false) as delayed, -- It's delayed, when the software was published after the end of the project.
size(soft.author) as authors,
concat_ws('\u003B', soft.source.value) as source,
case when size(soft.description) > 0 then true else false end as abstract,
'software' as type
from ${openaire_db_name}.software soft
left outer join soft_delayed on soft.id=soft_delayed.soft_id
where soft.datainfo.deletedbyinference = false and soft.datainfo.invisible = false; /*EOS*/
INSERT INTO ${stats_db_name}.software_tmp
SELECT substr(s.id, 4) as id,
s.title[0].value AS title,
s.publisher.value AS publisher,
CAST(NULL AS string) AS journal,
s.dateofacceptance.value AS DATE,
date_format(s.dateofacceptance.value, 'yyyy') AS YEAR,
s.bestaccessright.classname AS bestlicence,
s.embargoenddate.value AS embargo_end_date,
FALSE AS delayed,
SIZE(s.author) AS authors,
concat_ws('\u003B', s.source.value) AS source,
CASE WHEN SIZE(s.description) > 0 THEN TRUE ELSE FALSE END AS abstract,
'software' as type
from ${openaire_db_name}.software s
where s.datainfo.deletedbyinference = false and s.datainfo.invisible=false; /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.software_citations purge; /*EOS*/

View File

@ -5,41 +5,41 @@
--------------------------------------------------------------------------------
-- Otherresearchproduct temporary table supporting updates
DROP TABLE IF EXISTS ${stats_db_name}.otherresearchproduct_tmp purge; /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.otherresearchproduct purge; /*EOS*/
CREATE TABLE ${stats_db_name}.otherresearchproduct_tmp
(
id STRING,
title STRING,
publisher STRING,
journal STRING,
date STRING,
year STRING,
bestlicence STRING,
embargo_end_date STRING,
delayed BOOLEAN,
authors INT,
source STRING,
abstract BOOLEAN,
type STRING
) CLUSTERED BY (id) INTO 100 buckets stored AS orc tblproperties ('transactional' = 'true'); /*EOS*/
CREATE TABLE ${stats_db_name}.otherresearchproduct stored as parquet as
with other_pr as (
select other.id as other_id, case when (to_date(other.dateofacceptance.value) > to_date( pj.enddate.value)) then true else false end as delayed
from ${openaire_db_name}.otherresearchproduct other
join ${openaire_db_name}.relation rel
on reltype = 'resultProject' and relclass = 'isProducedBy' and rel.source=other.id
and rel.datainfo.deletedbyinference = false and rel.datainfo.invisible = false
join ${openaire_db_name}.project pj on pj.id=rel.target and pj.datainfo.deletedbyinference = false and pj.datainfo.invisible = false
where other.datainfo.deletedbyinference = false and other.datainfo.invisible = false
),
other_delayed as (
select other_id, max(delayed) as delayed
from other_pr
group by other_id
)
select /*+ COALESCE(100) */
substr(other.id, 4) as id,
other.title[0].value as title,
other.publisher.value as publisher,
cast(null as string) as journal,
other.dateofacceptance.value as date,
date_format(other.dateofacceptance.value, 'yyyy') as year,
other.bestaccessright.classname as bestlicence,
other.embargoenddate.value as embargo_end_date,
false as delayed,
size(other.author) as authors,
concat_ws('\u003B', other.source.value) as source,
case when size(other.description) > 0 then true else false end as abstract,
'other' as type
from ${openaire_db_name}.otherresearchproduct other
left outer join other_delayed on other.id=other_delayed.other_id
where other.datainfo.deletedbyinference = false and other.datainfo.invisible = false; /*EOS*/
INSERT INTO ${stats_db_name}.otherresearchproduct_tmp
SELECT substr(o.id, 4) AS id,
o.title[0].value AS title,
o.publisher.value AS publisher,
CAST(NULL AS string) AS journal,
o.dateofacceptance.value AS DATE,
date_format(o.dateofacceptance.value, 'yyyy') AS year,
o.bestaccessright.classname AS bestlicence,
o.embargoenddate.value as embargo_end_date,
FALSE AS delayed,
SIZE(o.author) AS authors,
concat_ws('\u003B', o.source.value) AS source,
CASE WHEN SIZE(o.description) > 0 THEN TRUE ELSE FALSE END AS abstract,
'other' AS type
FROM ${openaire_db_name}.otherresearchproduct o
WHERE o.datainfo.deletedbyinference = FALSE and o.datainfo.invisible=false; /*EOS*/
-- Otherresearchproduct_citations
DROP TABLE IF EXISTS ${stats_db_name}.otherresearchproduct_citations purge; /*EOS*/

View File

@ -34,61 +34,69 @@ from ${openaire_db_name}.project p
lateral view explode(p.h2020classification) classifs as class
where p.datainfo.deletedbyinference=false and p.datainfo.invisible=false and class.h2020programme is not null; /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.project_tmp purge; /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.project purge; /*EOS*/
CREATE TABLE ${stats_db_name}.project_tmp
(
id STRING,
acronym STRING,
title STRING,
funder STRING,
funding_lvl0 STRING,
funding_lvl1 STRING,
funding_lvl2 STRING,
ec39 STRING,
type STRING,
startdate STRING,
enddate STRING,
start_year INT,
end_year INT,
duration INT,
haspubs STRING,
numpubs INT,
daysforlastpub INT,
delayedpubs INT,
callidentifier STRING,
code STRING,
totalcost FLOAT,
fundedamount FLOAT,
currency STRING
) CLUSTERED BY (id) INTO 100 buckets stored AS orc tblproperties ('transactional' = 'true'); /*EOS*/
CREATE TABLE ${stats_db_name}.project stored as parquet as
with pr_pub as (
select pr.id as pr_id, pub.id as pub_id,
(case when datediff(pub.dt_dateofacceptance, pr.dt_enddate) > 0 then true else false end) as delayed,
max(datediff(pub.dt_dateofacceptance, pr.dt_enddate)) as daysForlastPub
from (select id, to_date(dateofacceptance.value) as dt_dateofacceptance from ${openaire_db_name}.publication
where datainfo.deletedbyinference = false and datainfo.invisible = false) pub
join ${openaire_db_name}.relation rel
on rel.reltype = 'resultProject' and rel.relclass = 'isProducedBy' and rel.source=pub.id
and rel.datainfo.deletedbyinference = false and rel.datainfo.invisible = false
join (select id, to_date(enddate.value) as dt_enddate from ${openaire_db_name}.project
where datainfo.deletedbyinference = false and datainfo.invisible = false) pr
on pr.id=rel.target
group by pr.id, pub.id, pub.dt_dateofacceptance, pr.dt_enddate
),
num_pubs_pr as (
select pr_id, count( distinct pub_id) as num_pubs
from pr_pub
group by pr_id
),
pub_delayed as (
select pr_id, pub_id, max(delayed) as delayed
from pr_pub
group by pr_id, pub_id
),
num_pub_delayed as (
select pr_id, count(distinct pub_id) as num_delayed
from pub_delayed
where delayed
group by pr_id
)
select /*+ COALESCE(100) */
substr(p.id, 4) as id,
p.acronym.value as acronym,
p.title.value as title,
xpath_string(p.fundingtree[0].value, '//funder/name') as funder,
xpath_string(p.fundingtree[0].value, '//funding_level_0/name') as funding_lvl0,
xpath_string(p.fundingtree[0].value, '//funding_level_1/name') as funding_lvl1,
xpath_string(p.fundingtree[0].value, '//funding_level_2/name') as funding_lvl2,
p.ecsc39.value as ec39,
p.contracttype.classname as type,
p.startdate.value as startdate,
p.enddate.value as enddate,
year(p.startdate.value) as start_year,
year(p.enddate.value) as end_year,
cast(months_between(p.enddate.value, p.startdate.value) as int) as duration,
case when pr_pub.pub_id is null then 'no' else 'yes' end as haspubs,
num_pubs_pr.num_pubs as numpubs,
pr_pub.daysForlastPub as daysForlastPub,
npd.num_delayed as delayedpubs,
p.callidentifier.value as callidentifier,
p.code.value as code,
p.totalcost as totalcost,
p.fundedamount as fundedamount,
p.currency.value as currency
from ${openaire_db_name}.project p
left outer join pr_pub on pr_pub.pr_id = p.id
left outer join num_pubs_pr on num_pubs_pr.pr_id = p.id
left outer join num_pub_delayed npd on npd.pr_id=p.id
where p.datainfo.deletedbyinference = false and p.datainfo.invisible = false; /*EOS*/
INSERT INTO ${stats_db_name}.project_tmp
SELECT substr(p.id, 4) AS id,
p.acronym.value AS acronym,
p.title.value AS title,
xpath_string(p.fundingtree[0].value, '//funder/name') AS funder,
xpath_string(p.fundingtree[0].value, '//funding_level_0/name') AS funding_lvl0,
xpath_string(p.fundingtree[0].value, '//funding_level_1/name') AS funding_lvl1,
xpath_string(p.fundingtree[0].value, '//funding_level_2/name') AS funding_lvl2,
p.ecsc39.value AS ec39,
p.contracttype.classname AS type,
p.startdate.value AS startdate,
p.enddate.value AS enddate,
year(p.startdate.value) AS start_year,
year(p.enddate.value) AS end_year,
CAST(MONTHS_BETWEEN(p.enddate.value, p.startdate.value) AS INT) AS duration,
'no' AS haspubs,
0 AS numpubs,
0 AS daysforlastpub,
0 AS delayedpubs,
p.callidentifier.value AS callidentifier,
p.code.value AS code,
p.totalcost AS totalcost,
p.fundedamount AS fundedamount,
p.currency.value AS currency
FROM ${openaire_db_name}.project p
WHERE p.datainfo.deletedbyinference = false and p.datainfo.invisible=false; /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.funder purge; /*EOS*/

View File

@ -7,16 +7,16 @@
-- Views on temporary tables that should be re-created in the end
CREATE OR REPLACE VIEW ${stats_db_name}.result as
SELECT *, bestlicence AS access_mode
FROM ${stats_db_name}.publication_tmp
FROM ${stats_db_name}.publication
UNION ALL
SELECT *, bestlicence AS access_mode
FROM ${stats_db_name}.software_tmp
FROM ${stats_db_name}.software
UNION ALL
SELECT *, bestlicence AS access_mode
FROM ${stats_db_name}.dataset_tmp
FROM ${stats_db_name}.dataset
UNION ALL
SELECT *, bestlicence AS access_mode
FROM ${stats_db_name}.otherresearchproduct_tmp; /*EOS*/
FROM ${stats_db_name}.otherresearchproduct; /*EOS*/
-- Views on final tables
CREATE OR REPLACE VIEW ${stats_db_name}.result_datasources AS
@ -153,4 +153,4 @@ CREATE TABLE ${stats_db_name}.result_projects STORED AS PARQUET AS
select /*+ COALESCE(100) */ pr.result AS id, pr.id AS project, datediff(p.enddate, p.startdate) AS daysfromend, pr.provenance as provenance
FROM ${stats_db_name}.result r
JOIN ${stats_db_name}.project_results pr ON r.id = pr.result
JOIN ${stats_db_name}.project_tmp p ON p.id = pr.id; /*EOS*/
JOIN ${stats_db_name}.project p ON p.id = pr.id; /*EOS*/

View File

@ -5,81 +5,36 @@
-- Datasource table/view and Datasource related tables/views
------------------------------------------------------------
------------------------------------------------------------
DROP TABLE IF EXISTS ${stats_db_name}.datasource_tmp purge; -- /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.datasource purge; /*EOS*/
CREATE TABLE ${stats_db_name}.datasource_tmp
(
`id` string,
`name` STRING,
`type` STRING,
`dateofvalidation` STRING,
`yearofvalidation` string,
`harvested` BOOLEAN,
`piwik_id` INT,
`latitude` STRING,
`longitude` STRING,
`websiteurl` STRING,
`compatibility` STRING,
issn_printed STRING,
issn_online STRING
) CLUSTERED BY (id) INTO 100 buckets stored AS orc tblproperties ('transactional' = 'true'); -- /*EOS*/
CREATE TABLE ${stats_db_name}.datasource stored as parquet as
with piwik_datasource as (
select id, split(originalidd, '\\:')[1] as piwik_id
from ${openaire_db_name}.datasource
lateral view explode(originalid) temp as originalidd
where originalidd like "piwik:%"
)
select /*+ COALESCE(100) */
substr(dtrce.id, 4) as id,
case when dtrce.officialname.value='Unknown Repository' then 'Other' else dtrce.officialname.value end as name,
dtrce.datasourcetype.classname as type,
dtrce.dateofvalidation.value as dateofvalidation,
case when dtrce.dateofvalidation.value='-1' then null else date_format(dtrce.dateofvalidation.value, 'yyyy') end as yearofvalidation,
case when res.d_id is null then false else true end as harvested,
case when piwik_d.piwik_id is null then 0 else piwik_d.piwik_id end as piwik_id,
dtrce.latitude.value as latitude,
dtrce.longitude.value as longitude,
dtrce.websiteurl.value as websiteurl,
dtrce.openairecompatibility.classid as compatibility,
dtrce.journal.issnprinted as issn_printed,
dtrce.journal.issnonline as issn_online
from ${openaire_db_name}.datasource dtrce
left outer join (select inst.hostedby.key as d_id from ${openaire_db_name}.result lateral view outer explode (instance) insts as inst) res on res.d_id=dtrce.id
left outer join piwik_datasource piwik_d on piwik_d.id=dtrce.id
where dtrce.datainfo.deletedbyinference = false and dtrce.datainfo.invisible = false; /*EOS*/
-- Insert statement that takes into account the piwik_id of the openAIRE graph
INSERT INTO ${stats_db_name}.datasource_tmp
SELECT substr(d1.id, 4) AS id,
officialname.value AS name,
datasourcetype.classname AS type,
dateofvalidation.value AS dateofvalidation,
date_format(d1.dateofvalidation.value, 'yyyy') AS yearofvalidation,
FALSE AS harvested,
CASE WHEN d2.piwik_id IS NULL THEN 0 ELSE d2.piwik_id END AS piwik_id,
d1.latitude.value AS latitude,
d1.longitude.value AS longitude,
d1.websiteurl.value AS websiteurl,
d1.openairecompatibility.classid AS compatibility,
d1.journal.issnprinted AS issn_printed,
d1.journal.issnonline AS issn_online
FROM ${openaire_db_name}.datasource d1
LEFT OUTER JOIN
(SELECT id, split(originalidd, '\\:')[1] as piwik_id
FROM ${openaire_db_name}.datasource
LATERAL VIEW EXPLODE(originalid) temp AS originalidd
WHERE originalidd like "piwik:%") AS d2
ON d1.id = d2.id
WHERE d1.datainfo.deletedbyinference = FALSE and d1.datainfo.invisible=false; -- /*EOS*/
-- Updating temporary table with everything that is not based on results -> This is done with the following "dual" table.
-- Creating a temporary dual table that will be removed after the following insert
DROP TABLE IF EXISTS ${stats_db_name}.dual purge; -- /*EOS*/
CREATE TABLE ${stats_db_name}.dual ( dummy CHAR(1)); -- /*EOS*/
INSERT INTO ${stats_db_name}.dual VALUES ('X'); -- /*EOS*/
INSERT INTO ${stats_db_name}.datasource_tmp (`id`, `name`, `type`, `dateofvalidation`, `yearofvalidation`, `harvested`,
`piwik_id`, `latitude`, `longitude`, `websiteurl`, `compatibility`, `issn_printed`, `issn_online`)
SELECT 'other',
'Other',
'Repository',
NULL,
NULL,
false,
0,
NULL,
NULL,
NULL,
'unknown',
null,
null
FROM ${stats_db_name}.dual
WHERE 'other' not in (SELECT id FROM ${stats_db_name}.datasource_tmp WHERE name = 'Unknown Repository'); -- /*EOS*/
DROP TABLE ${stats_db_name}.dual; -- /*EOS*/
UPDATE ${stats_db_name}.datasource_tmp SET name='Other' WHERE name = 'Unknown Repository'; -- /*EOS*/
UPDATE ${stats_db_name}.datasource_tmp SET yearofvalidation=null WHERE yearofvalidation = '-1'; -- /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.datasource_languages purge; -- /*EOS*/
DROP TABLE IF EXISTS ${stats_db_name}.datasource_languages purge; /*EOS*/
CREATE TABLE ${stats_db_name}.datasource_languages STORED AS PARQUET AS
SELECT substr(d.id, 4) AS id, langs.languages AS language