Fixed issue: use left joins when enriching results with project info in SparkUpdateProjectInfo, and removed leftover debug output and commented-out code

This commit is contained in:
Miriam Baglioni 2023-11-13 11:30:02 +01:00
parent d32b0861a2
commit dbfd744f9c
5 changed files with 23 additions and 18 deletions

View File

@ -101,9 +101,6 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
.joinWith(relation, result.col("id").equalTo(relation.col("source")))
.map((MapFunction<Tuple2<Result, Relation>, Relation>) t2 -> t2._2(), Encoders.bean(Relation.class));
eoscRelation
.foreach((ForeachFunction<Relation>) r -> System.out.println(new ObjectMapper().writeValueAsString(r)));
// from eoscRelation select the organization
eoscRelation
.joinWith(organization, eoscRelation.col("target").equalTo(organization.col("id")))

View File

@ -77,17 +77,6 @@ public class SelectEoscResultsJobStep1 implements Serializable {
String inputPath, String outputPath,
Class<R> inputClazz, String communityMapPath, String eoscDatasourceIdsPath) {
// final StructType structureSchema = new StructType()
// .add("eoscId", DataTypes.StringType)
// .add("graphId", DataTypes.StringType)
// .add("graphName", DataTypes.StringType);
//
// // .fromDDL("`graphId`: STRING, `eoscId`:STRING");
// org.apache.spark.sql.Dataset<Row> df = spark
// .read()
// .schema(structureSchema)
// .json(eoscDatasourceIdsPath);
List<MasterDuplicate> df = Utils
.readPath(spark, eoscDatasourceIdsPath, MasterDuplicate.class)
.collectAsList();

View File

@ -112,7 +112,7 @@ public class SparkUpdateProjectInfo implements Serializable {
Dataset<Project> project = Utils.readPath(spark, inputPath + "/project", Project.class);
Dataset<String> projectIds = result
.joinWith(resultProject, result.col("id").equalTo(resultProject.col("resultId")))
.joinWith(resultProject, result.col("id").equalTo(resultProject.col("resultId")), "left")
.flatMap(
(FlatMapFunction<Tuple2<Result, ResultProject>, String>) t2 -> t2
._2()
@ -134,7 +134,13 @@ public class SparkUpdateProjectInfo implements Serializable {
.option("compression", "gzip")
.json(outputPath + "project");
resultProject
result
.joinWith(
resultProject, result.col("id").equalTo(resultProject.col("resultId")),
"left")
.map(
(MapFunction<Tuple2<Result, ResultProject>, ResultProject>) t2 -> t2._2(),
Encoders.bean(ResultProject.class))
.flatMap(
(FlatMapFunction<ResultProject, Relation>) rp -> rp
.getProjectsList()

View File

@ -0,0 +1,15 @@
#PROPERTIES FOR EOSC DUMP
sourcePath=/tmp/prod_provision/graph/18_graph_blacklisted/
outputPath=/tmp/miriam/graph_dumps/eosc_prod_extended
#accessToken for the openaire sandbox (NOTE: credential committed in plain text — should be externalized and rotated)
accessToken=OzzOsyucEIHxCEfhlpsMo3myEiwpCza3trCRL7ddfGTAK9xXkIP2MbXd6Vg4
connectionUrl=https://sandbox.zenodo.org/api/deposit/depositions
singleDeposition=false
conceptRecordId=1094304
depositionType=version
metadata=""
depositionId=6616871
removeSet=merges;isMergedIn
postgresURL=jdbc:postgresql://postgresql.services.openaire.eu:5432/dnet_openaireplus
postgresUser=dnet
postgresPassword=dnetPwd

View File

@ -225,8 +225,6 @@
<error to="Kill"/>
</action>
<action name="dump_eosc_orp">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>