[orcidenrichment] fixing issue

This commit is contained in:
Miriam Baglioni 2024-11-06 16:36:34 +01:00
parent e4f89f9800
commit 227e84be99
2 changed files with 9 additions and 9 deletions

View File

@ -33,7 +33,7 @@ public class SparkPropagateOrcidAuthor extends SparkEnrichWithOrcidAuthors {
// Create instance and run the Spark application // Create instance and run the Spark application
SparkPropagateOrcidAuthor app = new SparkPropagateOrcidAuthor("/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/input_orcidtoresult_parameters.json", args, log); SparkPropagateOrcidAuthor app = new SparkPropagateOrcidAuthor("/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/input_orcidtoresult_parameters.json", args, log);
app.run(); app.initialize().run();
} }
@ -67,7 +67,7 @@ public class SparkPropagateOrcidAuthor extends SparkEnrichWithOrcidAuthors {
.keySet().stream().filter(ModelSupport::isResult) .keySet().stream().filter(ModelSupport::isResult)
.forEach(e -> { .forEach(e -> {
Dataset<Row> orcidDnet = spark.read().schema(Encoders.bean(Result.class).schema()) Dataset<Row> orcidDnet = spark.read().schema(Encoders.bean(Result.class).schema())
.json(graphPath + e.name()) .json(graphPath + "/"+ e.name())
.as(Encoders.bean(Result.class)) .as(Encoders.bean(Result.class))
.filter((FilterFunction<Result>) r -> r.getAuthor().stream() .filter((FilterFunction<Result>) r -> r.getAuthor().stream()
.anyMatch(a -> a.getPid() .anyMatch(a -> a.getPid()
@ -80,13 +80,13 @@ public class SparkPropagateOrcidAuthor extends SparkEnrichWithOrcidAuthors {
.selectExpr("_1 as target", "_2 as orcid_authors"); .selectExpr("_1 as target", "_2 as orcid_authors");
Dataset<Row> result = spark.read().schema(Encoders.bean(Result.class).schema()) Dataset<Row> result = spark.read().schema(Encoders.bean(Result.class).schema())
.json(graphPath + e.name()) .json(graphPath + "/"+ e.name())
.as(Encoders.bean(Result.class)) .as(Encoders.bean(Result.class))
.selectExpr("id", "author as graph_authors"); .selectExpr("id", "author as graph_authors");
Dataset<Row> supplements = spark.read() Dataset<Row> supplements = spark.read()
.schema(Encoders.bean(Relation.class).schema()) .schema(Encoders.bean(Relation.class).schema())
.json(graphPath + "relation") .json(graphPath + "/"+ "relation")
.where("relclass IN('" + ModelConstants.IS_SUPPLEMENT_TO + "', '" + .where("relclass IN('" + ModelConstants.IS_SUPPLEMENT_TO + "', '" +
ModelConstants.IS_SUPPLEMENTED_BY + "')") ModelConstants.IS_SUPPLEMENTED_BY + "')")
.selectExpr("source as id", "target"); .selectExpr("source as id", "target");
@ -98,7 +98,7 @@ public class SparkPropagateOrcidAuthor extends SparkEnrichWithOrcidAuthors {
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")
.parquet(targetPath + e.name() + "_unmatched"); .parquet(targetPath + "/"+ e.name() + "_unmatched");
}); });
} }

View File

@ -112,10 +112,10 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=8000 --conf spark.sql.shuffle.partitions=8000
</spark-opts> </spark-opts>
<arg>--graphPath</arg><arg>${sourcePath}/</arg> <arg>--graphPath</arg><arg>${sourcePath}</arg>
<arg>--orcidPath</arg><arg>${sourcePath}/</arg> <arg>--orcidPath</arg><arg>${sourcePath}</arg>
<arg>--workingDir</arg><arg>${workingDir}/</arg> <arg>--workingDir</arg><arg>${workingDir}</arg>
<arg>--targetPath</arg><arg>${outputPath}/</arg> <arg>--targetPath</arg><arg>${outputPath}</arg>
<arg>--matchingSource</arg><arg>graph</arg> <arg>--matchingSource</arg><arg>graph</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>