[orcidPropagation] rewritten in Scala. generateGraph is again not abstract

This commit is contained in:
Miriam Baglioni 2024-12-20 12:40:33 +01:00
parent 1853da1e2c
commit 29611b2091
3 changed files with 121 additions and 143 deletions

View File

@ -1,16 +1,19 @@
package eu.dnetlib.dhp.common.author package eu.dnetlib.dhp.common.author
import eu.dnetlib.dhp.application.AbstractScalaApplication import eu.dnetlib.dhp.application.AbstractScalaApplication
import eu.dnetlib.dhp.schema.common.{ModelConstants, ModelSupport} import eu.dnetlib.dhp.schema.common.{EntityType, ModelConstants, ModelSupport}
import eu.dnetlib.dhp.utils.{MatchData, ORCIDAuthorEnricher, ORCIDAuthorEnricherResult} import eu.dnetlib.dhp.utils.{MatchData, ORCIDAuthorEnricher, ORCIDAuthorEnricherResult}
import org.apache.spark.sql._ import org.apache.spark.sql._
import org.apache.spark.sql.functions._ import org.apache.spark.sql.functions._
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
import eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE import eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE
import eu.dnetlib.dhp.schema.oaf.Result import eu.dnetlib.dhp.schema.oaf.{OafEntity, Result}
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils
import org.apache.spark.api.java.function.{MapFunction, MapGroupsFunction}
import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.expressions.Aggregator
import java.util
import java.util.Iterator
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
abstract class SparkEnrichWithOrcidAuthors(propertyPath: String, args: Array[String], log: Logger) abstract class SparkEnrichWithOrcidAuthors(propertyPath: String, args: Array[String], log: Logger)
@ -41,7 +44,57 @@ abstract class SparkEnrichWithOrcidAuthors(propertyPath: String, args: Array[Str
generateGraph(spark, graphPath, workingDir, targetPath) generateGraph(spark, graphPath, workingDir, targetPath)
} }
def generateGraph(spark: SparkSession, graphPath: String, workingDir: String, targetPath: String): Unit private def processAndMerge(spark: SparkSession, inputPath: String, outputPath: String, clazz: Class[Result], encoder: Encoder[Result]): Unit = {
var tmp = spark.read
.schema(Encoders.bean(clazz).schema)
.json(inputPath).as(encoder)
tmp.groupByKey(r => r.getId)(Encoders.STRING)
.mapGroups((k, it) => {
val p: Result = it.next
it.foldLeft(p.getAuthor)((x,r) => MergeUtils.mergeAuthors(x, r.getAuthor,0))
p
})(encoder).write.mode(SaveMode.Overwrite).option("compression", "gzip").json(outputPath)
}
/** Writes, for every result entity type, the graph table with ORCID-enriched authors.
  *
  * For each entity type accepted by `ModelSupport.isResult`:
  *  - reads the `id` / `enriched_author` columns from `workingDir/<type>_matched` (parquet),
  *  - reads the results from `graphPath/<type>` (json),
  *  - left-joins them on `id` and replaces `author` with `enriched_author` when the latter
  *    has at least one element, keeping the original `author` otherwise,
  *  - collapses records sharing the same id into one, merging their author lists with
  *    `MergeUtils.mergeAuthors`,
  *  - writes the outcome as gzip-compressed json to `targetPath/<type>`, overwriting it.
  *
  * @param spark      the active Spark session
  * @param graphPath  base path of the input graph (one json folder per entity type)
  * @param workingDir base path holding the `<type>_matched` parquet folders
  * @param targetPath base path of the output graph
  */
