diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala index 556106180..362cb2028 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala @@ -116,54 +116,45 @@ object SparkConvertRDDtoDataset { .map(s => mapper.readValue(s, classOf[Relation])) .filter(r => r.getDataInfo != null && !r.getDataInfo.getDeletedbyinference) .filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50")) - .filter(r => filterRelations(subRelTypeFilter, relClassFilter, r)) - //filter OpenCitations relations - .filter(r => - r.getDataInfo.getProvenanceaction != null && - !"sysimport:crosswalk:opencitations".equals(r.getDataInfo.getProvenanceaction.getClassid) - ) + .filter(r => filterRelations(r)) + //filter OpenCitations relations +// .filter(r => +// r.getDataInfo.getProvenanceaction != null && +// !"sysimport:crosswalk:opencitations".equals(r.getDataInfo.getProvenanceaction.getClassid) +// ) spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath") } - private def filterRelations(subRelTypeFilter: String, relClassFilter: List[String], r: Relation): Boolean = { - if (StringUtils.isNotBlank(subRelTypeFilter)) { - subRelTypeFilter.equalsIgnoreCase(r.getSubRelType) - } else { - !relClassFilter.exists(k => k.equalsIgnoreCase(r.getRelClass)) + private def filterRelations(r: Relation): Boolean = { + + /** * + * We filter relation generated by dedups + * and all the relation that have one single collectedFrom OpenCitation + */ + + val relClassFilter = List( + ModelConstants.MERGES, + ModelConstants.IS_MERGED_IN, + ModelConstants.HAS_AMONG_TOP_N_SIMILAR_DOCS, + ModelConstants.IS_AMONG_TOP_N_SIMILAR_DOCS + ) + if (relClassFilter.exists(k => k.equalsIgnoreCase(r.getRelClass))) + false + else { + if (r.getCollectedfrom == null || r.getCollectedfrom.size() == 0) + false + else if (r.getCollectedfrom.size() > 1) + true + else if ( + r.getCollectedfrom.size() == 1 && r.getCollectedfrom.get(0) != null && "OpenCitations".equalsIgnoreCase( + r.getCollectedfrom.get(0).getValue + ) + ) + false + else + true } } - /* - //TODO: finalise implementation - private def processResult[T<: Result]( - implicit ct: ClassTag[T], - log: Logger, - spark: SparkSession, - sourcePath: String, - entityPath: String, - clazz: Class[T] - ): Unit = { - val entityType = clazz.getSimpleName.toLowerCase - - log.info(s"Converting $entityType") - - val mapper = new ObjectMapper() with ScalaObjectMapper - mapper.registerModule(DefaultScalaModule) - - val rdd = spark.sparkContext - .textFile(s"$sourcePath/$entityType") - .map(s => mapper.readValue(s, clazz)) - .filter(r => r.getDataInfo != null && !r.getDataInfo.getDeletedbyinference); - - implicit val encoder: Encoder[T] = Encoders.kryo(clazz) - spark - .createDataset(rdd) - .as[T] - .write - .mode(SaveMode.Overwrite) - .save(s"$entityPath/$entityType") - } - */ - } diff --git a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/config-default.xml b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/config-default.xml index 9331d4ac5..63fc84d75 100644 --- a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/config-default.xml +++ b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/config-default.xml @@ -21,7 +21,7 @@ hive_jdbc_url - jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000 + jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000/;UseNativeQuery=1;?spark.executor.memory=19166291558;spark.yarn.executor.memoryOverhead=3225;spark.driver.memory=11596411699;spark.yarn.driver.memoryOverhead=1228 oozie.wf.workflow.notification.url diff --git a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step11.sql b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step11.sql index d699b68c3..41c3ed751 100644 --- a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step11.sql +++ b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step11.sql @@ -42,7 +42,9 @@ SELECT p.id, CASE WHEN prr2.id IS NULL THEN 0 ELSE prr2.dp END AS delayedpubs, p.callidentifier, p.code, - p.totalcost + p.totalcost, + p.fundedamount, + p.currency FROM ${stats_db_name}.project_tmp p LEFT JOIN (SELECT pr.id, count(distinct pr.result) AS np FROM ${stats_db_name}.project_results pr diff --git a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step13.sql b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step13.sql index aee66fd5e..24e1a1355 100644 --- a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step13.sql +++ b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step13.sql @@ -59,7 +59,7 @@ UNION ALL SELECT * FROM ${stats_db_name}.otherresearchproduct_sources; -create table ${stats_db_name}.result_orcid STORED AS PARQUET as +CREATE TABLE IF NOT EXISTS ${stats_db_name}.result_orcid STORED AS PARQUET as select distinct res.id, regexp_replace(res.orcid, 'http://orcid.org/' ,'') as orcid from ( SELECT substr(res.id, 4) as id, auth_pid.value as orcid @@ -69,7 +69,7 @@ from ( LATERAL VIEW explode(auth.pid.qualifier.classid) apt as author_pid_type WHERE res.datainfo.deletedbyinference = FALSE and res.datainfo.invisible = FALSE and author_pid_type = 'orcid') as res; -create table ${stats_db_name}.result_result stored as parquet as +CREATE TABLE IF NOT EXISTS ${stats_db_name}.result_result stored as parquet as select substr(rel.source, 4) as source, substr(rel.target, 4) as target, relclass, subreltype from ${openaire_db_name}.relation rel join ${openaire_db_name}.result r1 on rel.source=r1.id @@ -82,7 +82,7 @@ where reltype='resultResult' and r2.resulttype.classname != 'other' and rel.datainfo.deletedbyinference=false and rel.datainfo.invisible = FALSE; -create table ${stats_db_name}.result_citations_oc stored as parquet as +CREATE TABLE IF NOT EXISTS ${stats_db_name}.result_citations_oc stored as parquet as select substr(target, 4) as id, count(distinct substr(source, 4)) as citations from ${openaire_db_name}.relation rel join ${openaire_db_name}.result r1 on rel.source=r1.id @@ -97,7 +97,7 @@ where relClass='Cites' and rel.datainfo.provenanceaction.classid = 'sysimport:cr and rel.datainfo.deletedbyinference=false and rel.datainfo.invisible = FALSE group by substr(target, 4); -create table ${stats_db_name}.result_references_oc stored as parquet as +CREATE TABLE IF NOT EXISTS ${stats_db_name}.result_references_oc stored as parquet as select substr(source, 4) as id, count(distinct substr(target, 4)) as references from ${openaire_db_name}.relation rel join ${openaire_db_name}.result r1 on rel.source=r1.id diff --git a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step6.sql b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step6.sql index 5461afde6..c31180c14 100644 --- a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step6.sql +++ b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step6.sql @@ -48,7 +48,9 @@ CREATE TABLE ${stats_db_name}.project_tmp delayedpubs INT, callidentifier STRING, code STRING, - totalcost FLOAT + totalcost FLOAT, + fundedamount FLOAT, + currency STRING ) CLUSTERED BY (id) INTO 100 buckets stored AS orc tblproperties ('transactional' = 'true'); INSERT INTO ${stats_db_name}.project_tmp @@ -72,7 +74,9 @@ SELECT substr(p.id, 4) AS id, 0 AS delayedpubs, p.callidentifier.value AS callidentifier, p.code.value AS code, - p.totalcost AS totalcost + p.totalcost AS totalcost, + p.fundedamount AS fundedamount, + p.currency.value AS currency FROM ${openaire_db_name}.project p WHERE p.datainfo.deletedbyinference = false and p.datainfo.invisible=false;