forked from D-Net/dnet-hadoop
implement first version of scholexplorer integration for the generation of final graph
This commit is contained in:
parent
7fa49f6956
commit
7bd224f051
|
@ -59,7 +59,7 @@ object SparkConvertRDDtoDataset {
|
||||||
log.info("Converting Relation")
|
log.info("Converting Relation")
|
||||||
|
|
||||||
|
|
||||||
val rddRelation =spark.sparkContext.textFile(s"$sourcePath/relation").map(s => mapper.readValue(s, classOf[Relation]))
|
val rddRelation =spark.sparkContext.textFile(s"$sourcePath/relation").map(s => mapper.readValue(s, classOf[Relation])).filter(r=> r.getSource.startsWith("50") && r.getTarget.startsWith("50"))
|
||||||
spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath")
|
spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath")
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -51,10 +51,14 @@ object SparkCreateScholix {
|
||||||
|
|
||||||
relationDS.joinWith(summaryDS, relationDS("_1").equalTo(summaryDS("_1")), "left")
|
relationDS.joinWith(summaryDS, relationDS("_1").equalTo(summaryDS("_1")), "left")
|
||||||
.map { input: ((String, Relation), (String, ScholixSummary)) =>
|
.map { input: ((String, Relation), (String, ScholixSummary)) =>
|
||||||
|
if (input._1!= null && input._2!= null) {
|
||||||
val rel: Relation = input._1._2
|
val rel: Relation = input._1._2
|
||||||
val source: ScholixSummary = input._2._2
|
val source: ScholixSummary = input._2._2
|
||||||
(rel.getTarget, ScholixUtils.scholixFromSource(rel, source))
|
(rel.getTarget, ScholixUtils.scholixFromSource(rel, source))
|
||||||
|
}
|
||||||
|
else null
|
||||||
}(Encoders.tuple(Encoders.STRING, scholixEncoder))
|
}(Encoders.tuple(Encoders.STRING, scholixEncoder))
|
||||||
|
.filter(r => r!= null)
|
||||||
.write.mode(SaveMode.Overwrite).save(s"$targetPath/scholix_from_source")
|
.write.mode(SaveMode.Overwrite).save(s"$targetPath/scholix_from_source")
|
||||||
|
|
||||||
val scholixSource: Dataset[(String, Scholix)] = spark.read.load(s"$targetPath/scholix_from_source").as[(String, Scholix)](Encoders.tuple(Encoders.STRING, scholixEncoder))
|
val scholixSource: Dataset[(String, Scholix)] = spark.read.load(s"$targetPath/scholix_from_source").as[(String, Scholix)](Encoders.tuple(Encoders.STRING, scholixEncoder))
|
||||||
|
|
|
@ -289,7 +289,7 @@ object ScholixUtils {
|
||||||
if (r.getInstance() == null || r.getInstance().isEmpty)
|
if (r.getInstance() == null || r.getInstance().isEmpty)
|
||||||
return List()
|
return List()
|
||||||
r.getInstance().asScala.filter(i => i.getUrl!= null && !i.getUrl.isEmpty)
|
r.getInstance().asScala.filter(i => i.getUrl!= null && !i.getUrl.isEmpty)
|
||||||
|
.filter(i => i.getPid!= null && i.getUrl != null)
|
||||||
.flatMap(i => findURLForPID(i.getPid.asScala.toList, i.getUrl.asScala.toList))
|
.flatMap(i => findURLForPID(i.getPid.asScala.toList, i.getUrl.asScala.toList))
|
||||||
.map(i => new ScholixIdentifier(i._1.getValue, i._1.getQualifier.getClassid, i._2)).distinct.toList
|
.map(i => new ScholixIdentifier(i._1.getValue, i._1.getQualifier.getClassid, i._2)).distinct.toList
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,7 +54,7 @@
|
||||||
--executor-cores=${sparkExecutorCores}
|
--executor-cores=${sparkExecutorCores}
|
||||||
--driver-memory=${sparkDriverMemory}
|
--driver-memory=${sparkDriverMemory}
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
--conf spark.sql.shuffle.partitions=5000
|
--conf spark.sql.shuffle.partitions=20000
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
@ -79,7 +79,7 @@
|
||||||
--executor-cores=${sparkExecutorCores}
|
--executor-cores=${sparkExecutorCores}
|
||||||
--driver-memory=${sparkDriverMemory}
|
--driver-memory=${sparkDriverMemory}
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
--conf spark.sql.shuffle.partitions=6000
|
--conf spark.sql.shuffle.partitions=20000
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
|
Loading…
Reference in New Issue