Fixed issue. Need to extend the workflow.

Miriam Baglioni 2023-11-14 11:33:50 +01:00
parent fd242c1c87
commit 0f602bae9d
2 changed files with 37 additions and 20 deletions


@@ -76,13 +76,13 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
 			isSparkSessionManaged,
 			spark -> {
 				Utils.removeOutputDir(spark, workingPath + resultType + "extendedaffiliation");
-				addOrganizations(spark, inputPath, workingPath, outputPath, resultType);
-				dumpOrganizationAndRelations(spark, inputPath, workingPath, outputPath, resultType);
+				addOrganizations(spark, inputPath, workingPath, resultType);
+				dumpOrganizationAndRelations(spark, inputPath, workingPath, resultType);
 			});
 	}
 
 	private static void dumpOrganizationAndRelations(SparkSession spark, String inputPath, String workingPath,
-		String outputPath, String resultType) {
+		String resultType) {
 		Dataset<Relation> relation = Utils
 			.readPath(spark, inputPath + "/relation", Relation.class)
 			.filter(
@@ -102,36 +102,46 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
 			.map((MapFunction<Tuple2<Result, Relation>, Relation>) t2 -> t2._2(), Encoders.bean(Relation.class));
 
 		// from eoscRelation select the organization
-		eoscRelation
+		Dataset<String> organizationIds = eoscRelation
 			.joinWith(organization, eoscRelation.col("target").equalTo(organization.col("id")))
 			.map(
-				(MapFunction<Tuple2<Relation, Organization>, eu.dnetlib.dhp.eosc.model.Organization>) t2 -> mapOrganization(
+				(MapFunction<Tuple2<Relation, Organization>, String>) t2 -> t2._2().getId(),
+				Encoders.STRING())
+			.distinct();
+		organizationIds
+			.joinWith(organization, organizationIds.col("value").equalTo(organization.col("id")))
+			.map(
+				(MapFunction<Tuple2<String, Organization>, eu.dnetlib.dhp.eosc.model.Organization>) t2 -> mapOrganization(
 					t2._2()),
 				Encoders.bean(eu.dnetlib.dhp.eosc.model.Organization.class))
 			.groupByKey((MapFunction<eu.dnetlib.dhp.eosc.model.Organization, String>) o -> o.getId(), Encoders.STRING())
 			.mapGroups(
 				(MapGroupsFunction<String, eu.dnetlib.dhp.eosc.model.Organization, eu.dnetlib.dhp.eosc.model.Organization>) (
 					k, v) -> v.next(),
 				Encoders.bean(eu.dnetlib.dhp.eosc.model.Organization.class))
 			.filter(Objects::nonNull)
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
-			.json(outputPath + "organization");
+			.json(workingPath + resultType + "/organization");
 
 		eoscRelation
 			.joinWith(organization, eoscRelation.col("target").equalTo(organization.col("id")))
 			.map(
-				(MapFunction<Tuple2<Relation, Organization>, eu.dnetlib.dhp.eosc.model.Relation>) t2 -> eu.dnetlib.dhp.eosc.model.Relation
-					.newInstance(t2._1().getSource(), t2._1().getTarget()),
+				(MapFunction<Tuple2<Relation, Organization>, eu.dnetlib.dhp.eosc.model.Relation>) t2 -> {
+					if (isToBeDumpedOrg(t2._2()))
+						return eu.dnetlib.dhp.eosc.model.Relation
+							.newInstance(t2._1().getSource(), t2._1().getTarget());
+					return null;
+				},
 				Encoders.bean(eu.dnetlib.dhp.eosc.model.Relation.class))
 			.filter(Objects::nonNull)
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
-			.json(outputPath + "resultOrganization");
+			.json(workingPath + resultType + "/resultOrganization");
 	}
 
-	private static void addOrganizations(SparkSession spark, String inputPath, String workingPath, String outputPath,
+	private static void addOrganizations(SparkSession spark, String inputPath, String workingPath,
 		String resultType) {
 		Dataset<Result> results = Utils
@@ -219,10 +229,7 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
 	private static eu.dnetlib.dhp.eosc.model.Organization mapOrganization(Organization org) {
-		if (Boolean.TRUE.equals(org.getDataInfo().getDeletedbyinference()))
-			return null;
-		if (!Optional.ofNullable(org.getLegalname()).isPresent()
-			&& !Optional.ofNullable(org.getLegalshortname()).isPresent())
+		if (isToBeDumpedOrg(org))
 			return null;
 
 		eu.dnetlib.dhp.eosc.model.Organization organization = new eu.dnetlib.dhp.eosc.model.Organization();
@@ -278,4 +285,13 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
 
 		return organization;
 	}
+
+	private static boolean isToBeDumpedOrg(Organization org) {
+		if (Boolean.TRUE.equals(org.getDataInfo().getDeletedbyinference()))
+			return true;
+		if (!Optional.ofNullable(org.getLegalname()).isPresent()
+			&& !Optional.ofNullable(org.getLegalshortname()).isPresent())
+			return true;
+		return false;
+	}
 }
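For reference, the pattern introduced above can be reproduced in isolation: project the organization ids, keep the distinct values, join the single-column Dataset<String> back on its implicit "value" column, and retain exactly one record per id with groupByKey/mapGroups. The sketch below is a minimal, self-contained approximation with a toy Org bean, sample data and a local SparkSession; the names DistinctOrgSketch and Org are illustrative and are not part of the project.

import java.io.Serializable;
import java.util.Arrays;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class DistinctOrgSketch {

	// illustrative bean, standing in for eu.dnetlib.dhp.eosc.model.Organization
	public static class Org implements Serializable {
		private String id;

		public Org() {
		}

		public Org(String id) {
			this.id = id;
		}

		public String getId() {
			return id;
		}

		public void setId(String id) {
			this.id = id;
		}
	}

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local[*]").appName("distinct-org-sketch").getOrCreate();

		// toy input containing a duplicated organization id
		Dataset<Org> organization = spark
			.createDataset(
				Arrays.asList(new Org("20|org1"), new Org("20|org1"), new Org("20|org2")),
				Encoders.bean(Org.class));

		// 1) project the ids and keep the distinct values
		Dataset<String> organizationIds = organization
			.map((MapFunction<Org, String>) Org::getId, Encoders.STRING())
			.distinct();

		// 2) join back: the single column of a Dataset<String> is named "value"
		// 3) keep exactly one record per id with groupByKey/mapGroups
		Dataset<Org> distinctOrgs = organizationIds
			.joinWith(organization, organizationIds.col("value").equalTo(organization.col("id")))
			.map((MapFunction<Tuple2<String, Org>, Org>) Tuple2::_2, Encoders.bean(Org.class))
			.groupByKey((MapFunction<Org, String>) Org::getId, Encoders.STRING())
			.mapGroups((MapGroupsFunction<String, Org, Org>) (k, it) -> it.next(), Encoders.bean(Org.class));

		distinctOrgs.show(false);
		spark.stop();
	}
}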


@@ -12,4 +12,5 @@ depositionId=6616871
 removeSet=merges;isMergedIn
 postgresURL=jdbc:postgresql://postgresql.services.openaire.eu:5432/dnet_openaireplus
 postgresUser=dnet
-postgresPassword=dnetPwd
+postgresPassword=dnetPwd
+isLookUpUrl=http://services.openaire.eu:8280/is/services/isLookUp?wsdl
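The second file is a plain Java properties file; the change adds the isLookUpUrl entry alongside the existing postgres settings. If you need to inspect such a file outside the workflow, a generic java.util.Properties read is enough; the file name below is hypothetical, since the project wires these values through its own workflow configuration.

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class ReadDumpProperties {

	public static void main(String[] args) throws IOException {
		// hypothetical path: pass the actual properties file used by the dump workflow
		String path = args.length > 0 ? args[0] : "project-default.properties";

		Properties props = new Properties();
		try (InputStream in = new FileInputStream(path)) {
			props.load(in);
		}

		// the key added by this commit
		System.out.println("isLookUpUrl = " + props.getProperty("isLookUpUrl"));
	}
}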