From 25d0512fbdfdafa79b531cab7460d6326b22609e Mon Sep 17 00:00:00 2001 From: antleb Date: Tue, 20 Apr 2021 01:43:23 +0300 Subject: [PATCH 01/22] code cleanup --- .../graph/stats/oozie_app/scripts/step16.sql | 31 +------------------ .../stats/oozie_app/scripts/step16_5.sql | 5 +-- .../scripts/step20-createMonitorDB.sql | 6 ---- 3 files changed, 2 insertions(+), 40 deletions(-) diff --git a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step16.sql b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step16.sql index 833deff73..481fd9e8c 100644 --- a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step16.sql +++ b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step16.sql @@ -59,33 +59,4 @@ from result_gold union all select distinct r.id, false as gold from ${stats_db_name}.result r -where r.id not in (select id from result_gold); - --- shortcut result-country through the organization affiliation -create table ${stats_db_name}.result_affiliated_country as -select r.id as id, o.country as country -from ${stats_db_name}.result r -join ${stats_db_name}.result_organization ro on ro.id=r.id -join ${stats_db_name}.organization o on o.id=ro.organization -where o.country is not null and o.country!=''; - --- shortcut result-country through datasource of deposition -create table ${stats_db_name}.result_deposited_country as -select r.id as id, o.country as country -from ${stats_db_name}.result r -join ${stats_db_name}.result_datasources rd on rd.id=r.id -join ${stats_db_name}.datasource d on d.id=rd.datasource -join ${stats_db_name}.datasource_organizations dor on dor.id=d.id -join ${stats_db_name}.organization o on o.id=dor.organization -where o.country is not null and o.country!=''; - --- ANALYZE TABLE ${stats_db_name}.result_peerreviewed COMPUTE STATISTICS; --- ANALYZE TABLE ${stats_db_name}.result_peerreviewed COMPUTE STATISTICS FOR COLUMNS; --- ANALYZE TABLE ${stats_db_name}.result_greenoa COMPUTE STATISTICS; --- ANALYZE TABLE ${stats_db_name}.result_greenoa COMPUTE STATISTICS FOR COLUMNS; --- ANALYZE TABLE ${stats_db_name}.result_gold COMPUTE STATISTICS; --- ANALYZE TABLE ${stats_db_name}.result_gold COMPUTE STATISTICS FOR COLUMNS; --- ANALYZE TABLE ${stats_db_name}.result_affiliated_country COMPUTE STATISTICS; --- ANALYZE TABLE ${stats_db_name}.result_affiliated_country COMPUTE STATISTICS FOR COLUMNS; --- ANALYZE TABLE ${stats_db_name}.result_deposited_country COMPUTE STATISTICS; --- ANALYZE TABLE ${stats_db_name}.result_deposited_country COMPUTE STATISTICS FOR COLUMNS; \ No newline at end of file +where r.id not in (select id from result_gold); \ No newline at end of file diff --git a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step16_5.sql b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step16_5.sql index 2bdc263ef..f737c1ea6 100644 --- a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step16_5.sql +++ b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step16_5.sql @@ -52,7 +52,4 @@ LEFT OUTER JOIN ${stats_db_name}.result_gold gold on gold.id=r.id; drop table if exists ${stats_db_name}.result; drop view if exists ${stats_db_name}.result; create table ${stats_db_name}.result stored as parquet as 
select * from ${stats_db_name}.result_tmp; -drop table ${stats_db_name}.result_tmp; --- --- ANALYZE TABLE ${stats_db_name}.result COMPUTE STATISTICS; --- ANALYZE TABLE ${stats_db_name}.result COMPUTE STATISTICS FOR COLUMNS; \ No newline at end of file +drop table ${stats_db_name}.result_tmp; \ No newline at end of file diff --git a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step20-createMonitorDB.sql b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step20-createMonitorDB.sql index 9477ada12..af5e2a6a4 100644 --- a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step20-createMonitorDB.sql +++ b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step20-createMonitorDB.sql @@ -19,9 +19,6 @@ create table TARGET.result as select * from SOURCE.result r where exists (select 1 from SOURCE.result_concepts rc where rc.id=r.id) ) foo; compute stats TARGET.result; -create table TARGET.result_affiliated_country as select * from SOURCE.result_affiliated_country rac where exists (select 1 from TARGET.result r where r.id=rac.id); -compute stats TARGET.result_affiliated_country; - create table TARGET.result_citations as select * from SOURCE.result_citations orig where exists (select 1 from TARGET.result r where r.id=orig.id); compute stats TARGET.result_citations; @@ -34,9 +31,6 @@ compute stats TARGET.result_concepts; create table TARGET.result_datasources as select * from SOURCE.result_datasources orig where exists (select 1 from TARGET.result r where r.id=orig.id); compute stats TARGET.result_datasources; -create table TARGET.result_deposited_country as select * from SOURCE.result_deposited_country orig where exists (select 1 from TARGET.result r where r.id=orig.id); -compute stats TARGET.result_deposited_country; - create table TARGET.result_fundercount as select * from SOURCE.result_fundercount orig where exists (select 1 from TARGET.result r where r.id=orig.id); compute stats TARGET.result_fundercount; From 625d993cd97b81c05fef8e71b05519a81aa85c18 Mon Sep 17 00:00:00 2001 From: antleb Date: Tue, 20 Apr 2021 02:31:06 +0300 Subject: [PATCH 02/22] added step for observatory db --- .../oa/graph/stats/oozie_app/observatory.sh | 28 ++ .../scripts/step21-createObservatoryDB.sql | 259 ++++++++++++++++++ .../dhp/oa/graph/stats/oozie_app/workflow.xml | 29 +- 3 files changed, 313 insertions(+), 3 deletions(-) create mode 100644 dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/observatory.sh create mode 100644 dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step21-createObservatoryDB.sql diff --git a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/observatory.sh b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/observatory.sh new file mode 100644 index 000000000..ff03bca03 --- /dev/null +++ b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/observatory.sh @@ -0,0 +1,28 @@ +export PYTHON_EGG_CACHE=/home/$(whoami)/.python-eggs +export link_folder=/tmp/impala-shell-python-egg-cache-$(whoami) +if ! 
[ -L $link_folder ] +then + rm -Rf "$link_folder" + ln -sfn ${PYTHON_EGG_CACHE}${link_folder} ${link_folder} +fi + +export SOURCE=$1 +export TARGET=$2 +export SHADOW=$3 +export SCRIPT_PATH=$4 + +echo "Getting file from " $4 +hdfs dfs -copyToLocal $4 + +echo "Creating observatory database" +impala-shell -q "drop database if exists ${TARGET} cascade" +impala-shell -q "create database if not exists ${TARGET}" +impala-shell -d ${SOURCE} -q "show tables" --delimited | sed "s/\(.*\)/create view ${TARGET}.\1 as select * from ${SOURCE}.\1;/" | impala-shell -f - +cat step21-createObservatoryDB.sql | sed s/SOURCE/$1/g | sed s/TARGET/$2/g1 | impala-shell -f - +echo "Impala shell finished" + +echo "Updating shadow observatory database" +impala-shell -q "create database if not exists ${SHADOW}" +impala-shell -d ${SHADOW} -q "show tables" --delimited | sed "s/^/drop view if exists ${SHADOW}./" | sed "s/$/;/" | impala-shell -f - +impala-shell -d ${TARGET} -q "show tables" --delimited | sed "s/\(.*\)/create view ${SHADOW}.\1 as select * from ${TARGET}.\1;/" | impala-shell -f - +echo "Shadow db ready!" \ No newline at end of file diff --git a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step21-createObservatoryDB.sql b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step21-createObservatoryDB.sql new file mode 100644 index 000000000..40cdf3f6d --- /dev/null +++ b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step21-createObservatoryDB.sql @@ -0,0 +1,259 @@ +create table TARGET.result_affiliated_country stored as parquet as +select count(distinct r.id) as total, r.green, r.gold, case when rl.type is not null then true else false end as licence, + case when pids.pid is not null then true else false end as pid, case when r.access_mode in ('Open Access', 'Open Source') then true else false end as oa, + r.peer_reviewed, r.type, c.code as ccode, c.name as cname +from SOURCE.result r +join SOURCE.result_organization ro on ro.id=r.id +join SOURCE.organization o on o.id=ro.organization +join SOURCE.country c on c.code=o.country and c.continent_name='Europe' +left outer join SOURCE.result_licenses rl on rl.id=r.id +left outer join SOURCE.result_pids pids on pids.id=r.id +group by r.green, r.gold, licence, pid, oa, r.peer_reviewed, r.type, c.code, c.name; + +create table TARGET.result_affiliated_year stored as parquet as +select count(distinct r.id) as total, r.green, r.gold, case when rl.type is not null then true else false end as licence, + case when pids.pid is not null then true else false end as pid, case when r.access_mode in ('Open Access', 'Open Source') then true else false end as oa, r.peer_reviewed, r.type, r.year +from SOURCE.result r +join SOURCE.result_organization ro on ro.id=r.id +join SOURCE.organization o on o.id=ro.organization +join SOURCE.country c on c.code=o.country and c.continent_name='Europe' +left outer join SOURCE.result_licenses rl on rl.id=r.id +left outer join SOURCE.result_pids pids on pids.id=r.id +group by r.green, r.gold, licence, pid, oa, r.peer_reviewed, r.type, r.year; + +create table TARGET.result_affiliated_year_country stored as parquet as +select count(distinct r.id) as total, r.green, r.gold, case when rl.type is not null then true else false end as licence, + case when pids.pid is not null then true else false end as pid, case when r.access_mode in ('Open Access', 'Open Source') then true else false end as oa, + 
r.peer_reviewed, r.type, r.year, c.code as ccode, c.name as cname +from SOURCE.result r +join SOURCE.result_organization ro on ro.id=r.id +join SOURCE.organization o on o.id=ro.organization +join SOURCE.country c on c.code=o.country and c.continent_name='Europe' +left outer join SOURCE.result_licenses rl on rl.id=r.id +left outer join SOURCE.result_pids pids on pids.id=r.id +group by r.green, r.gold, licence, pid, oa, r.peer_reviewed, r.type, r.year, c.code, c.name; + +create table TARGET.result_affiliated_datasource stored as parquet as +select count(distinct r.id) as total, r.green, r.gold, case when rl.type is not null then true else false end as licence, + case when pids.pid is not null then true else false end as pid, case when r.access_mode in ('Open Access', 'Open Source') then true else false end as oa, r.peer_reviewed, r.type, d.name as dname +from SOURCE.result r +join SOURCE.result_organization ro on ro.id=r.id +join SOURCE.organization o on o.id=ro.organization +join SOURCE.country c on c.code=o.country and c.continent_name='Europe' +left outer join SOURCE.result_datasources rd on rd.id=r.id +left outer join SOURCE.datasource d on d.id=rd.datasource +left outer join SOURCE.result_licenses rl on rl.id=r.id +left outer join SOURCE.result_pids pids on pids.id=r.id +group by r.green, r.gold, licence, pid, oa, r.peer_reviewed, r.type, d.name; + +create table TARGET.result_affiliated_datasource_country stored as parquet as +select count(distinct r.id) as total, r.green, r.gold, case when rl.type is not null then true else false end as licence, + case when pids.pid is not null then true else false end as pid, case when r.access_mode in ('Open Access', 'Open Source') then true else false end as oa, + r.peer_reviewed, r.type, d.name as dname, c.code as ccode, c.name as cname +from SOURCE.result r +join SOURCE.result_organization ro on ro.id=r.id +join SOURCE.organization o on o.id=ro.organization +join SOURCE.country c on c.code=o.country and c.continent_name='Europe' +left outer join SOURCE.result_datasources rd on rd.id=r.id +left outer join SOURCE.datasource d on d.id=rd.datasource +left outer join SOURCE.result_licenses rl on rl.id=r.id +left outer join SOURCE.result_pids pids on pids.id=r.id +group by r.green, r.gold, licence, pid, oa, r.peer_reviewed, r.type, d.name, c.code, c.name; + +create table TARGET.result_affiliated_organization stored as parquet as +select count(distinct r.id) as total, r.green, r.gold, case when rl.type is not null then true else false end as licence, + case when pids.pid is not null then true else false end as pid, case when r.access_mode in ('Open Access', 'Open Source') then true else false end as oa, + r.peer_reviewed, r.type, o.name as oname +from SOURCE.result r +join SOURCE.result_organization ro on ro.id=r.id +join SOURCE.organization o on o.id=ro.organization +join SOURCE.country c on c.code=o.country and c.continent_name='Europe' +left outer join SOURCE.result_licenses rl on rl.id=r.id +left outer join SOURCE.result_pids pids on pids.id=r.id +group by r.green, r.gold, licence, pid, oa, r.peer_reviewed, r.type, o.name; + +create table TARGET.result_affiliated_organization_country stored as parquet as +select count(distinct r.id) as total, r.green, r.gold, case when rl.type is not null then true else false end as licence, + case when pids.pid is not null then true else false end as pid, case when r.access_mode in ('Open Access', 'Open Source') then true else false end as oa, + r.peer_reviewed, r.type, o.name as oname, c.code as ccode, c.name as 
cname +from SOURCE.result r +join SOURCE.result_organization ro on ro.id=r.id +join SOURCE.organization o on o.id=ro.organization +join SOURCE.country c on c.code=o.country and c.continent_name='Europe' +left outer join SOURCE.result_licenses rl on rl.id=r.id +left outer join SOURCE.result_pids pids on pids.id=r.id +group by r.green, r.gold, licence, pid, oa, r.peer_reviewed, r.type, o.name, c.code, c.name; + +create table TARGET.result_affiliated_funder stored as parquet as +select count(distinct r.id) as total, r.green, r.gold, case when rl.type is not null then true else false end as licence, + case when pids.pid is not null then true else false end as pid, case when r.access_mode in ('Open Access', 'Open Source') then true else false end as oa, r.peer_reviewed, r.type, p.funder as pfunder +from SOURCE.result r +join SOURCE.result_organization ro on ro.id=r.id +join SOURCE.organization o on o.id=ro.organization +join SOURCE.country c on c.code=o.country and c.continent_name='Europe' +join SOURCE.result_projects rp on rp.id=r.id +join SOURCE.project p on p.id=rp.project +left outer join SOURCE.result_licenses rl on rl.id=r.id +left outer join SOURCE.result_pids pids on pids.id=r.id +group by r.green, r.gold, licence, pid, oa, r.peer_reviewed, r.type, p.funder; + +create table TARGET.result_affiliated_funder_country stored as parquet as +select count(distinct r.id) as total, r.green, r.gold, case when rl.type is not null then true else false end as licence, + case when pids.pid is not null then true else false end as pid, case when r.access_mode in ('Open Access', 'Open Source') then true else false end as oa, + r.peer_reviewed, r.type, p.funder as pfunder, c.code as ccode, c.name as cname +from SOURCE.result r +join SOURCE.result_organization ro on ro.id=r.id +join SOURCE.organization o on o.id=ro.organization +join SOURCE.country c on c.code=o.country and c.continent_name='Europe' +join SOURCE.result_projects rp on rp.id=r.id +join SOURCE.project p on p.id=rp.project +left outer join SOURCE.result_licenses rl on rl.id=r.id +left outer join SOURCE.result_pids pids on pids.id=r.id +group by r.green, r.gold, licence, pid, oa, r.peer_reviewed, r.type, p.funder, c.code, c.name; + +create table TARGET.result_deposited_country stored as parquet as +select count(distinct r.id) as total, r.green, r.gold, case when rl.type is not null then true else false end as licence, + case when pids.pid is not null then true else false end as pid, case when r.access_mode in ('Open Access', 'Open Source') then true else false end as oa, + r.peer_reviewed, r.type, c.code as ccode, c.name as cname +from SOURCE.result r +join SOURCE.result_datasources rd on rd.id=r.id +join SOURCE.datasource d on d.id=rd.datasource and d.type in ('Institutional Repository','Data Repository', 'Repository', 'Publication Repository') +join SOURCE.datasource_organizations dor on dor.id=d.id +join SOURCE.organization o on o.id=dor.organization +join SOURCE.country c on c.code=o.country and c.continent_name='Europe' +left outer join SOURCE.result_licenses rl on rl.id=r.id +left outer join SOURCE.result_pids pids on pids.id=r.id +group by r.green, r.gold, licence, pid, oa, r.peer_reviewed, r.type, c.code, c.name; + +create table TARGET.result_deposited_year stored as parquet as +select count(distinct r.id) as total, r.green, r.gold, case when rl.type is not null then true else false end as licence, + case when pids.pid is not null then true else false end as pid, case when r.access_mode in ('Open Access', 'Open Source') then true else 
false end as oa, r.peer_reviewed, r.type, r.year +from SOURCE.result r +join SOURCE.result_datasources rd on rd.id=r.id +join SOURCE.datasource d on d.id=rd.datasource and d.type in ('Institutional Repository','Data Repository', 'Repository', 'Publication Repository') +join SOURCE.datasource_organizations dor on dor.id=d.id +join SOURCE.organization o on o.id=dor.organization +join SOURCE.country c on c.code=o.country and c.continent_name='Europe' +left outer join SOURCE.result_licenses rl on rl.id=r.id +left outer join SOURCE.result_pids pids on pids.id=r.id +group by r.green, r.gold, licence, pid, oa, r.peer_reviewed, r.type, r.year; + +create table TARGET.result_deposited_year_country stored as parquet as +select count(distinct r.id) as total, r.green, r.gold, case when rl.type is not null then true else false end as licence, + case when pids.pid is not null then true else false end as pid, case when r.access_mode in ('Open Access', 'Open Source') then true else false end as oa, + r.peer_reviewed, r.type, r.year, c.code as ccode, c.name as cname +from SOURCE.result r +join SOURCE.result_datasources rd on rd.id=r.id +join SOURCE.datasource d on d.id=rd.datasource and d.type in ('Institutional Repository','Data Repository', 'Repository', 'Publication Repository') +join SOURCE.datasource_organizations dor on dor.id=d.id +join SOURCE.organization o on o.id=dor.organization +join SOURCE.country c on c.code=o.country and c.continent_name='Europe' +left outer join SOURCE.result_licenses rl on rl.id=r.id +left outer join SOURCE.result_pids pids on pids.id=r.id +group by r.green, r.gold, licence, pid, oa, r.peer_reviewed, r.type, r.year, c.code, c.name; + +create table TARGET.result_deposited_datasource stored as parquet as +select count(distinct r.id) as total, r.green, r.gold, case when rl.type is not null then true else false end as licence, + case when pids.pid is not null then true else false end as pid, case when r.access_mode in ('Open Access', 'Open Source') then true else false end as oa, + r.peer_reviewed, r.type, d.name as dname +from SOURCE.result r +join SOURCE.result_datasources rd on rd.id=r.id +join SOURCE.datasource d on d.id=rd.datasource and d.type in ('Institutional Repository','Data Repository', 'Repository', 'Publication Repository') +join SOURCE.datasource_organizations dor on dor.id=d.id +join SOURCE.organization o on o.id=dor.organization +join SOURCE.country c on c.code=o.country and c.continent_name='Europe' +left outer join SOURCE.result_licenses rl on rl.id=r.id +left outer join SOURCE.result_pids pids on pids.id=r.id +group by r.green, r.gold, licence, pid, oa, r.peer_reviewed, r.type, d.name; + +create table TARGET.result_deposited_datasource_country stored as parquet as +select count(distinct r.id) as total, r.green, r.gold, case when rl.type is not null then true else false end as licence, + case when pids.pid is not null then true else false end as pid, case when r.access_mode in ('Open Access', 'Open Source') then true else false end as oa, + r.peer_reviewed, r.type, d.name as dname, c.code as ccode, c.name as cname +from SOURCE.result r +join SOURCE.result_datasources rd on rd.id=r.id +join SOURCE.datasource d on d.id=rd.datasource and d.type in ('Institutional Repository','Data Repository', 'Repository', 'Publication Repository') +join SOURCE.datasource_organizations dor on dor.id=d.id +join SOURCE.organization o on o.id=dor.organization +join SOURCE.country c on c.code=o.country and c.continent_name='Europe' +left outer join SOURCE.result_licenses rl on 
rl.id=r.id +left outer join SOURCE.result_pids pids on pids.id=r.id +group by r.green, r.gold, licence, pid, oa, r.peer_reviewed, r.type, d.name, c.code, c.name; + +create table TARGET.result_deposited_organization stored as parquet as +select count(distinct r.id) as total, r.green, r.gold, case when rl.type is not null then true else false end as licence, + case when pids.pid is not null then true else false end as pid, case when r.access_mode in ('Open Access', 'Open Source') then true else false end as oa, r.peer_reviewed, r.type, o.name as oname +from SOURCE.result r +join SOURCE.result_datasources rd on rd.id=r.id +join SOURCE.datasource d on d.id=rd.datasource and d.type in ('Institutional Repository','Data Repository', 'Repository', 'Publication Repository') +join SOURCE.datasource_organizations dor on dor.id=d.id +join SOURCE.organization o on o.id=dor.organization +join SOURCE.country c on c.code=o.country and c.continent_name='Europe' +left outer join SOURCE.result_licenses rl on rl.id=r.id +left outer join SOURCE.result_pids pids on pids.id=r.id +group by r.green, r.gold, licence, pid, oa, r.peer_reviewed, r.type, o.name; + +create table TARGET.result_deposited_organization_country stored as parquet as +select count(distinct r.id) as total, r.green, r.gold, case when rl.type is not null then true else false end as licence, + case when pids.pid is not null then true else false end as pid, case when r.access_mode in ('Open Access', 'Open Source') then true else false end as oa, + r.peer_reviewed, r.type, o.name as oname, c.code as ccode, c.name as cname +from SOURCE.result r +join SOURCE.result_datasources rd on rd.id=r.id +join SOURCE.datasource d on d.id=rd.datasource and d.type in ('Institutional Repository','Data Repository', 'Repository', 'Publication Repository') +join SOURCE.datasource_organizations dor on dor.id=d.id +join SOURCE.organization o on o.id=dor.organization +join SOURCE.country c on c.code=o.country and c.continent_name='Europe' +left outer join SOURCE.result_licenses rl on rl.id=r.id +left outer join SOURCE.result_pids pids on pids.id=r.id +group by r.green, r.gold, licence, pid, oa, r.peer_reviewed, r.type, o.name, c.code, c.name; + +create table TARGET.result_deposited_funder stored as parquet as +select count(distinct r.id) as total, r.green, r.gold, case when rl.type is not null then true else false end as licence, + case when pids.pid is not null then true else false end as pid, case when r.access_mode in ('Open Access', 'Open Source') then true else false end as oa, + r.peer_reviewed, r.type, p.funder as pfunder +from SOURCE.result r +join SOURCE.result_datasources rd on rd.id=r.id +join SOURCE.datasource d on d.id=rd.datasource and d.type in ('Institutional Repository','Data Repository', 'Repository', 'Publication Repository') +join SOURCE.datasource_organizations dor on dor.id=d.id +join SOURCE.organization o on o.id=dor.organization +join SOURCE.country c on c.code=o.country and c.continent_name='Europe' +join SOURCE.result_projects rp on rp.id=r.id +join SOURCE.project p on p.id=rp.project +left outer join SOURCE.result_licenses rl on rl.id=r.id +left outer join SOURCE.result_pids pids on pids.id=r.id +group by r.green, r.gold, licence, pid, oa, r.peer_reviewed, r.type, p.funder; + +create table TARGET.result_deposited_funder_country stored as parquet as +select count(distinct r.id) as total, r.green, r.gold, case when rl.type is not null then true else false end as licence, + case when pids.pid is not null then true else false end as pid, case when 
r.access_mode in ('Open Access', 'Open Source') then true else false end as oa,
+       r.peer_reviewed, r.type, p.funder as pfunder, c.code as ccode, c.name as cname
+from SOURCE.result r
+join SOURCE.result_datasources rd on rd.id=r.id
+join SOURCE.datasource d on d.id=rd.datasource and d.type in ('Institutional Repository','Data Repository', 'Repository', 'Publication Repository')
+join SOURCE.datasource_organizations dor on dor.id=d.id
+join SOURCE.organization o on o.id=dor.organization
+join SOURCE.country c on c.code=o.country and c.continent_name='Europe'
+join SOURCE.result_projects rp on rp.id=r.id
+join SOURCE.project p on p.id=rp.project
+left outer join SOURCE.result_licenses rl on rl.id=r.id
+left outer join SOURCE.result_pids pids on pids.id=r.id
+group by r.green, r.gold, licence, pid, oa, r.peer_reviewed, r.type, p.funder, c.code, c.name;
+
+compute stats TARGET.result_affiliated_country;
+compute stats TARGET.result_affiliated_year;
+compute stats TARGET.result_affiliated_year_country;
+compute stats TARGET.result_affiliated_datasource;
+compute stats TARGET.result_affiliated_datasource_country;
+compute stats TARGET.result_affiliated_organization;
+compute stats TARGET.result_affiliated_organization_country;
+compute stats TARGET.result_affiliated_funder;
+compute stats TARGET.result_affiliated_funder_country;
+compute stats TARGET.result_deposited_country;
+compute stats TARGET.result_deposited_year;
+compute stats TARGET.result_deposited_year_country;
+compute stats TARGET.result_deposited_datasource;
+compute stats TARGET.result_deposited_datasource_country;
+compute stats TARGET.result_deposited_organization;
+compute stats TARGET.result_deposited_organization_country;
+compute stats TARGET.result_deposited_funder;
+compute stats TARGET.result_deposited_funder_country;
diff --git a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/workflow.xml b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/workflow.xml
index 321500e2c..824a8b3c7 100644
--- a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/workflow.xml
@@ -25,6 +25,14 @@
 monitor_db_shadow_name
 the name of the shadow monitor db
 
+
+ observatory_db_name
+ the target observatory db name
+
+
+ observatory_db_shadow_name
+ the name of the shadow observatory db
+
 stats_tool_api_url
 The url of the API of the stats tool. Is used to trigger the cache update.
@@ -305,11 +313,26 @@
 ${wf:appPath()}/scripts/step20-createMonitorDB.sql
 monitor.sh
 
-
+
 
+
+ ${jobTracker}
+ ${nameNode}
+ observatory.sh
+ ${stats_db_name}
+ ${observatory_db_name}
+ ${observatory_db_shadow_name}
+ ${wf:appPath()}/scripts/step21-createObservatoryDB.sql
+ observatory.sh
+
+
+
+
+
 ${jobTracker}
 ${nameNode}
@@ -322,4 +345,4 @@
 
-
+ \ No newline at end of file
From 168edcbde32da360c92785852311ed651bb305c9 Mon Sep 17 00:00:00 2001
From: antleb 
Date: Tue, 18 May 2021 15:23:20 +0300
Subject: [PATCH 03/22] added the final steps for the observatory promote wf
 and some cleanup
---
 .../dhp/oa/graph/stats/oozie_app/workflow.xml | 21 ++++++++++++
 .../graph/stats/oozie_app/scripts/step12.sql  | 32 ------------------
 2 files changed, 21 insertions(+), 32 deletions(-)

diff --git a/dhp-workflows/dhp-stats-promote/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/workflow.xml b/dhp-workflows/dhp-stats-promote/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/workflow.xml
index 0d8ff7ee3..8286e5039 100644
--- a/dhp-workflows/dhp-stats-promote/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-stats-promote/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/workflow.xml
@@ -16,6 +16,14 @@
 monitor_db_production_name
 the name of the monitor public database
 
+
+ observatory_db_name
+ the observatory database name
+
+
+ observatory_db_production_name
+ the name of the public observatory database
+
 stats_tool_api_url
 The url of the API of the stats tool. Is used to trigger the cache promote.
@@ -77,6 +85,19 @@
 ${monitor_db_production_name}
 updateProductionViews.sh
 
+
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+ updateProductionViews.sh
+ ${observatory_db_name}
+ ${observatory_db_production_name}
+ updateProductionViews.sh
+
diff --git a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step12.sql b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step12.sql
index 51d3a73c9..47d147f75 100644
--- a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step12.sql
+++ b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step12.sql
@@ -45,35 +45,3 @@ FROM ${stats_db_name}.dataset
 UNION ALL
 SELECT *, bestlicence AS access_mode
 FROM ${stats_db_name}.otherresearchproduct;
-
-
--------------------------------------------------------------------------------
--- To see with Antonis if the following is needed and where it should be placed
--------------------------------------------------------------------------------
-CREATE TABLE ${stats_db_name}.numbers_country AS
-SELECT org.country AS country, count(distinct rd.datasource) AS datasources, count(distinct r.id) AS publications
-FROM ${stats_db_name}.result r,
-    ${stats_db_name}.result_datasources rd,
-    ${stats_db_name}.datasource d,
-    ${stats_db_name}.datasource_organizations dor,
-    ${stats_db_name}.organization org
-WHERE r.id = rd.id
-  AND rd.datasource = d.id
-  AND d.id = dor.id
-  AND dor.organization = org.id
-  AND r.type = 'publication'
-  AND r.bestlicence = 'Open Access'
-GROUP BY org.country;
-
--- ANALYZE TABLE ${stats_db_name}.datasource COMPUTE STATISTICS;
--- ANALYZE TABLE ${stats_db_name}.datasource COMPUTE STATISTICS FOR COLUMNS;
--- ANALYZE TABLE ${stats_db_name}.publication COMPUTE STATISTICS;
--- ANALYZE TABLE ${stats_db_name}.publication COMPUTE STATISTICS FOR COLUMNS;
--- ANALYZE TABLE
${stats_db_name}.dataset COMPUTE STATISTICS; --- ANALYZE TABLE ${stats_db_name}.dataset COMPUTE STATISTICS FOR COLUMNS; --- ANALYZE TABLE ${stats_db_name}.software COMPUTE STATISTICS; --- ANALYZE TABLE ${stats_db_name}.software COMPUTE STATISTICS FOR COLUMNS; --- ANALYZE TABLE ${stats_db_name}.otherresearchproduct COMPUTE STATISTICS; --- ANALYZE TABLE ${stats_db_name}.otherresearchproduct COMPUTE STATISTICS FOR COLUMNS; --- ANALYZE TABLE ${stats_db_name}.numbers_country COMPUTE STATISTICS; --- ANALYZE TABLE ${stats_db_name}.numbers_country COMPUTE STATISTICS FOR COLUMNS; \ No newline at end of file From d413b24611765245315203f862566aa24e07d973 Mon Sep 17 00:00:00 2001 From: antleb Date: Thu, 10 Jun 2021 02:35:46 +0300 Subject: [PATCH 04/22] added instances, orgs for monitor, totalcost for projects, apcs --- .../graph/stats/oozie_app/scripts/step11.sql | 3 ++- .../stats/oozie_app/scripts/step16_6.sql | 25 +++++++++++++------ .../scripts/step20-createMonitorDB.sql | 8 +++++- .../graph/stats/oozie_app/scripts/step6.sql | 19 ++++---------- 4 files changed, 32 insertions(+), 23 deletions(-) diff --git a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step11.sql b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step11.sql index d26169fd6..b977302df 100644 --- a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step11.sql +++ b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step11.sql @@ -41,7 +41,8 @@ SELECT p.id, CASE WHEN prr2.id IS NULL THEN 0 ELSE prr2.daysForlastPub END AS daysforlastpub, CASE WHEN prr2.id IS NULL THEN 0 ELSE prr2.dp END AS delayedpubs, p.callidentifier, - p.code + p.code, + p.totalcost FROM ${stats_db_name}.project_tmp p LEFT JOIN (SELECT pr.id, count(distinct pr.result) AS np FROM ${stats_db_name}.project_results pr diff --git a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step16_6.sql b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step16_6.sql index 528aaff52..5280cf3e3 100644 --- a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step16_6.sql +++ b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step16_6.sql @@ -30,10 +30,21 @@ from rcount group by rcount.pid; create view ${stats_db_name}.rndexpenditure as select * from stats_ext.rndexpediture; --- --- ANALYZE TABLE ${stats_db_name}.result_projectcount COMPUTE STATISTICS; --- ANALYZE TABLE ${stats_db_name}.result_projectcount COMPUTE STATISTICS FOR COLUMNS; --- ANALYZE TABLE ${stats_db_name}.result_fundercount COMPUTE STATISTICS; --- ANALYZE TABLE ${stats_db_name}.result_fundercount COMPUTE STATISTICS FOR COLUMNS; --- ANALYZE TABLE ${stats_db_name}.project_resultcount COMPUTE STATISTICS; --- ANALYZE TABLE ${stats_db_name}.project_resultcount COMPUTE STATISTICS FOR COLUMNS; \ No newline at end of file + +create table ${stats_db_name}.result_instance as +select distinct r.* +from ( + select substr(r.id, 4) as id, inst.accessright.classname as accessright, substr(inst.collectedfrom.key, 4) as collectedfrom, + substr(inst.hostedby.key, 4) as hostedby, inst.dateofacceptance.value as dateofacceptance, inst.license.value as license, p.qualifier.classname as pidtype, p.value as pid + from ${openaire_db_name}.result r lateral view 
explode(r.instance) instances as inst lateral view explode(inst.pid) pids as p) r
+join ${stats_db_name}.result res on res.id=r.id;
+
+create table ${stats_db_name}.result_apc as
+select r.id, r.amount, r.currency
+from (
+    select substr(r.id, 4) as id, inst.processingchargeamount.value as amount, inst.processingchargecurrency.value as currency
+    from ${openaire_db_name}.result r lateral view explode(r.instance) instances as inst) r
+join ${stats_db_name}.result res on res.id=r.id
+where r.amount is not null;
+
+create view ${stats_db_name}.issn_gold_oa_dataset as select * from stats_ext.issn_gold_oa_dataset; \ No newline at end of file
diff --git a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step20-createMonitorDB.sql b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step20-createMonitorDB.sql
index af5e2a6a4..74aa8536c 100644
--- a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step20-createMonitorDB.sql
+++ b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step20-createMonitorDB.sql
@@ -16,7 +16,13 @@ create table TARGET.result as
 select distinct * from (
     select * from SOURCE.result r where exists (select 1 from SOURCE.result_projects rp join SOURCE.project p on rp.project=p.id where rp.id=r.id)
     union all
-    select * from SOURCE.result r where exists (select 1 from SOURCE.result_concepts rc where rc.id=r.id) ) foo;
+    select * from SOURCE.result r where exists (select 1 from SOURCE.result_concepts rc where rc.id=r.id)
+    union all
+    select * from SOURCE.result r where exists (select 1 from SOURCE.result_projects rp join SOURCE.project p on p.id=rp.project join SOURCE.project_organizations po on po.id=p.id join SOURCE.organization o on o.id=po.organization where rp.id=r.id and o.name in (
+        'GEORG-AUGUST-UNIVERSITAT GOTTINGEN STIFTUNG OFFENTLICHEN RECHTS',
+        'ATHINA-EREVNITIKO KENTRO KAINOTOMIAS STIS TECHNOLOGIES TIS PLIROFORIAS, TON EPIKOINONION KAI TIS GNOSIS',
+        'Consiglio Nazionale delle Ricerche',
+        'Universidade do Minho') )) foo;
 compute stats TARGET.result;
diff --git a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step6.sql b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step6.sql
index 23ef03bc9..5d81e97bb 100644
--- a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step6.sql
+++ b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step6.sql
@@ -39,7 +39,8 @@ CREATE TABLE ${stats_db_name}.project_tmp
     daysforlastpub INT,
     delayedpubs INT,
     callidentifier STRING,
-    code STRING
+    code STRING,
+    totalcost FLOAT
 ) CLUSTERED BY (id) INTO 100 buckets stored AS orc tblproperties ('transactional' = 'true');
 INSERT INTO ${stats_db_name}.project_tmp
@@ -62,7 +63,8 @@ SELECT substr(p.id, 4) AS id,
        0 AS daysforlastpub,
        0 AS delayedpubs,
        p.callidentifier.value AS callidentifier,
-       p.code.value AS code
+       p.code.value AS code,
+       p.totalcost AS totalcost
 FROM ${openaire_db_name}.project p
 WHERE p.datainfo.deletedbyinference = false;
@@ -70,15 +72,4 @@ create table ${stats_db_name}.funder as
 select distinct xpath_string(fund, '//funder/id') as id,
xpath_string(fund, '//funder/name') as name, xpath_string(fund, '//funder/shortname') as shortname -from ${openaire_db_name}.project p lateral view explode(p.fundingtree.value) fundingtree as fund; - --- ANALYZE TABLE ${stats_db_name}.project_oids COMPUTE STATISTICS; --- ANALYZE TABLE ${stats_db_name}.project_oids COMPUTE STATISTICS FOR COLUMNS; --- ANALYZE TABLE ${stats_db_name}.project_organizations COMPUTE STATISTICS; --- ANALYZE TABLE ${stats_db_name}.project_organizations COMPUTE STATISTICS FOR COLUMNS; --- ANALYZE TABLE ${stats_db_name}.project_results COMPUTE STATISTICS; --- ANALYZE TABLE ${stats_db_name}.project_results COMPUTE STATISTICS FOR COLUMNS; --- ANALYZE TABLE ${stats_db_name}.project_tmp COMPUTE STATISTICS; --- ANALYZE TABLE ${stats_db_name}.project_tmp COMPUTE STATISTICS FOR COLUMNS; --- ANALYZE TABLE ${stats_db_name}.funder COMPUTE STATISTICS; --- ANALYZE TABLE ${stats_db_name}.funder COMPUTE STATISTICS FOR COLUMNS; \ No newline at end of file +from ${openaire_db_name}.project p lateral view explode(p.fundingtree.value) fundingtree as fund; \ No newline at end of file From 75780fc63642073bba62edd6ffa6aab84a89bc23 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 14 Jun 2021 09:45:07 +0200 Subject: [PATCH 05/22] extraction of the tar for the dump of crossref, and creation of the dataset --- .../crossref/ExtractCrossrefRecords.java | 65 +++++++++++++++++++ .../GenerateCrossrefDatasetSpark.scala | 61 +++++++++++++++++ 2 files changed, 126 insertions(+) create mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ExtractCrossrefRecords.java create mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDatasetSpark.scala diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ExtractCrossrefRecords.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ExtractCrossrefRecords.java new file mode 100644 index 000000000..f3846f5f7 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ExtractCrossrefRecords.java @@ -0,0 +1,65 @@ + +package eu.dnetlib.doiboost.crossref; + +import java.io.BufferedOutputStream; +import java.net.URI; +import java.util.zip.GZIPOutputStream; + +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.mortbay.log.Log; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; + +public class ExtractCrossrefRecords { + public static void main(String[] args) throws Exception { + String hdfsServerUri; + String workingPath; + String crossrefFileNameTarGz; + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + ExtractCrossrefRecords.class + .getResourceAsStream( + "/eu/dnetlib/dhp/doiboost/crossref_dump_reader.json"))); + parser.parseArgument(args); + hdfsServerUri = parser.get("hdfsServerUri"); + workingPath = parser.get("workingPath"); + crossrefFileNameTarGz = parser.get("crossrefFileNameTarGz"); + + Path hdfsreadpath = new Path(hdfsServerUri.concat(workingPath).concat(crossrefFileNameTarGz)); + Configuration conf = new 
Configuration(); + conf.set("fs.defaultFS", hdfsServerUri.concat(workingPath)); + conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); + conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); + FileSystem fs = FileSystem.get(URI.create(hdfsServerUri.concat(workingPath)), conf); + FSDataInputStream crossrefFileStream = fs.open(hdfsreadpath); + try (TarArchiveInputStream tais = new TarArchiveInputStream( + new GzipCompressorInputStream(crossrefFileStream))) { + TarArchiveEntry entry = null; + while ((entry = tais.getNextTarEntry()) != null) { + if (entry.isDirectory()) { + } else { + try ( + FSDataOutputStream out = fs + .create(new Path(workingPath.concat("filess/").concat(entry.getName()).concat(".gz"))); + GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) { + + IOUtils.copy(tais, gzipOs); + + } + + } + } + } + Log.info("Crossref dump reading completed"); + + } +} diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDatasetSpark.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDatasetSpark.scala new file mode 100644 index 000000000..e186c9b8b --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDatasetSpark.scala @@ -0,0 +1,61 @@ +package eu.dnetlib.doiboost.crossref + +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} +import org.slf4j.{Logger, LoggerFactory} +import org.json4s +import org.json4s.DefaultFormats +import org.json4s.JsonAST._ +import org.json4s.jackson.JsonMethods._ + +import scala.io.Source + +object GenerateCrossrefDatasetSpark { + + + val log: Logger = LoggerFactory.getLogger(GenerateCrossrefDatasetSpark.getClass) + + implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT] + + def extractDump(input:String):List[String] = { + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: json4s.JValue = parse(input) + + val a = (json \ "items").extract[JArray] + a.arr.map(s => compact(render(s))) + } + + def crossrefElement(meta: String): CrossrefDT = { + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: json4s.JValue = parse(meta) + val doi:String = (json \ "DOI").extract[String] + val timestamp: Long = (json \ "indexed" \ "timestamp").extract[Long] + new CrossrefDT(doi, meta, timestamp) + + } + + def main(args: Array[String]): Unit = { + val conf = new SparkConf + val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref_dump_reader/generate_dataset_params.json")).mkString) + parser.parseArgument(args) + val master = parser.get("master") + val sourcePath = parser.get("sourcePath") + val targetPath = parser.get("targetPath") + + val spark: SparkSession = SparkSession.builder().config(conf) + .appName(GenerateCrossrefDatasetSpark.getClass.getSimpleName) + .master(master) + .getOrCreate() + + import spark.implicits._ + val sc: SparkContext = spark.sparkContext + + + sc.wholeTextFiles(sourcePath,2000).flatMap(d =>extractDump(d._2)) + .map(meta => crossrefElement(meta)) + .toDS() + .write.mode(SaveMode.Overwrite).save(targetPath) + + } +} From 0f1acdf6b6b28e6460b87c06ddd6e181b19c049d Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 14 Jun 2021 
10:08:55 +0200 Subject: [PATCH 06/22] workflow and parameter --- .../generate_dataset_params.json | 21 ++++++ .../oozie_app/config-default.xml | 42 ++++++++++++ .../oozie_app/workflow.xml | 68 +++++++++++++++++++ 3 files changed, 131 insertions(+) create mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/generate_dataset_params.json create mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/generate_dataset_params.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/generate_dataset_params.json new file mode 100644 index 000000000..63e080337 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/generate_dataset_params.json @@ -0,0 +1,21 @@ +[ + { + "paramName": "s", + "paramLongName": "sourcePath", + "paramDescription": "the source mdstore path", + "paramRequired": true + }, + + { + "paramName": "t", + "paramLongName": "targetPath", + "paramDescription": "the target mdstore path", + "paramRequired": true + }, + { + "paramName": "m", + "paramLongName": "master", + "paramDescription": "the master name", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/config-default.xml new file mode 100644 index 000000000..508202e30 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/config-default.xml @@ -0,0 +1,42 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + oozie.launcher.mapreduce.user.classpath.first + true + + + hive_metastore_uris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + spark2YarnHistoryServerAddress + http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 + + + spark2EventLogDir + /user/spark/spark2ApplicationHistory + + + spark2ExtraListeners + "com.cloudera.spark.lineage.NavigatorAppListener" + + + spark2SqlQueryExecutionListeners + "com.cloudera.spark.lineage.NavigatorQueryListener" + + \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/workflow.xml new file mode 100644 index 000000000..fdd4218d0 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/workflow.xml @@ -0,0 +1,68 @@ + + + + + + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + ${jobTracker} + ${nameNode} + eu.dnetlib.doiboost.crossref.ExtractCrossrefRecords + --hdfsServerUri${nameNode} + --workingPath/data/doiboost/crossref/ + 
--crossrefFileNameTarGzcrossref.tar.gz
+
+
+
+
+
+ yarn-cluster
 cluster
 SparkCreateCrossrefDataset
 eu.dnetlib.doiboost.crossref.GenerateCrossrefDatasetSpark
 dhp-doiboost-${projectVersion}.jar
+ --conf spark.dynamicAllocation.enabled=true
 --conf spark.dynamicAllocation.maxExecutors=20
 --executor-memory=6G
 --driver-memory=7G
 --conf spark.extraListeners=${spark2ExtraListeners}
 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --masteryarn-cluster
 --sourcePath/data/doiboost/crossref/filess
 --targetPath/tmp/miriam/crossref/crossrefDataset
+
+
+
+
+
 \ No newline at end of file
From 8873e6b6d14886fb0344df248c9333c04545ce6c Mon Sep 17 00:00:00 2001
From: "miriam.baglioni" 
Date: Mon, 14 Jun 2021 10:15:57 +0200
Subject: [PATCH 07/22] workflow and parameter
---
 .../eu/dnetlib/dhp/doiboost/crossref_dump_reader.json | 7 +++++++
 1 file changed, 7 insertions(+)
 create mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader.json

diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader.json
new file mode 100644
index 000000000..98866e62d
--- /dev/null
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader.json
@@ -0,0 +1,7 @@
+[
+  {"paramName":"n", "paramLongName":"hdfsServerUri", "paramDescription": "the server uri", "paramRequired": true},
+  {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the default work path", "paramRequired": true},
+  {"paramName":"f", "paramLongName":"crossrefFileNameTarGz", "paramDescription": "the name of the crossref dump tar.gz file", "paramRequired": true},
+  {"paramName":"issm", "paramLongName":"isSparkSessionManaged", "paramDescription": "true if the spark session is managed, false otherwise", "paramRequired": false}
+
+] \ No newline at end of file
From 83132ee99afda95926c25d3d087c7bc857350460 Mon Sep 17 00:00:00 2001
From: "michele.artini" 
Date: Mon, 14 Jun 2021 11:57:00 +0200
Subject: [PATCH 08/22] fixed a problem with empty mdstore list
---
 .../raw/MigrateHdfsMdstoresApplication.java | 27 +++++++++++--------
 1 file changed, 16 insertions(+), 11 deletions(-)

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java
index f4e783edc..31527283c 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java
@@ -14,6 +14,8 @@ import java.util.stream.Collectors;

 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@@ -52,9 +54,8 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication
 	public static void main(final String[] args) throws Exception {
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
 			IOUtils
-				.toString(
-					
MigrateHdfsMdstoresApplication.class - .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json"))); + .toString(MigrateHdfsMdstoresApplication.class + .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -94,14 +95,18 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration())) .toArray(size -> new String[size]); - spark - .read() - .parquet(validPaths) - .map((MapFunction) r -> enrichRecord(r), Encoders.STRING()) - .toJavaRDD() - .mapToPair(xml -> new Tuple2<>(new Text(UUID.randomUUID() + ":" + type), new Text(xml))) - // .coalesce(1) - .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); + if (validPaths.length > 0) { + spark + .read() + .parquet(validPaths) + .map((MapFunction) r -> enrichRecord(r), Encoders.STRING()) + .toJavaRDD() + .mapToPair(xml -> new Tuple2<>(new Text(UUID.randomUUID() + ":" + type), new Text(xml))) + // .coalesce(1) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); + } else { + FileSystem.get(sc.hadoopConfiguration()).createNewFile(new Path(outputPath)); + } } private static String enrichRecord(final Row r) { From ada063ce702373ffc8afc7e5fc41a3188c42a9ec Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Mon, 14 Jun 2021 12:04:47 +0200 Subject: [PATCH 09/22] fixed a problem with empty mdstore list (2) --- .../dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java index 31527283c..5109a0763 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java @@ -14,8 +14,6 @@ import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapred.SequenceFileOutputFormat; @@ -105,7 +103,10 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication // .coalesce(1) .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); } else { - FileSystem.get(sc.hadoopConfiguration()).createNewFile(new Path(outputPath)); + spark.emptyDataFrame() + .toJavaRDD() + .mapToPair(xml -> new Tuple2<>(new Text(), new Text())) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); } } From 93efe4de823fa03d27d9ec363b8df8489e3d9a72 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 14 Jun 2021 13:39:40 +0200 Subject: [PATCH 10/22] split the construction of crossref dataset in two parts. 
This one just unpacks the tar entries --- ....scala => UnpackCrossrefDumpEntries.scala} | 25 +++++++------------ 1 file changed, 9 insertions(+), 16 deletions(-) rename dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/{GenerateCrossrefDatasetSpark.scala => UnpackCrossrefDumpEntries.scala} (60%) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDatasetSpark.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrossrefDumpEntries.scala similarity index 60% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDatasetSpark.scala rename to dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrossrefDumpEntries.scala index e186c9b8b..4f8189cf3 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDatasetSpark.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrossrefDumpEntries.scala @@ -1,6 +1,7 @@ package eu.dnetlib.doiboost.crossref import eu.dnetlib.dhp.application.ArgumentApplicationParser +import org.apache.hadoop.io.compress.GzipCodec import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} import org.slf4j.{Logger, LoggerFactory} @@ -11,12 +12,12 @@ import org.json4s.jackson.JsonMethods._ import scala.io.Source -object GenerateCrossrefDatasetSpark { +object UnpackCrossrefDumpEntries { - val log: Logger = LoggerFactory.getLogger(GenerateCrossrefDatasetSpark.getClass) + val log: Logger = LoggerFactory.getLogger(UnpackCrossrefDumpEntries.getClass) + - implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT] def extractDump(input:String):List[String] = { implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats @@ -26,14 +27,7 @@ object GenerateCrossrefDatasetSpark { a.arr.map(s => compact(render(s))) } - def crossrefElement(meta: String): CrossrefDT = { - implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats - lazy val json: json4s.JValue = parse(meta) - val doi:String = (json \ "DOI").extract[String] - val timestamp: Long = (json \ "indexed" \ "timestamp").extract[Long] - new CrossrefDT(doi, meta, timestamp) - } def main(args: Array[String]): Unit = { val conf = new SparkConf @@ -44,18 +38,17 @@ object GenerateCrossrefDatasetSpark { val targetPath = parser.get("targetPath") val spark: SparkSession = SparkSession.builder().config(conf) - .appName(GenerateCrossrefDatasetSpark.getClass.getSimpleName) + .appName(UnpackCrossrefDumpEntries.getClass.getSimpleName) .master(master) .getOrCreate() - import spark.implicits._ + val sc: SparkContext = spark.sparkContext - sc.wholeTextFiles(sourcePath,2000).flatMap(d =>extractDump(d._2)) - .map(meta => crossrefElement(meta)) - .toDS() - .write.mode(SaveMode.Overwrite).save(targetPath) + sc.wholeTextFiles(sourcePath,6000).flatMap(d =>extractDump(d._2)) + .saveAsTextFile(targetPath, classOf[GzipCodec]); + } } From ce0cfd79e083d9fdc420620bdc5b9eaf14b4fdda Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 14 Jun 2021 13:40:19 +0200 Subject: [PATCH 11/22] creates the crossref dataset used for doiboost --- .../crossref/GenerateCrossrefDataset.scala | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala diff --git 
a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala new file mode 100644 index 000000000..3b60a9095 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala @@ -0,0 +1,55 @@ +package eu.dnetlib.doiboost.crossref + +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.doiboost.crossref.UnpackCrossrefDumpEntries.getClass +import org.apache.hadoop.io.compress.GzipCodec +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} +import org.json4s +import org.json4s.DefaultFormats +import org.json4s.jackson.JsonMethods.parse +import org.slf4j.{Logger, LoggerFactory} + +import scala.io.Source + +object GenerateCrossrefDataset { + + val log: Logger = LoggerFactory.getLogger(GenerateCrossrefDataset.getClass) + + implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT] + + + + def crossrefElement(meta: String): CrossrefDT = { + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: json4s.JValue = parse(meta) + val doi:String = (json \ "DOI").extract[String] + val timestamp: Long = (json \ "indexed" \ "timestamp").extract[Long] + new CrossrefDT(doi, meta, timestamp) + + } + + def main(args: Array[String]): Unit = { + val conf = new SparkConf + val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref_dump_reader/generate_dataset_params.json")).mkString) + parser.parseArgument(args) + val master = parser.get("master") + val sourcePath = parser.get("sourcePath") + val targetPath = parser.get("targetPath") + + val spark: SparkSession = SparkSession.builder().config(conf) + .appName(UnpackCrossrefDumpEntries.getClass.getSimpleName) + .master(master) + .getOrCreate() + val sc: SparkContext = spark.sparkContext + + import spark.implicits._ + + sc.textFile(sourcePath,6000) + .map(meta => crossrefElement(meta)) + .toDS() + .write.mode(SaveMode.Overwrite).save(targetPath) + + } + +} From d6e21bb6ea91cf6912bd41c2442d06a69307963d Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 14 Jun 2021 17:27:19 +0200 Subject: [PATCH 12/22] creates the crossref dataset used for doiboost together with unpacking part from tar --- .../crossref/GenerateCrossrefDataset.scala | 29 +++++++++++++++---- 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala index 3b60a9095..9d17b5162 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala @@ -1,13 +1,12 @@ package eu.dnetlib.doiboost.crossref import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.doiboost.crossref.UnpackCrossrefDumpEntries.getClass -import org.apache.hadoop.io.compress.GzipCodec import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} import org.json4s import org.json4s.DefaultFormats -import org.json4s.jackson.JsonMethods.parse +import org.json4s.JsonAST.JArray +import 
org.json4s.jackson.JsonMethods.{compact, parse, render} import org.slf4j.{Logger, LoggerFactory} import scala.io.Source @@ -18,6 +17,13 @@ object GenerateCrossrefDataset { implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT] + def extractDump(input:String):List[String] = { + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: json4s.JValue = parse(input) + + val a = (json \ "items").extract[JArray] + a.arr.map(s => compact(render(s))) + } def crossrefElement(meta: String): CrossrefDT = { @@ -25,7 +31,7 @@ object GenerateCrossrefDataset { lazy val json: json4s.JValue = parse(meta) val doi:String = (json \ "DOI").extract[String] val timestamp: Long = (json \ "indexed" \ "timestamp").extract[Long] - new CrossrefDT(doi, meta, timestamp) + CrossrefDT(doi, meta, timestamp) } @@ -45,9 +51,20 @@ object GenerateCrossrefDataset { import spark.implicits._ - sc.textFile(sourcePath,6000) + + def extractDump(input:String):List[String] = { + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: json4s.JValue = parse(input) + + val a = (json \ "items").extract[JArray] + a.arr.map(s => compact(render(s))) + } + + + // sc.textFile(sourcePath,6000) + sc.wholeTextFiles(sourcePath,6000).flatMap(d =>extractDump(d._2)) .map(meta => crossrefElement(meta)) - .toDS() + .toDS()//.as[CrossrefDT] .write.mode(SaveMode.Overwrite).save(targetPath) } From f7379255b690683382afbf623bac8635979658d6 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Tue, 15 Jun 2021 09:22:54 +0200 Subject: [PATCH 13/22] changed the workflow to extract info from the dump --- .../crossref/UnpackCrossrefDumpEntries.scala | 54 ------- .../oozie_app/workflow.xml | 58 +++++-- .../dhp/doiboost/oozie_app/workflow.xml | 151 ++++++++++++------ 3 files changed, 143 insertions(+), 120 deletions(-) delete mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrossrefDumpEntries.scala diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrossrefDumpEntries.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrossrefDumpEntries.scala deleted file mode 100644 index 4f8189cf3..000000000 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrossrefDumpEntries.scala +++ /dev/null @@ -1,54 +0,0 @@ -package eu.dnetlib.doiboost.crossref - -import eu.dnetlib.dhp.application.ArgumentApplicationParser -import org.apache.hadoop.io.compress.GzipCodec -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} -import org.slf4j.{Logger, LoggerFactory} -import org.json4s -import org.json4s.DefaultFormats -import org.json4s.JsonAST._ -import org.json4s.jackson.JsonMethods._ - -import scala.io.Source - -object UnpackCrossrefDumpEntries { - - - val log: Logger = LoggerFactory.getLogger(UnpackCrossrefDumpEntries.getClass) - - - - def extractDump(input:String):List[String] = { - implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats - lazy val json: json4s.JValue = parse(input) - - val a = (json \ "items").extract[JArray] - a.arr.map(s => compact(render(s))) - } - - - - def main(args: Array[String]): Unit = { - val conf = new SparkConf - val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref_dump_reader/generate_dataset_params.json")).mkString) - parser.parseArgument(args) - val master = 
parser.get("master") - val sourcePath = parser.get("sourcePath") - val targetPath = parser.get("targetPath") - - val spark: SparkSession = SparkSession.builder().config(conf) - .appName(UnpackCrossrefDumpEntries.getClass.getSimpleName) - .master(master) - .getOrCreate() - - - val sc: SparkContext = spark.sparkContext - - - sc.wholeTextFiles(sourcePath,6000).flatMap(d =>extractDump(d._2)) - .saveAsTextFile(targetPath, classOf[GzipCodec]); - - - } -} diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/workflow.xml index fdd4218d0..c7dc8bed4 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/workflow.xml @@ -1,9 +1,13 @@ - - - - + + crossrefDumpPath + the working dir base path + + + inputPathCrossref + the working dir base path + sparkDriverMemory memory for driver process @@ -14,25 +18,27 @@ sparkExecutorCores + 2 number of cores used by single executor - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + ${jobTracker} ${nameNode} eu.dnetlib.doiboost.crossref.ExtractCrossrefRecords --hdfsServerUri${nameNode} - --workingPath/data/doiboost/crossref/ - --crossrefFileNameTarGzcrossref.tar.gz + --crossrefFileNameTarGz${crossrefDumpPath}/crossref.tar.gz + --workingPath${crossrefDumpPath} + --outputPath${workingDir}/files/ @@ -42,24 +48,42 @@ yarn-cluster cluster - SparkCreateCrossredDataset - eu.dnetlib.doiboost.crossref.GenerateCrossrefDatasetSpark + SparkGenerateCrossrefDataset + eu.dnetlib.doiboost.crossref.GenerateCrossrefDataset dhp-doiboost-${projectVersion}.jar - --conf spark.dynamicAllocation.enabled=true - --conf spark.dynamicAllocation.maxExecutors=20 - --executor-memory=6G - --driver-memory=7G + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --masteryarn-cluster - --sourcePath/data/doiboost/crossref/filess - --targetPath/tmp/miriam/crossref/crossrefDataset + --sourcePath${workingDir}/files + --targetPath${inputPathCrossref}/crossref_ds_updated + + + + + + + + + + + + + + + + + diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml index 6cb8a577a..f5ce1c323 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml @@ -41,17 +41,21 @@ the Crossref input path - crossrefTimestamp - Timestamp for the Crossref incremental Harvesting - - - esServer - elasticsearch server url for the Crossref Harvesting - - - esIndex - elasticsearch index name for the Crossref Harvesting + crossrefDumpPath + the Crossref dump path + + + + + + + + + + + + @@ -114,55 +118,104 @@ Action failed, 
error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + - eu.dnetlib.doiboost.crossref.CrossrefImporter - --targetPath${inputPathCrossref}/index_update - --namenode${nameNode} - --esServer${esServer} - --esIndex${esIndex} - --timestamp${crossrefTimestamp} + ${jobTracker} + ${nameNode} + eu.dnetlib.doiboost.crossref.ExtractCrossrefRecords + --hdfsServerUri${nameNode} + --crossrefFileNameTarGz${crossrefDumpPath}/crossref.tar.gz + --workingPath${crossrefDumpPath} + --outputPath${workingDir}/files - + + + + yarn-cluster + cluster + SparkGenerateCrossrefDataset + eu.dnetlib.doiboost.crossref.GenerateCrossrefDataset + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --masteryarn-cluster + --sourcePath${workingDir}/files/ + --targetPath${inputPathCrossref}/crossref_ds_updated + + + + + + + + + + + + + + + - - - yarn-cluster - cluster - GenerateCrossrefDataset - eu.dnetlib.doiboost.crossref.CrossrefDataset - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --workingPath${inputPathCrossref} - --masteryarn-cluster - - - - + + + + + + + + + + + + + + + + + + + + + + + - - - - - - - - + + + + + + + + + From 6ebc236657163fa4ce445034f15a6e596a5213b9 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Tue, 15 Jun 2021 09:23:24 +0200 Subject: [PATCH 14/22] added needed property: outputPath --- .../eu/dnetlib/dhp/doiboost/crossref_dump_reader.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader.json index 98866e62d..b0222d422 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader.json +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader.json @@ -2,6 +2,7 @@ {"paramName":"n", "paramLongName":"hdfsServerUri", "paramDescription": "the server uri", "paramRequired": true}, {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the default work path", "paramRequired": true}, {"paramName":"f", "paramLongName":"crossrefFileNameTarGz", "paramDescription": "the name of the activities orcid file", "paramRequired": true}, - {"paramName":"issm", "paramLongName":"isSparkSessionManaged", "paramDescription": "the name of the activities orcid file", "paramRequired": false} + {"paramName":"issm", "paramLongName":"isSparkSessionManaged", "paramDescription": "the name of the activities orcid file", "paramRequired": false}, + {"paramName":"o", "paramLongName":"outputPath", "paramDescription": "the name of the activities orcid file", "paramRequired": 
true} ] \ No newline at end of file From 63d74ee379a5c18e25203fe1bb863033427943be Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Tue, 15 Jun 2021 09:24:11 +0200 Subject: [PATCH 15/22] refactoring --- .../crossref/ExtractCrossrefRecords.java | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ExtractCrossrefRecords.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ExtractCrossrefRecords.java index f3846f5f7..c7cae1fcb 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ExtractCrossrefRecords.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ExtractCrossrefRecords.java @@ -20,9 +20,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; public class ExtractCrossrefRecords { public static void main(String[] args) throws Exception { - String hdfsServerUri; - String workingPath; - String crossrefFileNameTarGz; + final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils .toString( @@ -30,11 +28,12 @@ public class ExtractCrossrefRecords { .getResourceAsStream( "/eu/dnetlib/dhp/doiboost/crossref_dump_reader.json"))); parser.parseArgument(args); - hdfsServerUri = parser.get("hdfsServerUri"); - workingPath = parser.get("workingPath"); - crossrefFileNameTarGz = parser.get("crossrefFileNameTarGz"); + final String hdfsServerUri = parser.get("hdfsServerUri"); + final String workingPath = parser.get("workingPath"); + final String outputPath = parser.get("outputPath"); + final String crossrefFileNameTarGz = parser.get("crossrefFileNameTarGz"); - Path hdfsreadpath = new Path(hdfsServerUri.concat(workingPath).concat(crossrefFileNameTarGz)); + Path hdfsreadpath = new Path(hdfsServerUri.concat(crossrefFileNameTarGz)); Configuration conf = new Configuration(); conf.set("fs.defaultFS", hdfsServerUri.concat(workingPath)); conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); @@ -45,11 +44,10 @@ public class ExtractCrossrefRecords { new GzipCompressorInputStream(crossrefFileStream))) { TarArchiveEntry entry = null; while ((entry = tais.getNextTarEntry()) != null) { - if (entry.isDirectory()) { - } else { + if (!entry.isDirectory()) { try ( FSDataOutputStream out = fs - .create(new Path(workingPath.concat("filess/").concat(entry.getName()).concat(".gz"))); + .create(new Path(outputPath.concat(entry.getName()).concat(".gz"))); GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) { IOUtils.copy(tais, gzipOs); From 9f9dd00b94fbe08372ddd5971cf716b1c6a92d55 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Tue, 15 Jun 2021 09:24:46 +0200 Subject: [PATCH 16/22] refactoring --- .../eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala index 9d17b5162..e48f68a7f 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala @@ -44,7 +44,7 @@ object GenerateCrossrefDataset { val targetPath = parser.get("targetPath") val spark: SparkSession = SparkSession.builder().config(conf) - 
.appName(UnpackCrossrefDumpEntries.getClass.getSimpleName) + .appName(GenerateCrossrefDataset.getClass.getSimpleName) .master(master) .getOrCreate() val sc: SparkContext = spark.sparkContext @@ -61,7 +61,6 @@ object GenerateCrossrefDataset { } - // sc.textFile(sourcePath,6000) sc.wholeTextFiles(sourcePath,6000).flatMap(d =>extractDump(d._2)) .map(meta => crossrefElement(meta)) .toDS()//.as[CrossrefDT] From 4f47ad0891c0f88563b7892a301762ec1ee1b35b Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Tue, 15 Jun 2021 09:28:31 +0200 Subject: [PATCH 17/22] no need to rename the folders, just write in overwrite mode, so I changed the name of the output folder --- .../resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml index f5ce1c323..54bd6abf8 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml @@ -164,7 +164,7 @@ --masteryarn-cluster --sourcePath${workingDir}/files/ - --targetPath${inputPathCrossref}/crossref_ds_updated + --targetPath${inputPathCrossref}/crossref_ds From 66e7ef892f428fa41238fb0a539f7835e96eb98d Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Tue, 15 Jun 2021 11:08:54 +0200 Subject: [PATCH 18/22] changed the parameter name --- .../eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml index 54bd6abf8..6d9042302 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml @@ -139,7 +139,7 @@ --hdfsServerUri${nameNode} --crossrefFileNameTarGz${crossrefDumpPath}/crossref.tar.gz --workingPath${crossrefDumpPath} - --outputPath${workingDir}/files + --outputPath${crossrefDumpPath}/files @@ -163,7 +163,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --masteryarn-cluster - --sourcePath${workingDir}/files/ + --sourcePath${crossrefDumpPath}/files/ --targetPath${inputPathCrossref}/crossref_ds @@ -173,7 +173,7 @@ - + From f7c0b80e35d853b4594abaa5f4fcade48e7d4e21 Mon Sep 17 00:00:00 2001 From: antleb Date: Tue, 15 Jun 2021 14:45:48 +0300 Subject: [PATCH 19/22] storing result_instance as parquet --- .../dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step16_6.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step16_6.sql b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step16_6.sql index 5280cf3e3..3a7d9f455 100644 --- a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step16_6.sql +++ b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step16_6.sql @@ -31,7 +31,7 @@ group by rcount.pid; create view ${stats_db_name}.rndexpenditure as select * from stats_ext.rndexpediture; -create table ${stats_db_name}.result_instance 
as +create table ${stats_db_name}.result_instance stored as parquet as select distinct r.* from ( select substr(r.id, 4) as id, inst.accessright.classname as accessright, substr(inst.collectedfrom.key, 4) as collectedfrom, From 7deac551383faa59b156830add3aa5b680bd5b8e Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Tue, 15 Jun 2021 18:38:20 +0200 Subject: [PATCH 20/22] added a resumeFrom option to the workflow --- .../eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml index 6d9042302..75b0ea31e 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml @@ -110,6 +110,7 @@ ${wf:conf('resumeFrom') eq 'PreprocessORCID'} ${wf:conf('resumeFrom') eq 'CreateDOIBoost'} ${wf:conf('resumeFrom') eq 'GenerateActionSet'} + ${wf:conf('resumeFrom') eq 'GenerateCrossrefDataset'} @@ -139,13 +140,13 @@ --hdfsServerUri${nameNode} --crossrefFileNameTarGz${crossrefDumpPath}/crossref.tar.gz --workingPath${crossrefDumpPath} - --outputPath${crossrefDumpPath}/files + --outputPath${crossrefDumpPath}/files/ - + - + yarn-cluster cluster From 1c47c0d78699151df89f0969b7ea61205bb1409f Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Tue, 15 Jun 2021 21:05:39 +0200 Subject: [PATCH 21/22] modified the number of executor cores, trying to avoid OOM exceptions --- .../resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml index 75b0ea31e..7bd7d107f 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml @@ -155,7 +155,7 @@ dhp-doiboost-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} + --executor-cores=2 --driver-memory=${sparkDriverMemory} --conf spark.sql.shuffle.partitions=3840 --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --masteryarn-cluster From 7243a40c88244397b7a8cd88159427cba350b5fe Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 16 Jun 2021 15:03:03 +0200 Subject: [PATCH 22/22] code formatting --- .../dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java index 5109a0763..1d4eca2c2 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java @@ -52,8 +52,9 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils -
.toString(MigrateHdfsMdstoresApplication.class - .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json"))); + .toString( + MigrateHdfsMdstoresApplication.class + .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -103,7 +104,8 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication // .coalesce(1) .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); } else { - spark.emptyDataFrame() + spark + .emptyDataFrame() .toJavaRDD() .mapToPair(xml -> new Tuple2<>(new Text(), new Text())) .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
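
Patches 09 and 22 settle how MigrateHdfsMdstoresApplication behaves when the list of valid mdstore paths is empty: instead of touching a bare file at outputPath with FileSystem.createNewFile (a zero-byte file has no SequenceFile header, which is presumably why that first attempt was not enough), the job writes an empty pair RDD through the same gzipped SequenceFile output format used in the non-empty branch. A minimal Scala sketch of that pattern follows; the object name and output path are hypothetical, not the project's own:

    import org.apache.hadoop.io.Text
    import org.apache.hadoop.io.compress.GzipCodec
    import org.apache.hadoop.mapred.SequenceFileOutputFormat
    import org.apache.spark.sql.SparkSession

    object EmptySequenceFileSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("EmptySequenceFileSketch").master("local[*]").getOrCreate()
        val sc = spark.sparkContext
        val outputPath = "/tmp/mdstore-out" // hypothetical path

        // One empty partition yields one well-formed, zero-record, gzip-compressed
        // SequenceFile part, so whatever reads outputPath next finds a readable
        // (if empty) dataset instead of failing on a malformed file.
        sc.parallelize(Seq.empty[(Text, Text)], numSlices = 1)
          .saveAsHadoopFile(outputPath, classOf[Text], classOf[Text],
            classOf[SequenceFileOutputFormat[Text, Text]], classOf[GzipCodec])

        spark.stop()
      }
    }

The design choice is that an empty-but-valid container keeps the rest of the pipeline running, while a missing path or a plain empty file would abort it.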
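The crossref-side patches (10 through 18) leave GenerateCrossrefDataset doing both halves of the job in one pass: read each unpacked .gz file whole, explode its "items" array into individual records, wrap each record as a CrossrefDT, and store the result, overwriting any previous run. A condensed, self-contained Scala sketch of that end state follows; the local CrossrefDT case class and the two paths are stand-ins for the project's own types and configuration:

    import org.apache.spark.sql.{SaveMode, SparkSession}
    import org.json4s
    import org.json4s.DefaultFormats
    import org.json4s.JsonAST.JArray
    import org.json4s.jackson.JsonMethods.{compact, parse, render}

    // Stand-in for the project's CrossrefDT; the field names are assumptions.
    case class CrossrefDT(doi: String, json: String, timestamp: Long)

    object CrossrefDumpSketch {

      // Each dump file is one JSON object whose "items" array holds the
      // actual Crossref records; return them as compact JSON strings.
      def extractDump(input: String): List[String] = {
        implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
        lazy val json: json4s.JValue = parse(input)
        (json \ "items").extract[JArray].arr.map(s => compact(render(s)))
      }

      // Keep the full record as a string, alongside its DOI and the
      // "indexed" timestamp.
      def crossrefElement(meta: String): CrossrefDT = {
        implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
        lazy val json: json4s.JValue = parse(meta)
        CrossrefDT((json \ "DOI").extract[String], meta, (json \ "indexed" \ "timestamp").extract[Long])
      }

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("CrossrefDumpSketch").master("local[*]").getOrCreate()
        import spark.implicits._

        val sourcePath = "/tmp/crossref/files"       // the .gz files unpacked from the tar
        val targetPath = "/tmp/crossref/crossref_ds" // hypothetical

        spark.sparkContext
          .wholeTextFiles(sourcePath, 6000)          // high minPartitions for many small files
          .flatMap { case (_, content) => extractDump(content) }
          .map(crossrefElement)
          .toDS()
          .write.mode(SaveMode.Overwrite).save(targetPath)

        spark.stop()
      }
    }

Reading with wholeTextFiles rather than textFile is what makes the flatMap safe here: each "items" document has to be parsed as a unit, and a line-oriented read could split it.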