forked from D-Net/dnet-hadoop
Merge branch 'stable_ids' of https://code-repo.d4science.org/D-Net/dnet-hadoop into stable_ids

commit 180d671127
@@ -18,7 +18,7 @@ import eu.dnetlib.dhp.schema.oaf.Field;
 
 public class DatePicker {
 
-    public static final String DATE_PATTERN = "^\\d{4}-\\d{2}-\\d{2}$";
+    public static final String DATE_PATTERN = "^(\\d{4})-(\\d{2})-(\\d{2})";
     private static final String DATE_DEFAULT_SUFFIX = "01-01";
     private static final int YEAR_LB = 1300;
     private static final int YEAR_UB = Year.now().getValue() + 5;
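The pattern change above relaxes the date regex: dropping the trailing `$` anchor lets ISO datetimes such as `2016-01-01T12:00:00Z` match on their leading date, and the new groups capture year, month, and day. A minimal sketch of the difference (class name is hypothetical):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DatePatternSketch {
    public static void main(String[] args) {
        // the old "$"-anchored pattern rejects timestamps outright
        System.out.println("2016-01-01T12:00:00Z".matches("^\\d{4}-\\d{2}-\\d{2}$")); // false
        // the new pattern matches the leading date and exposes its parts
        final Matcher m = Pattern.compile("^(\\d{4})-(\\d{2})-(\\d{2})").matcher("2016-01-01T12:00:00Z");
        if (m.find()) {
            System.out.println(m.group(1) + "-" + m.group(2) + "-" + m.group(3)); // 2016-01-01
        }
    }
}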
@@ -28,6 +28,7 @@ public class DatePicker {
         final Map<String, Integer> frequencies = dateofacceptance
             .parallelStream()
             .filter(StringUtils::isNotBlank)
+            .map(d -> substringBefore(d, "T"))
             .collect(Collectors.toConcurrentMap(w -> w, w -> 1, Integer::sum));
 
         if (frequencies.isEmpty()) {
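The added `.map(d -> substringBefore(d, "T"))` truncates ISO datetimes to their date part before counting, so a timestamp and its plain-date form accumulate into the same frequency bucket. A self-contained sketch of the counting step, assuming Apache Commons Lang's `substringBefore` (class name hypothetical):

import static org.apache.commons.lang3.StringUtils.substringBefore;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;

public class FrequencySketch {
    public static void main(String[] args) {
        final List<String> dateofacceptance = List.of("2016-01-01T12:00:00Z", "2016-01-01", "2020-10-01");
        final Map<String, Integer> frequencies = dateofacceptance
            .parallelStream()
            .filter(StringUtils::isNotBlank)
            // strip everything from the first "T": "2016-01-01T12:00:00Z" -> "2016-01-01";
            // dates without a "T" pass through unchanged
            .map(d -> substringBefore(d, "T"))
            .collect(Collectors.toConcurrentMap(w -> w, w -> 1, Integer::sum));
        System.out.println(frequencies); // e.g. {2016-01-01=2, 2020-10-01=1}
    }
}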
@@ -0,0 +1,44 @@
+
+package eu.dnetlib.dhp.oa.dedup;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Collection;
+
+import org.junit.jupiter.api.Test;
+
+import com.clearspring.analytics.util.Lists;
+
+public class DatePickerTest {
+
+    Collection<String> dates = Lists.newArrayList();
+
+    @Test
+    public void testPickISO() {
+        dates.add("2016-01-01T12:00:00Z");
+        dates.add("2016-06-16T12:00:00Z");
+        dates.add("2020-01-01T12:00:00Z");
+        dates.add("2020-10-01T12:00:00Z");
+        assertEquals("2020-10-01", DatePicker.pick(dates).getValue());
+    }
+
+    @Test
+    public void testPickSimple() {
+        dates.add("2016-01-01");
+        dates.add("2016-06-16");
+        dates.add("2020-01-01");
+        dates.add("2020-10-01");
+        assertEquals("2020-10-01", DatePicker.pick(dates).getValue());
+    }
+
+    @Test
+    public void testPickFrequent() {
+        dates.add("2016-02-01");
+        dates.add("2016-02-01");
+        dates.add("2016-02-01");
+        dates.add("2020-10-01");
+        assertEquals("2016-02-01", DatePicker.pick(dates).getValue());
+    }
+
+}
@@ -52,8 +52,9 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication
     public static void main(final String[] args) throws Exception {
         final ArgumentApplicationParser parser = new ArgumentApplicationParser(
             IOUtils
-                .toString(MigrateHdfsMdstoresApplication.class
-                    .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json")));
+                .toString(
+                    MigrateHdfsMdstoresApplication.class
+                        .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json")));
         parser.parseArgument(args);
 
         final Boolean isSparkSessionManaged = Optional
@@ -103,7 +104,8 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication
                 // .coalesce(1)
                 .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
         } else {
-            spark.emptyDataFrame()
+            spark
+                .emptyDataFrame()
                 .toJavaRDD()
                 .mapToPair(xml -> new Tuple2<>(new Text(), new Text()))
                 .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
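The reformatted `else` branch writes an empty but still valid, gzip-compressed SequenceFile when there is nothing to migrate, so downstream consumers of `outputPath` find readable output rather than a missing directory. A standalone sketch of that fallback (class name and local-mode setup are assumptions):

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class EmptySeqFileSketch {
    public static void main(String[] args) {
        final String outputPath = args[0];
        final SparkSession spark = SparkSession
            .builder()
            .appName("empty-seqfile")
            .master("local[*]")
            .getOrCreate();
        spark
            .emptyDataFrame()
            .toJavaRDD()
            // zero rows in, zero pairs out: the SequenceFile is created, just with no records
            .mapToPair(row -> new Tuple2<>(new Text(), new Text()))
            .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
        spark.stop();
    }
}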
@@ -41,7 +41,8 @@ SELECT p.id,
        CASE WHEN prr2.id IS NULL THEN 0 ELSE prr2.daysForlastPub END AS daysforlastpub,
        CASE WHEN prr2.id IS NULL THEN 0 ELSE prr2.dp END AS delayedpubs,
        p.callidentifier,
-       p.code
+       p.code,
+       p.totalcost
 FROM ${stats_db_name}.project_tmp p
        LEFT JOIN (SELECT pr.id, count(distinct pr.result) AS np
                   FROM ${stats_db_name}.project_results pr
@@ -30,10 +30,21 @@ from rcount
 group by rcount.pid;
 
 create view ${stats_db_name}.rndexpenditure as select * from stats_ext.rndexpediture;
---
--- ANALYZE TABLE ${stats_db_name}.result_projectcount COMPUTE STATISTICS;
--- ANALYZE TABLE ${stats_db_name}.result_projectcount COMPUTE STATISTICS FOR COLUMNS;
--- ANALYZE TABLE ${stats_db_name}.result_fundercount COMPUTE STATISTICS;
--- ANALYZE TABLE ${stats_db_name}.result_fundercount COMPUTE STATISTICS FOR COLUMNS;
--- ANALYZE TABLE ${stats_db_name}.project_resultcount COMPUTE STATISTICS;
--- ANALYZE TABLE ${stats_db_name}.project_resultcount COMPUTE STATISTICS FOR COLUMNS;
+
+create table ${stats_db_name}.result_instance stored as parquet as
+select distinct r.*
+from (
+    select substr(r.id, 4) as id, inst.accessright.classname as accessright, substr(inst.collectedfrom.key, 4) as collectedfrom,
+           substr(inst.hostedby.key, 4) as hostedby, inst.dateofacceptance.value as dateofacceptance, inst.license.value as license, p.qualifier.classname as pidtype, p.value as pid
+    from ${openaire_db_name}.result r lateral view explode(r.instance) instances as inst lateral view explode(inst.pid) pids as p) r
+join ${stats_db_name}.result res on res.id=r.id;
+
+create table ${stats_db_name}.result_apc as
+select r.id, r.amount, r.currency
+from (
+    select substr(r.id, 4) as id, inst.processingchargeamount.value as amount, inst.processingchargecurrency.value as currency
+    from ${openaire_db_name}.result r lateral view explode(r.instance) instances as inst) r
+join ${stats_db_name}.result res on res.id=r.id
+where r.amount is not null;
+
+create view ${stats_db_name}.issn_gold_oa_dataset as select * from stats_ext.issn_gold_oa_dataset;
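Both new tables rely on Hive's `lateral view explode(...)` to unnest the `instance` array (and, for `result_instance`, the nested `pid` array) into one row per element. A toy Spark SQL sketch of that flattening, with invented table contents:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ExplodeSketch {
    public static void main(String[] args) {
        final SparkSession spark = SparkSession.builder().appName("explode-sketch").master("local[*]").getOrCreate();
        // one result row holding an array of two instances
        spark.sql("select 'r1' as id, array('inst-a', 'inst-b') as instance").createOrReplaceTempView("result");
        // lateral view explode yields one output row per array element,
        // the same unnesting the result_instance statement performs above
        final Dataset<Row> flat = spark.sql("select id, inst from result lateral view explode(instance) instances as inst");
        flat.show(); // two rows: (r1, inst-a) and (r1, inst-b)
        spark.stop();
    }
}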
@@ -16,7 +16,13 @@ create table TARGET.result as
 select distinct * from (
     select * from SOURCE.result r where exists (select 1 from SOURCE.result_projects rp join SOURCE.project p on rp.project=p.id where rp.id=r.id)
     union all
-    select * from SOURCE.result r where exists (select 1 from SOURCE.result_concepts rc where rc.id=r.id) ) foo;
+    select * from SOURCE.result r where exists (select 1 from SOURCE.result_concepts rc where rc.id=r.id)
+    union all
+    select * from SOURCE.result r where exists (select 1 from SOURCE.result_projects rp join SOURCE.project p on p.id=rp.project join SOURCE.project_organizations po on po.id=p.id join SOURCE.organization o on o.id=po.organization where rp.id=r.id and o.name in (
+        'GEORG-AUGUST-UNIVERSITAT GOTTINGEN STIFTUNG OFFENTLICHEN RECHTS',
+        'ATHINA-EREVNITIKO KENTRO KAINOTOMIAS STIS TECHNOLOGIES TIS PLIROFORIAS, TON EPIKOINONION KAI TIS GNOSIS',
+        'Consiglio Nazionale delle Ricerche',
+        'Universidade do Minho') )) foo;
 compute stats TARGET.result;
 
 create table TARGET.result_citations as select * from SOURCE.result_citations orig where exists (select 1 from TARGET.result r where r.id=orig.id);
@@ -39,7 +39,8 @@ CREATE TABLE ${stats_db_name}.project_tmp
     daysforlastpub INT,
     delayedpubs INT,
     callidentifier STRING,
-    code STRING
+    code STRING,
+    totalcost FLOAT
 ) CLUSTERED BY (id) INTO 100 buckets stored AS orc tblproperties ('transactional' = 'true');
 
 INSERT INTO ${stats_db_name}.project_tmp
@@ -62,7 +63,8 @@ SELECT substr(p.id, 4) AS id,
        0 AS daysforlastpub,
        0 AS delayedpubs,
        p.callidentifier.value AS callidentifier,
-       p.code.value AS code
+       p.code.value AS code,
+       p.totalcost AS totalcost
 FROM ${openaire_db_name}.project p
 WHERE p.datainfo.deletedbyinference = false;
@@ -71,14 +73,3 @@ select distinct xpath_string(fund, '//funder/id') as id,
        xpath_string(fund, '//funder/name') as name,
        xpath_string(fund, '//funder/shortname') as shortname
 from ${openaire_db_name}.project p lateral view explode(p.fundingtree.value) fundingtree as fund;
-
--- ANALYZE TABLE ${stats_db_name}.project_oids COMPUTE STATISTICS;
--- ANALYZE TABLE ${stats_db_name}.project_oids COMPUTE STATISTICS FOR COLUMNS;
--- ANALYZE TABLE ${stats_db_name}.project_organizations COMPUTE STATISTICS;
--- ANALYZE TABLE ${stats_db_name}.project_organizations COMPUTE STATISTICS FOR COLUMNS;
--- ANALYZE TABLE ${stats_db_name}.project_results COMPUTE STATISTICS;
--- ANALYZE TABLE ${stats_db_name}.project_results COMPUTE STATISTICS FOR COLUMNS;
--- ANALYZE TABLE ${stats_db_name}.project_tmp COMPUTE STATISTICS;
--- ANALYZE TABLE ${stats_db_name}.project_tmp COMPUTE STATISTICS FOR COLUMNS;
--- ANALYZE TABLE ${stats_db_name}.funder COMPUTE STATISTICS;
--- ANALYZE TABLE ${stats_db_name}.funder COMPUTE STATISTICS FOR COLUMNS;