private def generateGraph(spark: SparkSession, graphPath: String, workingDir: String, targetPath: String): Unit = {
  ModelSupport.entityTypes
    .keySet().asScala
    .filter(ModelSupport.isResult).foreach((e: EntityType) => {
      val resultClazz: Class[Result] = ModelSupport.entityTypes.get(e).asInstanceOf[Class[Result]]
      // Built once per entity type: used both to type the joined rows and to encode the merged output.
      val resultEncoder: Encoder[Result] = Encoders.bean(resultClazz)
      val matched: Dataset[Row] = spark.read
        .schema(Encoders.bean(classOf[ORCIDAuthorEnricherResult]).schema)
        .parquet(workingDir + "/" + e.name + "_matched")
        .selectExpr("id", "enriched_author")
      val result: Dataset[Row] = spark.read
        .schema(resultEncoder.schema)
        .json(graphPath + "/" + e.name)
      result.join(matched, Seq("id"), "left")
        .withColumn("author", when(size(col("enriched_author")).gt(0), col("enriched_author"))
          .otherwise(col("author")))
        .drop("enriched_author").as(resultEncoder)
        .groupByKey(r => r.getId)(Encoders.STRING)
        .mapGroups((k, it) => {
          val p: Result = it.next
          // The fold only computes the merged list; it has to be stored back on the
          // surviving record, otherwise the authors of the other records are lost.
          p.setAuthor(it.foldLeft(p.getAuthor)((x, r) => MergeUtils.mergeAuthors(x, r.getAuthor, 0)))
          p
        })(resultEncoder)
        .write
        .mode(SaveMode.Overwrite)
        .option("compression", "gzip")
        .json(targetPath + "/" + e.name)
    })
}
// def generateGraph(spark: SparkSession, graphPath: String, workingDir: String, targetPath: String): Unit
def createTemporaryData(spark: SparkSession, graphPath: String, orcidPath: String, targetPath: String): Unit def createTemporaryData(spark: SparkSession, graphPath: String, orcidPath: String, targetPath: String): Unit

View File

