Task required for running on kubernetes and for running incremental generation of the graph

This commit is contained in:
Giambattista Bloisi 2024-10-30 11:22:04 +01:00
parent 398edcd894
commit e221ab864b
6 changed files with 324 additions and 0 deletions

View File

@ -0,0 +1,108 @@
package eu.dnetlib.dhp.oa.merge;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Arrays;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
/**
* Copy specified entities from a graph snapshot to another
*/
public class CopyEntitiesSparkJob {
private static final Logger log = LoggerFactory.getLogger(CopyEntitiesSparkJob.class);
private ArgumentApplicationParser parser;
public CopyEntitiesSparkJob(ArgumentApplicationParser parser) {
this.parser = parser;
}
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
CopyEntitiesSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/merge/copy_graph_entities_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
new CopyEntitiesSparkJob(parser).run(isSparkSessionManaged);
}
public void run(Boolean isSparkSessionManaged)
throws ISLookUpException {
String graphInputPath = parser.get("graphInputPath");
log.info("graphInputPath: {}", graphInputPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
String entities = parser.get("entities");
log.info("entities: {}", entities);
String format = parser.get("format");
log.info("format: {}", format);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
Arrays
.stream(entities.split(","))
.map(x -> x.trim().toLowerCase())
.filter(ModelSupport.oafTypes::containsKey)
.forEachOrdered(
entity -> {
switch (format.toLowerCase()) {
case "text":
spark
.read()
.text(graphInputPath + "/" + entity)
.write()
.option("compression", "gzip")
.mode("overwrite")
.text(outputPath + "/" + entity);
break;
case "json":
spark
.read()
.json(graphInputPath + "/" + entity)
.write()
.option("compression", "gzip")
.mode("overwrite")
.json(outputPath + "/" + entity);
break;
case "parquet":
spark
.read()
.parquet(graphInputPath + "/" + entity)
.write()
.option("compression", "gzip")
.mode("overwrite")
.parquet(outputPath + "/" + entity);
break;
}
});
});
}
}

View File

@ -0,0 +1,32 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "gin",
"paramLongName": "graphInputPath",
"paramDescription": "the input graph root path",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the output graph root path",
"paramRequired": true
},
{
"paramName": "ent",
"paramLongName": "entities",
"paramDescription": "the output graph root path",
"paramRequired": true
},
{
"paramName": "fmt",
"paramLongName": "format",
"paramDescription": "the output graph root path",
"paramRequired": true
}
]

View File

@ -0,0 +1,14 @@
[
{
"paramName": "p",
"paramLongName": "inputPath",
"paramDescription": "the path where delta is stored",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "graphOutputPath",
"paramDescription": "the path where store the graph",
"paramRequired": true
}
]

View File

@ -0,0 +1,5 @@
[
{"paramName":"w", "paramLongName":"relationPath", "paramDescription": "the source Path", "paramRequired": true},
{"paramName":"g", "paramLongName":"graphBasePath", "paramDescription": "the path of the raw graph", "paramRequired": true},
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the target path", "paramRequired": true}
]

View File

@ -0,0 +1,83 @@
package eu.dnetlib.dhp.oa.graph.raw
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.common.HdfsSupport
import eu.dnetlib.dhp.schema.common.ModelSupport
import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.utils.DHPUtils
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._
import scala.io.Source
object CopyIncrementalOafSparkApplication {
def main(args: Array[String]): Unit = {
val log = LoggerFactory.getLogger(getClass)
val conf = new SparkConf()
val parser = new ArgumentApplicationParser(
Source
.fromInputStream(
getClass.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/copy_incremental_oaf_parameters.json")
)
.mkString
)
parser.parseArgument(args)
val spark =
SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.getOrCreate()
val sc: SparkContext = spark.sparkContext
val inputPath = parser.get("inputPath")
log.info("inputPath: {}", inputPath)
val graphOutputPath = parser.get("graphOutputPath")
log.info("graphOutputPath: {}", graphOutputPath)
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
val types = ModelSupport.oafTypes.entrySet.asScala
.map(e => Tuple2(e.getKey, e.getValue))
val oaf = spark.read.textFile(inputPath)
val mapper =
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
types.foreach(t =>
oaf
.filter(o => isOafType(o, t._1))
.map(j => mapper.readValue(j, t._2).asInstanceOf[Oaf])
.map(s => mapper.writeValueAsString(s))(Encoders.STRING)
.write
.option("compression", "gzip")
.mode(SaveMode.Append)
.text(s"$graphOutputPath/${t._1}")
)
}
def isOafType(input: String, oafType: String): Boolean = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: org.json4s.JValue = parse(input)
if (oafType == "relation") {
val hasSource = (json \ "source").extractOrElse[String](null)
val hasTarget = (json \ "target").extractOrElse[String](null)
hasSource != null && hasTarget != null
} else {
val hasId = (json \ "id").extractOrElse[String](null)
val resultType = (json \ "resulttype" \ "classid").extractOrElse[String]("")
hasId != null && oafType.startsWith(resultType)
}
}
}

View File

@ -0,0 +1,82 @@
package eu.dnetlib.dhp.oa.graph.resolution
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.Relation
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, expr}
import org.slf4j.{Logger, LoggerFactory}
object SparkResolveRelationById {
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(
IOUtils.toString(
getClass.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/resolution/resolve_relationsbyid_params.json"
)
)
)
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.getOrCreate()
val graphBasePath = parser.get("graphBasePath")
log.info(s"graphBasePath -> $graphBasePath")
val relationPath = parser.get("relationPath")
log.info(s"relationPath -> $relationPath")
val targetPath = parser.get("targetPath")
log.info(s"targetPath -> $targetPath")
implicit val relEncoder: Encoder[Relation] = Encoders.bean(classOf[Relation])
val mergedrels =
spark.read.json(relationPath).where("relclass = 'merges'").selectExpr("source as dedupId", "target as mergedId")
spark.read
.schema(Encoders.bean(classOf[Relation]).schema)
.json(s"$graphBasePath/relation")
.as[Relation]
.map(r => resolveRelations(r))
.join(mergedrels, col("source") === mergedrels.col("mergedId"), "left")
.withColumn("source", expr("coalesce(dedupId, source)"))
.drop("mergedId", "dedupID")
.join(mergedrels, col("target") === mergedrels.col("mergedId"), "left")
.withColumn("target", expr("coalesce(dedupId, target)"))
.drop("mergedId", "dedupID")
.write
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(s"$targetPath/relation")
}
private def resolveRelations(r: Relation): Relation = {
if (r.getSource.startsWith("unresolved::"))
r.setSource(resolvePid(r.getSource.substring(12)))
if (r.getTarget.startsWith("unresolved::"))
r.setTarget(resolvePid(r.getTarget.substring(12)))
r
}
private def resolvePid(str: String): String = {
val parts = str.split("::")
val id = parts(0)
val scheme: String = parts.last match {
case "arxiv" => "arXiv"
case _ => parts.last
}
IdentifierFactory.idFromPid("50", scheme, id, true)
}
}