Merge branch 'beta' into hierarchical_orgs_relations

Claudio Atzori, 2021-11-08 09:40:24 +01:00
commit 9cb8e4ad21
7 changed files with 16 additions and 11 deletions

View File

@@ -75,7 +75,7 @@ object SparkResolveRelation {
         if (targetResolved != null && targetResolved._1.nonEmpty)
           currentRelation.setTarget(targetResolved._1)
         currentRelation
-      }.filter(r => !r.getSource.startsWith("unresolved") && !r.getTarget.startsWith("unresolved"))
+      }
       .write
       .mode(SaveMode.Overwrite)
       .save(s"$workingPath/relation_resolved")
@@ -88,6 +88,7 @@ object SparkResolveRelation {
     fs.rename(new Path(s"$graphBasePath/relation"), new Path(s"$workingPath/relation"))
     spark.read.load(s"$workingPath/relation_resolved").as[Relation]
+      .filter(r => !r.getSource.startsWith("unresolved") && !r.getTarget.startsWith("unresolved"))
       .map(r => mapper.writeValueAsString(r))
       .write
       .option("compression", "gzip")

View File

@@ -59,7 +59,7 @@ object SparkConvertRDDtoDataset {
     log.info("Converting Relation")
-    val rddRelation =spark.sparkContext.textFile(s"$sourcePath/relation").map(s => mapper.readValue(s, classOf[Relation]))
+    val rddRelation =spark.sparkContext.textFile(s"$sourcePath/relation").map(s => mapper.readValue(s, classOf[Relation])).filter(r=> r.getSource.startsWith("50") && r.getTarget.startsWith("50"))
     spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath")
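For context: in OpenAIRE's identifier scheme the leading two-digit prefix encodes the entity type, and results use "50", so the added predicate restricts the converted dataset to result-to-result relations. A minimal self-contained sketch of the stage, with hypothetical local paths and a bean encoder standing in for the project's own implicit encoders (an assumption):

import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.schema.oaf.Relation
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
implicit val relEncoder: Encoder[Relation] = Encoders.bean(classOf[Relation])
val mapper = new ObjectMapper()

val sourcePath = "/tmp/graph"           // hypothetical input path
val relPath = "/tmp/dataset/relation"   // hypothetical output path

val rddRelation = spark.sparkContext
  .textFile(s"$sourcePath/relation")
  .map(s => mapper.readValue(s, classOf[Relation]))
  // keep only relations between results ("50" prefix on both endpoints)
  .filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50"))

spark.createDataset(rddRelation).as[Relation]
  .write.mode(SaveMode.Overwrite).save(s"$relPath")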

View File

@ -51,10 +51,14 @@ object SparkCreateScholix {
relationDS.joinWith(summaryDS, relationDS("_1").equalTo(summaryDS("_1")), "left") relationDS.joinWith(summaryDS, relationDS("_1").equalTo(summaryDS("_1")), "left")
.map { input: ((String, Relation), (String, ScholixSummary)) => .map { input: ((String, Relation), (String, ScholixSummary)) =>
val rel: Relation = input._1._2 if (input._1!= null && input._2!= null) {
val source: ScholixSummary = input._2._2 val rel: Relation = input._1._2
(rel.getTarget, ScholixUtils.scholixFromSource(rel, source)) val source: ScholixSummary = input._2._2
(rel.getTarget, ScholixUtils.scholixFromSource(rel, source))
}
else null
}(Encoders.tuple(Encoders.STRING, scholixEncoder)) }(Encoders.tuple(Encoders.STRING, scholixEncoder))
.filter(r => r!= null)
.write.mode(SaveMode.Overwrite).save(s"$targetPath/scholix_from_source") .write.mode(SaveMode.Overwrite).save(s"$targetPath/scholix_from_source")
val scholixSource: Dataset[(String, Scholix)] = spark.read.load(s"$targetPath/scholix_from_source").as[(String, Scholix)](Encoders.tuple(Encoders.STRING, scholixEncoder)) val scholixSource: Dataset[(String, Scholix)] = spark.read.load(s"$targetPath/scholix_from_source").as[(String, Scholix)](Encoders.tuple(Encoders.STRING, scholixEncoder))
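In a left joinWith, input._2 is null whenever a relation has no matching summary, and the old lambda dereferenced it unconditionally; the patch guards with a null sentinel and strips the sentinels with the added filter. The sketch below shows the same guard-and-drop on hypothetical string pairs, using Option with flatMap, an equivalent idiom swapped in here for the patch's null-plus-filter:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical stand-ins for the (id, Relation) and (id, ScholixSummary) pairs.
val left  = Seq(("a", "rel-a"), ("b", "rel-b")).toDS()
val right = Seq(("a", "sum-a")).toDS()

val joined = left.joinWith(right, left("_1") === right("_1"), "left")
  .flatMap { input =>
    // input._2 is null when the left join found no match for the key;
    // Option-izing it expresses the patch's guard-and-drop in one step
    Option(input._2).map(r => (input._1._2, r._2))
  }

joined.show() // ("rel-a", "sum-a") survives; the unmatched "b" row is dropped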

View File

@@ -289,7 +289,7 @@ object ScholixUtils {
     if (r.getInstance() == null || r.getInstance().isEmpty)
       return List()
     r.getInstance().asScala.filter(i => i.getUrl!= null && !i.getUrl.isEmpty)
+      .filter(i => i.getPid!= null && i.getUrl != null)
       .flatMap(i => findURLForPID(i.getPid.asScala.toList, i.getUrl.asScala.toList))
       .map(i => new ScholixIdentifier(i._1.getValue, i._1.getQualifier.getClassid, i._2)).distinct.toList
   }
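The inserted filter protects the flatMap that follows: findURLForPID walks both i.getPid and i.getUrl, so an instance carrying URLs but a null PID list previously raised a NullPointerException. A standalone illustration of the guard with hypothetical, simplified types (the real Instance bean and findURLForPID live elsewhere in the module):

import scala.collection.JavaConverters._

// Simplified stand-in for the OAF Instance bean: pid and url are Java lists
// that may legitimately be null when the record was mapped from sparse input.
case class Instance(pid: java.util.List[String], url: java.util.List[String])

def identifiers(instances: List[Instance]): List[(String, String)] =
  instances
    .filter(i => i.url != null && !i.url.isEmpty)
    // the added guard: without it, pid.asScala below would NPE on null pids
    .filter(i => i.pid != null && i.url != null)
    .flatMap(i => for (p <- i.pid.asScala; u <- i.url.asScala) yield (p, u))
    .distinct

// Example: an instance with urls but no pids is now skipped, not fatal.
identifiers(List(Instance(null, java.util.Arrays.asList("http://example.org")))) // => List()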

View File

@@ -24,7 +24,7 @@
     --executor-cores=${sparkExecutorCores}
     --driver-memory=${sparkDriverMemory}
     --conf spark.extraListeners=${spark2ExtraListeners}
-    --conf spark.sql.shuffle.partitions=8000
+    --conf spark.sql.shuffle.partitions=15000
     --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
     --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
     --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
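spark.sql.shuffle.partitions controls how many partitions Spark SQL uses on the shuffle side of joins and aggregations; raising it from 8000 to 15000 spreads the same shuffle over more, smaller tasks, presumably because the relation dataset has outgrown the old setting. The equivalent programmatic configuration, for local experimentation only (in production the value is injected through the workflow property above):

import org.apache.spark.sql.SparkSession

// Local sketch: set the shuffle partition count at session build time.
val spark = SparkSession.builder()
  .appName("shuffle-partitions-demo")
  .master("local[*]")
  .config("spark.sql.shuffle.partitions", "15000")
  .getOrCreate()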

View File

@ -54,7 +54,7 @@
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.shuffle.partitions=5000 --conf spark.sql.shuffle.partitions=20000
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
@ -79,7 +79,7 @@
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.shuffle.partitions=6000 --conf spark.sql.shuffle.partitions=20000
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
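Both hunks in this workflow raise the setting to 20000. The same knob can also be adjusted per session at runtime, which is handy for checking whether the higher count actually helps before editing the workflow XML:

// Runtime override on an existing session (sketch; spark as in the example above).
spark.conf.set("spark.sql.shuffle.partitions", "20000")
println(spark.conf.get("spark.sql.shuffle.partitions")) // 20000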

View File

@@ -65,7 +65,7 @@ FROM ${stats_db_name}.project_tmp p
 UPDATE ${stats_db_name}.publication_tmp
 SET delayed = 'yes'
 WHERE publication_tmp.id IN (SELECT distinct r.id
-                             FROM stats_wf_db_obs.result r,
+                             FROM ${stats_db_name}.result r,
                                   ${stats_db_name}.project_results pr,
                                   ${stats_db_name}.project_tmp p
                              WHERE r.id = pr.result
@@ -75,7 +75,7 @@ WHERE publication_tmp.id IN (SELECT distinct r.id
 UPDATE ${stats_db_name}.dataset_tmp
 SET delayed = 'yes'
 WHERE dataset_tmp.id IN (SELECT distinct r.id
-                         FROM stats_wf_db_obs.result r,
+                         FROM ${stats_db_name}.result r,
                               ${stats_db_name}.project_results pr,
                               ${stats_db_name}.project_tmp p
                          WHERE r.id = pr.result
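The fix replaces a hard-coded stats_wf_db_obs schema with the ${stats_db_name} placeholder, so the subqueries hit the same database as the rest of the script once it is rendered. A hypothetical sketch of that substitution step (the real workflow performs it before handing the script to Hive/Impala; file and database names here are illustrative):

import scala.io.Source

// Hypothetical rendering helper: a hard-coded schema like stats_wf_db_obs
// escapes this substitution entirely, which is exactly the bug fixed above.
def renderScript(path: String, statsDbName: String): String =
  Source.fromFile(path).mkString.replace("${stats_db_name}", statsDbName)

// Example: render the delayed-results script against a concrete database.
val sql = renderScript("step7.sql", "openaire_prod_stats") // illustrative names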