@ -89,112 +89,68 @@ public class SparkPropagateOrcidAuthor extends SparkEnrichWithOrcidAuthors {
} }
private <T> void processAndMerge( // private <T extends Result> void processAndMerge(
SparkSession spark, // SparkSession spark,
String inputPath, // String inputPath,
String outputPath, // String outputPath,
Class<T> clazz, // Class<T> clazz,
Encoder<T> encoder) { // Encoder<T> encoder) {
//
spark.read() // spark.read()
.schema(Encoders.bean(clazz).schema()) // .schema(Encoders.bean(clazz).schema())
.json(inputPath) // .json(inputPath)
.as(encoder) // .as(encoder)
.groupByKey((MapFunction<T, String>) p -> { // .groupByKey((MapFunction<T, String>) OafEntity::getId, Encoders.STRING())
try { // .mapGroups((MapGroupsFunction<String, T, T>) (k, it) -> {
return (String) clazz.getMethod("getId").invoke(p); // T p = it.next();
} catch (Exception e) { // it.forEachRemaining(r -> p.setAuthor(MergeUtils.mergeAuthors(p.getAuthor(),r.getAuthor(),0)));
throw new RuntimeException("Error invoking getId method", e); // return p;
} // }, encoder)
}, Encoders.STRING()) // .write()
.mapGroups((MapGroupsFunction<String, T, T>) (k, it) -> { // .mode(SaveMode.Overwrite)
T p = it.next(); // .option("compression", "gzip")
it.forEachRemaining(r -> { // .json(outputPath);
try { // }
List<Author> currentAuthors = (List<Author>) clazz.getMethod("getAuthor").invoke(p); // @Override
List<Author> newAuthors = (List<Author>) clazz.getMethod("getAuthor").invoke(r); //public void generateGraph(SparkSession spark, String graphPath, String workingDir, String targetPath){
//
clazz.getMethod("setAuthor", List.class) // ModelSupport.entityTypes.keySet().stream().filter(ModelSupport::isResult)
.invoke(p, MergeUtils.mergeAuthors(currentAuthors, newAuthors, 0)); // .forEach(e -> {
} catch (Exception e) { // Class resultClazz = ModelSupport.entityTypes.get(e);
throw new RuntimeException("Error merging authors", e); // Dataset<Row> matched = spark
} // .read()
}); // .schema(Encoders.bean(ORCIDAuthorEnricherResult.class).schema())
return p; // .parquet(workingDir + "/" + e.name() + "_matched")
}, encoder) // .selectExpr("id","enriched_author");
.write() // Dataset<Row> result = spark.read().schema(Encoders.bean(resultClazz).schema())
.mode(SaveMode.Overwrite) // .json(graphPath + "/" + e.name());
.option("compression", "gzip") //
.json(outputPath); //
} //
@Override // result.join(matched, result.col("id").equalTo(matched.col("id")), "left")
public void generateGraph(SparkSession spark, String graphPath, String workingDir, String targetPath){ // .withColumn(
// "author",
ModelSupport.entityTypes.keySet().stream().filter(ModelSupport::isResult) // when(size(col("enriched_author")).gt(0), col("enriched_author"))
.forEach(e -> { // .otherwise(col("author"))
Class resultClazz = ModelSupport.entityTypes.get(e); // )
Dataset<Row> matched = spark // .drop(matched.col("id"))
.read() // .drop("enriched_author")
.schema(Encoders.bean(ORCIDAuthorEnricherResult.class).schema()) // .write()
.parquet(workingDir + "/" + e.name() + "_matched") // .mode(SaveMode.Overwrite)
.selectExpr("id","enriched_author"); // .option("compression", "gzip")
Dataset<Row> result = spark.read().schema(Encoders.bean(resultClazz).schema()) // .json(workingDir + "/" + e.name() + "/_tobemerged")
.json(graphPath + "/" + e.name()); // ;
// processAndMerge(
// spark,
// workingDir + "/" + e.name() + "/_tobemerged",
result.join(matched, result.col("id").equalTo(matched.col("id")), "left") // targetPath + "/" + e.name(),
.withColumn( // resultClazz,
"author", // Encoders.bean(resultClazz)
when(size(col("enriched_author")).gt(0), col("enriched_author")) // );
.otherwise(col("author")) // });
) //
.drop(matched.col("id")) //
.drop("enriched_author") // }
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir + "/" + e.name() + "/_tobemerged")
;
});
processAndMerge(
spark,
workingDir + "/publication/_tobemerged",
targetPath + "/publication",
Publication.class,
Encoders.bean(Publication.class)
);
processAndMerge(
spark,
workingDir + "/dataset/_tobemerged",
targetPath + "/dataset",
eu.dnetlib.dhp.schema.oaf.Dataset.class,
Encoders.bean(eu.dnetlib.dhp.schema.oaf.Dataset.class)
);
processAndMerge(
spark,
workingDir + "/otherresearchproduct/_tobemerged",
targetPath + "/otherresearchproduct",
OtherResearchProduct.class,
Encoders.bean(OtherResearchProduct.class)
);
processAndMerge(
spark,
workingDir + "/software/_tobemerged",
targetPath + "/software",
Software.class,
Encoders.bean(Software.class)
);
}
@Override @Override
public void createTemporaryData(SparkSession spark, String graphPath, String orcidPath, String targetPath) { public void createTemporaryData(SparkSession spark, String graphPath, String orcidPath, String targetPath) {

View File

@ -87,37 +87,6 @@ class SparkEnrichGraphWithOrcidAuthors(propertyPath: String, args: Array[String]
orcidWorksWithAuthors.unpersist() orcidWorksWithAuthors.unpersist()
} }
/** Writes, for every result entity type, the graph table with ORCID-enriched authors.
  *
  * Each `graphPath/<type>` json table is left-joined on `id` with the `id` / `enriched_author`
  * columns of `workingDir/<type>_matched`; `author` is replaced by `enriched_author` when the
  * latter has at least one element. The outcome is written as gzip-compressed json to
  * `targetPath/<type>`, overwriting any previous content.
  */
override def generateGraph(spark: SparkSession, graphPath: String, workingDir: String, targetPath: String): Unit = {
  for ((entityType, entityClass) <- ModelSupport.entityTypes.asScala if ModelSupport.isResult(entityType)) {
    val resultType = entityType.name()
    val resultEncoder = Encoders.bean(entityClass)

    // Only the join key and the enriched author list are needed from the matches.
    val matchedAuthors = spark.read
      .schema(Encoders.bean(classOf[ORCIDAuthorEnricherResult]).schema)
      .parquet(s"${workingDir}/${resultType}_matched")
      .selectExpr("id", "enriched_author")

    val graphResults = spark.read
      .schema(resultEncoder.schema)
      .json(s"$graphPath/$resultType")

    // Prefer the enriched authors when present, fall back to the original ones.
    val chosenAuthor = when(size(col("enriched_author")).gt(0), col("enriched_author"))
      .otherwise(col("author"))

    val enriched = graphResults
      .join(matchedAuthors, Seq("id"), "left")
      .withColumn("author", chosenAuthor)
      .drop("enriched_author")

    enriched.write
      .mode(SaveMode.Overwrite)
      .option("compression", "gzip")
      .json(s"${targetPath}/${resultType}")
  }
}
} }
object SparkEnrichGraphWithOrcidAuthors { object SparkEnrichGraphWithOrcidAuthors {