[orcidPropagatio] -
parent 3021dfda77
commit a9ccd00483
@@ -621,7 +621,7 @@ public class MergeUtils {
 		return m;
 	}
 
-	private static List<Author> mergeAuthors(List<Author> author, List<Author> author1, int trust) {
+	public static List<Author> mergeAuthors(List<Author> author, List<Author> author1, int trust) {
 		List<List<Author>> authors = new ArrayList<>();
 		if (author != null) {
 			authors.add(author);
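Note: the change above only widens the visibility of MergeUtils.mergeAuthors so that the Spark jobs further down can call it. A minimal sketch of such an external caller, assuming only the signature visible in this hunk (the Author fullname setter and the meaning of trust = 0 are assumptions, not taken from this commit):

import java.util.Arrays;
import java.util.List;

import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;

public class MergeAuthorsSketch {

	public static void main(String[] args) {
		Author a = new Author();
		a.setFullname("Doe, Jane"); // assumed setter on the Author bean

		Author b = new Author();
		b.setFullname("Doe, J.");

		// trust = 0 is the value used throughout this commit; assumed to mean "no preference"
		List<Author> merged = MergeUtils.mergeAuthors(Arrays.asList(a), Arrays.asList(b), 0);
		System.out.println(merged.size());
	}
}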
@@ -47,7 +47,8 @@ abstract class SparkEnrichWithOrcidAuthors(propertyPath: String, args: Array[Str
       .filter(e => ModelSupport.isResult(e._1))
       .foreach(e => {
         val resultType = e._1.name()
-        val enc = Encoders.bean(e._2)
+        val clazz = e._2
+        val enc = Encoders.bean(clazz)
 
         val matched = spark.read
           .schema(Encoders.bean(classOf[ORCIDAuthorEnricherResult]).schema)
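Note: e._2 is the concrete result bean class, known only at runtime, so the change above captures it once as clazz and reuses it both for the encoder here and for the reflective getId/getAuthor calls further down. A small sketch of building a Spark bean encoder from a runtime Class value (Publication is used purely as a stand-in):

import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public class RuntimeEncoderSketch {

	// Encoders.bean accepts the Class object itself, so the concrete type
	// does not have to be known at compile time.
	public static Encoder<?> encoderFor(Class<?> clazz) {
		return Encoders.bean(clazz);
	}
}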
@@ -65,13 +66,34 @@ abstract class SparkEnrichWithOrcidAuthors(propertyPath: String, args: Array[Str
             when(size(col("enriched_author")).gt(0), col("enriched_author"))
               .otherwise(col("author"))
           )
-          .drop("enriched_author").as[Result]
-          .groupByKey(r => r.getId)(Encoders[String])
-          .mapGroups((key: String, group: Iterator[Result]) => {
-            var r = group.next()
-            group.foreach(r1 => r = MergeUtils.mergeResult(r,r1))
-            r
-          }).as[Result]
+          .drop("enriched_author")
+          .write
+          .mode(SaveMode.Overwrite)
+          .option("compression", "gzip")
+          .json(s"${workingDir}/${resultType}_tobemerged")
+
+
+        spark.read.schema(enc.schema)
+          .json(s"${workingDir}/${resultType}_tobemerged")
+          .as(enc)
+          // .map(r => r.asInstanceOf[enc])(oafEntityEncoder)
+          .groupByKey(r => clazz.getMethod("getId").invoke(r).toString)(Encoders.STRING)
+          .mapGroups { (key: String, group: Iterator[Any]) =>
+            // Retrieve the first element and the required methods
+
+            val firstRecord = group.next()
+            val getAuthor = clazz.getMethod("getAuthor")
+            val setAuthor = clazz.getMethod("setAuthor", classOf[java.util.List[_]])
+
+            // Merge the authors
+            val authors = group.foldLeft(getAuthor.invoke(firstRecord).asInstanceOf[java.util.List[eu.dnetlib.dhp.schema.oaf.Author]]) { (acc, currentRecord) =>
+              MergeUtils.mergeAuthors(getAuthor.invoke(currentRecord).asInstanceOf[java.util.List[eu.dnetlib.dhp.schema.oaf.Author]], acc, 0)
+            }
+
+            // Update the record with the merged authors
+            setAuthor.invoke(firstRecord, authors)
+            firstRecord
+          }(enc)
           .write
           .mode(SaveMode.Overwrite)
           .option("compression", "gzip")
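Note: in the new code path above, each result type is first written to a ${resultType}_tobemerged folder and then read back and reduced by identifier, merging the author lists with MergeUtils.mergeAuthors; reflection (clazz.getMethod) is needed because the concrete Result subclass is only known at runtime. A minimal sketch of the same group-and-merge step using Spark's typed Java API with a concrete class instead of reflection (Publication is assumed here as an example subclass exposing getId/getAuthor/setAuthor; path parameter names are made up):

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;

public class GroupAndMergeSketch {

	public static void groupAndMerge(SparkSession spark, String tobemergedPath, String targetPath) {
		Dataset<Publication> tobemerged = spark
			.read()
			.schema(Encoders.bean(Publication.class).schema())
			.json(tobemergedPath)
			.as(Encoders.bean(Publication.class));

		tobemerged
			// group all copies of the same record by identifier
			.groupByKey((MapFunction<Publication, String>) Publication::getId, Encoders.STRING())
			// keep the first copy and fold the author lists of the others into it
			.mapGroups((MapGroupsFunction<String, Publication, Publication>) (id, it) -> {
				Publication first = it.next();
				while (it.hasNext()) {
					first.setAuthor(MergeUtils.mergeAuthors(first.getAuthor(), it.next().getAuthor(), 0));
				}
				return first;
			}, Encoders.bean(Publication.class))
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(targetPath);
	}
}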
@@ -7,9 +7,12 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
+import eu.dnetlib.dhp.utils.ORCIDAuthorEnricherResult;
 import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.ForeachFunction;
 import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.apache.spark.sql.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -22,6 +25,7 @@ import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.dhp.schema.oaf.Result;
 import eu.dnetlib.dhp.utils.OrcidAuthor;
 import scala.Tuple2;
+import static org.apache.spark.sql.functions.*;
 
 public class SparkPropagateOrcidAuthor extends SparkEnrichWithOrcidAuthors {
 	private static final Logger log = LoggerFactory.getLogger(SparkPropagateOrcidAuthor.class);
@@ -85,6 +89,46 @@ public class SparkPropagateOrcidAuthor extends SparkEnrichWithOrcidAuthors {
 		return orcid;
 
 	}
+	public void generateGraph(SparkSession spark, String graphPath, String workingDir, String targetPath) {
+
+		ModelSupport.entityTypes.keySet().stream().filter(ModelSupport::isResult)
+			.forEach(e -> {
+				Class resultClazz = ModelSupport.entityTypes.get(e);
+				Dataset<Row> matched = spark
+					.read()
+					.schema(Encoders.bean(ORCIDAuthorEnricherResult.class).schema())
+					.parquet(workingDir + "/" + e.name() + "_matched")
+					.selectExpr("id", "enriched_author");
+
+				Dataset<Row> result = spark.read().schema(Encoders.bean(resultClazz).schema())
+					.json(graphPath + "/" + e.name());
+				result.joinWith(matched, result.col("id").equalTo(matched.col("id")), "left")
+					.withColumn(
+						"author",
+						when(size(col("enriched_author")).gt(0), col("enriched_author"))
+							.otherwise(col("author"))
+					)
+					.drop("enriched_author")
+					.write()
+					.mode(SaveMode.Overwrite)
+					.option("compression", "gzip")
+					.json(workingDir + "/_tobemerged")
+					;
+
+				spark.read().schema(Encoders.bean(resultClazz).schema())
+					.json(workingDir + "/_tobemerged").as(Encoders.bean(resultClazz))
+					.groupByKey((MapFunction<R, String>) r -> ((R) r).getId(), Encoders.STRING())
+					.mapGroups((MapGroupsFunction<String, R, R>) (k, it) -> {
+						R res = it.next();
+						it.forEachRemaining(x -> res.setAuthor(MergeUtils.mergeAuthors(res.getAuthor(), x.getAuthor(), 0)));
+						return res;
+					}, Encoders.bean(resultClazz))
+					.write()
+					.mode(SaveMode.Overwrite)
+					.option("compression", "gzip")
+					.json(targetPath + "/" + e.name());
+			});
+	}
+
 	@Override
 	public void createTemporaryData(SparkSession spark, String graphPath, String orcidPath, String targetPath) {
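Note: generateGraph, added above, left-joins each result table with the corresponding _matched dataset, keeps the enriched author list only where the enrichment produced a non-empty one, and then reduces duplicate records by id exactly as in the Scala job. A toy sketch of just the join-and-substitute step on untyped DataFrames, assuming only the id, author and enriched_author columns visible in the diff (a plain join plus dropping the duplicate id column is used here instead of the joinWith call in the commit):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.size;
import static org.apache.spark.sql.functions.when;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class JoinAndSubstituteSketch {

	// result: the graph table ("id", "author", ...); matched: the enrichment
	// output ("id", "enriched_author"). Left join so records without a match
	// keep their original authors.
	public static Dataset<Row> preferEnrichedAuthors(Dataset<Row> result, Dataset<Row> matched) {
		return result
			.join(matched, result.col("id").equalTo(matched.col("id")), "left")
			.drop(matched.col("id"))
			.withColumn(
				"author",
				when(size(col("enriched_author")).gt(0), col("enriched_author"))
					.otherwise(col("author")))
			.drop("enriched_author");
	}
}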