Master branch updates from beta September 2023 #337
|
@ -116,54 +116,45 @@ object SparkConvertRDDtoDataset {
|
||||||
.map(s => mapper.readValue(s, classOf[Relation]))
|
.map(s => mapper.readValue(s, classOf[Relation]))
|
||||||
.filter(r => r.getDataInfo != null && !r.getDataInfo.getDeletedbyinference)
|
.filter(r => r.getDataInfo != null && !r.getDataInfo.getDeletedbyinference)
|
||||||
.filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50"))
|
.filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50"))
|
||||||
.filter(r => filterRelations(subRelTypeFilter, relClassFilter, r))
|
.filter(r => filterRelations(r))
|
||||||
//filter OpenCitations relations
|
//filter OpenCitations relations
|
||||||
.filter(r =>
|
// .filter(r =>
|
||||||
r.getDataInfo.getProvenanceaction != null &&
|
// r.getDataInfo.getProvenanceaction != null &&
|
||||||
!"sysimport:crosswalk:opencitations".equals(r.getDataInfo.getProvenanceaction.getClassid)
|
// !"sysimport:crosswalk:opencitations".equals(r.getDataInfo.getProvenanceaction.getClassid)
|
||||||
)
|
// )
|
||||||
|
|
||||||
spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath")
|
spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath")
|
||||||
}
|
}
|
||||||
|
|
||||||
private def filterRelations(subRelTypeFilter: String, relClassFilter: List[String], r: Relation): Boolean = {
|
private def filterRelations(r: Relation): Boolean = {
|
||||||
if (StringUtils.isNotBlank(subRelTypeFilter)) {
|
|
||||||
subRelTypeFilter.equalsIgnoreCase(r.getSubRelType)
|
|
||||||
} else {
|
|
||||||
!relClassFilter.exists(k => k.equalsIgnoreCase(r.getRelClass))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/** *
|
||||||
//TODO: finalise implementation
|
* We filter relation generated by dedups
|
||||||
private def processResult[T<: Result](
|
* and all the relation that have one single collectedFrom OpenCitation
|
||||||
implicit ct: ClassTag[T],
|
|
||||||
log: Logger,
|
|
||||||
spark: SparkSession,
|
|
||||||
sourcePath: String,
|
|
||||||
entityPath: String,
|
|
||||||
clazz: Class[T]
|
|
||||||
): Unit = {
|
|
||||||
val entityType = clazz.getSimpleName.toLowerCase
|
|
||||||
|
|
||||||
log.info(s"Converting $entityType")
|
|
||||||
|
|
||||||
val mapper = new ObjectMapper() with ScalaObjectMapper
|
|
||||||
mapper.registerModule(DefaultScalaModule)
|
|
||||||
|
|
||||||
val rdd = spark.sparkContext
|
|
||||||
.textFile(s"$sourcePath/$entityType")
|
|
||||||
.map(s => mapper.readValue(s, clazz))
|
|
||||||
.filter(r => r.getDataInfo != null && !r.getDataInfo.getDeletedbyinference);
|
|
||||||
|
|
||||||
implicit val encoder: Encoder[T] = Encoders.kryo(clazz)
|
|
||||||
spark
|
|
||||||
.createDataset(rdd)
|
|
||||||
.as[T]
|
|
||||||
.write
|
|
||||||
.mode(SaveMode.Overwrite)
|
|
||||||
.save(s"$entityPath/$entityType")
|
|
||||||
}
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
val relClassFilter = List(
|
||||||
|
ModelConstants.MERGES,
|
||||||
|
ModelConstants.IS_MERGED_IN,
|
||||||
|
ModelConstants.HAS_AMONG_TOP_N_SIMILAR_DOCS,
|
||||||
|
ModelConstants.IS_AMONG_TOP_N_SIMILAR_DOCS
|
||||||
|
)
|
||||||
|
if (relClassFilter.exists(k => k.equalsIgnoreCase(r.getRelClass)))
|
||||||
|
false
|
||||||
|
else {
|
||||||
|
if (r.getCollectedfrom == null || r.getCollectedfrom.size() == 0)
|
||||||
|
false
|
||||||
|
else if (r.getCollectedfrom.size() > 1)
|
||||||
|
true
|
||||||
|
else if (
|
||||||
|
r.getCollectedfrom.size() == 1 && r.getCollectedfrom.get(0) != null && "OpenCitations".equalsIgnoreCase(
|
||||||
|
r.getCollectedfrom.get(0).getValue
|
||||||
|
)
|
||||||
|
)
|
||||||
|
false
|
||||||
|
else
|
||||||
|
true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,7 @@
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>hive_jdbc_url</name>
|
<name>hive_jdbc_url</name>
|
||||||
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
|
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000/;UseNativeQuery=1;?spark.executor.memory=19166291558;spark.yarn.executor.memoryOverhead=3225;spark.driver.memory=11596411699;spark.yarn.driver.memoryOverhead=1228</value>
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>oozie.wf.workflow.notification.url</name>
|
<name>oozie.wf.workflow.notification.url</name>
|
||||||
|
|
|
@ -42,7 +42,9 @@ SELECT p.id,
|
||||||
CASE WHEN prr2.id IS NULL THEN 0 ELSE prr2.dp END AS delayedpubs,
|
CASE WHEN prr2.id IS NULL THEN 0 ELSE prr2.dp END AS delayedpubs,
|
||||||
p.callidentifier,
|
p.callidentifier,
|
||||||
p.code,
|
p.code,
|
||||||
p.totalcost
|
p.totalcost,
|
||||||
|
p.fundedamount,
|
||||||
|
p.currency
|
||||||
FROM ${stats_db_name}.project_tmp p
|
FROM ${stats_db_name}.project_tmp p
|
||||||
LEFT JOIN (SELECT pr.id, count(distinct pr.result) AS np
|
LEFT JOIN (SELECT pr.id, count(distinct pr.result) AS np
|
||||||
FROM ${stats_db_name}.project_results pr
|
FROM ${stats_db_name}.project_results pr
|
||||||
|
|
|
@ -59,7 +59,7 @@ UNION ALL
|
||||||
SELECT * FROM ${stats_db_name}.otherresearchproduct_sources;
|
SELECT * FROM ${stats_db_name}.otherresearchproduct_sources;
|
||||||
|
|
||||||
|
|
||||||
create table ${stats_db_name}.result_orcid STORED AS PARQUET as
|
CREATE TABLE IF NOT EXISTS ${stats_db_name}.result_orcid STORED AS PARQUET as
|
||||||
select distinct res.id, regexp_replace(res.orcid, 'http://orcid.org/' ,'') as orcid
|
select distinct res.id, regexp_replace(res.orcid, 'http://orcid.org/' ,'') as orcid
|
||||||
from (
|
from (
|
||||||
SELECT substr(res.id, 4) as id, auth_pid.value as orcid
|
SELECT substr(res.id, 4) as id, auth_pid.value as orcid
|
||||||
|
@ -69,7 +69,7 @@ from (
|
||||||
LATERAL VIEW explode(auth.pid.qualifier.classid) apt as author_pid_type
|
LATERAL VIEW explode(auth.pid.qualifier.classid) apt as author_pid_type
|
||||||
WHERE res.datainfo.deletedbyinference = FALSE and res.datainfo.invisible = FALSE and author_pid_type = 'orcid') as res;
|
WHERE res.datainfo.deletedbyinference = FALSE and res.datainfo.invisible = FALSE and author_pid_type = 'orcid') as res;
|
||||||
|
|
||||||
create table ${stats_db_name}.result_result stored as parquet as
|
CREATE TABLE IF NOT EXISTS ${stats_db_name}.result_result stored as parquet as
|
||||||
select substr(rel.source, 4) as source, substr(rel.target, 4) as target, relclass, subreltype
|
select substr(rel.source, 4) as source, substr(rel.target, 4) as target, relclass, subreltype
|
||||||
from ${openaire_db_name}.relation rel
|
from ${openaire_db_name}.relation rel
|
||||||
join ${openaire_db_name}.result r1 on rel.source=r1.id
|
join ${openaire_db_name}.result r1 on rel.source=r1.id
|
||||||
|
@ -82,7 +82,7 @@ where reltype='resultResult'
|
||||||
and r2.resulttype.classname != 'other'
|
and r2.resulttype.classname != 'other'
|
||||||
and rel.datainfo.deletedbyinference=false and rel.datainfo.invisible = FALSE;
|
and rel.datainfo.deletedbyinference=false and rel.datainfo.invisible = FALSE;
|
||||||
|
|
||||||
create table ${stats_db_name}.result_citations_oc stored as parquet as
|
CREATE TABLE IF NOT EXISTS ${stats_db_name}.result_citations_oc stored as parquet as
|
||||||
select substr(target, 4) as id, count(distinct substr(source, 4)) as citations
|
select substr(target, 4) as id, count(distinct substr(source, 4)) as citations
|
||||||
from ${openaire_db_name}.relation rel
|
from ${openaire_db_name}.relation rel
|
||||||
join ${openaire_db_name}.result r1 on rel.source=r1.id
|
join ${openaire_db_name}.result r1 on rel.source=r1.id
|
||||||
|
@ -97,7 +97,7 @@ where relClass='Cites' and rel.datainfo.provenanceaction.classid = 'sysimport:cr
|
||||||
and rel.datainfo.deletedbyinference=false and rel.datainfo.invisible = FALSE
|
and rel.datainfo.deletedbyinference=false and rel.datainfo.invisible = FALSE
|
||||||
group by substr(target, 4);
|
group by substr(target, 4);
|
||||||
|
|
||||||
create table ${stats_db_name}.result_references_oc stored as parquet as
|
CREATE TABLE IF NOT EXISTS ${stats_db_name}.result_references_oc stored as parquet as
|
||||||
select substr(source, 4) as id, count(distinct substr(target, 4)) as references
|
select substr(source, 4) as id, count(distinct substr(target, 4)) as references
|
||||||
from ${openaire_db_name}.relation rel
|
from ${openaire_db_name}.relation rel
|
||||||
join ${openaire_db_name}.result r1 on rel.source=r1.id
|
join ${openaire_db_name}.result r1 on rel.source=r1.id
|
||||||
|
|
|
@ -48,7 +48,9 @@ CREATE TABLE ${stats_db_name}.project_tmp
|
||||||
delayedpubs INT,
|
delayedpubs INT,
|
||||||
callidentifier STRING,
|
callidentifier STRING,
|
||||||
code STRING,
|
code STRING,
|
||||||
totalcost FLOAT
|
totalcost FLOAT,
|
||||||
|
fundedamount FLOAT,
|
||||||
|
currency STRING
|
||||||
) CLUSTERED BY (id) INTO 100 buckets stored AS orc tblproperties ('transactional' = 'true');
|
) CLUSTERED BY (id) INTO 100 buckets stored AS orc tblproperties ('transactional' = 'true');
|
||||||
|
|
||||||
INSERT INTO ${stats_db_name}.project_tmp
|
INSERT INTO ${stats_db_name}.project_tmp
|
||||||
|
@ -72,7 +74,9 @@ SELECT substr(p.id, 4) AS id,
|
||||||
0 AS delayedpubs,
|
0 AS delayedpubs,
|
||||||
p.callidentifier.value AS callidentifier,
|
p.callidentifier.value AS callidentifier,
|
||||||
p.code.value AS code,
|
p.code.value AS code,
|
||||||
p.totalcost AS totalcost
|
p.totalcost AS totalcost,
|
||||||
|
p.fundedamount AS fundedamount,
|
||||||
|
p.currency.value AS currency
|
||||||
FROM ${openaire_db_name}.project p
|
FROM ${openaire_db_name}.project p
|
||||||
WHERE p.datainfo.deletedbyinference = false and p.datainfo.invisible=false;
|
WHERE p.datainfo.deletedbyinference = false and p.datainfo.invisible=false;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue