fixed last issues

Miriam Baglioni 2023-11-17 16:49:06 +01:00
parent db388ebc21
commit 9a06a552c4
3 changed files with 58 additions and 19 deletions

View File

@@ -10,7 +10,6 @@ import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
@@ -20,8 +19,7 @@ import org.apache.spark.sql.SparkSession;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.spark_project.jetty.util.StringUtil;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.eosc.model.Affiliation;
@@ -132,13 +130,14 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
.joinWith(organization, eoscRelation.col("target").equalTo(organization.col("id")))
.map(
(MapFunction<Tuple2<Relation, Organization>, eu.dnetlib.dhp.eosc.model.Relation>) t2 -> {
if (isToBeDumpedOrg(t2._2()))
return eu.dnetlib.dhp.eosc.model.Relation
.newInstance(t2._1().getSource(), t2._1().getTarget());
return null;
if (isToBeRemovedOrg(t2._2()))
return new eu.dnetlib.dhp.eosc.model.Relation();
return eu.dnetlib.dhp.eosc.model.Relation
.newInstance(t2._1().getSource(), t2._1().getTarget());
},
Encoders.bean(eu.dnetlib.dhp.eosc.model.Relation.class))
.filter(Objects::nonNull)
.filter((FilterFunction<eu.dnetlib.dhp.eosc.model.Relation>) r -> StringUtil.isNotBlank(r.getSource()))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
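
Note on the change above: the old branch returned null from the MapFunction and relied on .filter(Objects::nonNull), but a null that round-trips through Encoders.bean can resurface as a bean whose fields are all null, so the non-null filter keeps it. The new branch emits an empty Relation as a sentinel and filters on the mandatory source field, which does survive the encoder round trip. A minimal, self-contained sketch of that sentinel pattern, with an illustrative Rel bean standing in for the project's Relation class:

// Minimal sketch of the sentinel pattern: never return null through a bean
// encoder; emit an empty bean and filter on a field every real record carries.
import java.util.Arrays;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class SentinelFilterSketch {
    public static class Rel implements java.io.Serializable {
        private String source;
        public String getSource() { return source; }
        public void setSource(String source) { this.source = source; }
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate();
        Dataset<Rel> rels = spark
            .createDataset(Arrays.asList("ok", "drop"), Encoders.STRING())
            .map((MapFunction<String, Rel>) id -> {
                Rel rel = new Rel();
                if (!"drop".equals(id))
                    rel.setSource(id); // the sentinel stays empty instead of being null
                return rel;
            }, Encoders.bean(Rel.class))
            // sentinels carry a blank source; real relations never do
            .filter((FilterFunction<Rel>) rel -> rel.getSource() != null && !rel.getSource().isEmpty());
        rels.show(); // only the "ok" row survives
        spark.stop();
    }
}
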
@@ -163,19 +162,54 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
.filter((FilterFunction<Organization>) o -> !o.getDataInfo().getDeletedbyinference());
Dataset<ResultOrganizations> resultOrganization = relations
.joinWith(organizations, relations.col("source").equalTo(organizations.col("id")), "left")
.map((MapFunction<Tuple2<Relation, Organization>, ResultOrganizations>) t2 -> getResultOrganizations(t2), Encoders.bean(ResultOrganizations.class))
.filter(Objects::nonNull);
.joinWith(organizations, relations.col("source").equalTo(organizations.col("id")))
.map((MapFunction<Tuple2<Relation, Organization>, ResultOrganizations>) t2 -> {
ResultOrganizations rOrg = new ResultOrganizations();
if (t2._2() != null) {
rOrg.setResultId(t2._1().getTarget());
Affiliation org = new Affiliation();
org.setId(t2._2().getId());
if (Optional.ofNullable(t2._2().getLegalname()).isPresent()) {
org.setName(t2._2().getLegalname().getValue());
} else {
org.setName("");
}
HashMap<String, Set<String>> organizationPids = new HashMap<>();
if (Optional.ofNullable(t2._2().getPid()).isPresent())
t2._2().getPid().forEach(p -> {
if (!organizationPids.containsKey(p.getQualifier().getClassid()))
organizationPids.put(p.getQualifier().getClassid(), new HashSet<>());
organizationPids.get(p.getQualifier().getClassid()).add(p.getValue());
});
List<OrganizationPid> pids = new ArrayList<>();
for (String key : organizationPids.keySet()) {
for (String value : organizationPids.get(key)) {
OrganizationPid pid = new OrganizationPid();
pid.setValue(value);
pid.setType(key);
pids.add(pid);
}
}
org.setPid(pids);
rOrg.setAffiliation(org);
return rOrg;
}
return rOrg;
}, Encoders.bean(ResultOrganizations.class))
.filter((FilterFunction<ResultOrganizations>) ro -> ro.getResultId() != null);
results
.joinWith(resultOrganization, results.col("id").equalTo(resultOrganization.col("resultId")), "left")
.groupByKey(
(MapFunction<Tuple2<Result, ResultOrganizations>, String>) t2 -> t2._1().getId(), Encoders.STRING())
.mapGroups(
(MapGroupsFunction<String, Tuple2<Result, ResultOrganizations>, Result>) (s, it) -> addAffiliation(it)
, Encoders.bean(Result.class))
(MapGroupsFunction<String, Tuple2<Result, ResultOrganizations>, Result>) (s, it) -> addAffiliation(it),
Encoders.bean(Result.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
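
The pid handling in the new map function buckets each organization's pids by classid into a HashMap<String, Set<String>>, so duplicate values collapse, then flattens the map back into a list of OrganizationPid beans. A plain-Java sketch of that group-then-flatten step, with (classid, value) string pairs standing in for the project's pid and OrganizationPid classes:

// Sketch of the pid deduplication above: bucket pid values by type so
// duplicates collapse, then flatten back into one record per distinct value.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PidDedupSketch {
    public static void main(String[] args) {
        // (classid, value) pairs as they might arrive from Organization.getPid()
        String[][] rawPids = {
            { "grid", "grid.1234.5" },
            { "ror", "https://ror.org/03yrm5c26" },
            { "grid", "grid.1234.5" } // duplicate, absorbed by the Set
        };
        Map<String, Set<String>> byType = new HashMap<>();
        for (String[] p : rawPids)
            byType.computeIfAbsent(p[0], k -> new HashSet<>()).add(p[1]);

        // flatten: one (type, value) record per distinct value, as with OrganizationPid
        List<String[]> pids = new ArrayList<>();
        for (Map.Entry<String, Set<String>> e : byType.entrySet())
            for (String value : e.getValue())
                pids.add(new String[] { e.getKey(), value });

        pids.forEach(p -> System.out.println(p[0] + " -> " + p[1]));
    }
}

computeIfAbsent expresses the same bucketing as the containsKey/put pair in the diff.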
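
The groupByKey/mapGroups pair then collapses the left join's one-row-per-affiliation output back to a single Result per id; addAffiliation (not shown in this hunk) presumably folds each group's affiliations into its result. A runnable sketch of the same collapse under that assumption, with a simplified Res bean and plain strings standing in for Result and ResultOrganizations:

// Runnable sketch of the groupByKey/mapGroups collapse used above.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

public class MergeAffiliationsSketch {
    public static class Res implements java.io.Serializable {
        private String id;
        private List<String> affiliations = new ArrayList<>();
        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public List<String> getAffiliations() { return affiliations; }
        public void setAffiliations(List<String> affiliations) { this.affiliations = affiliations; }
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate();
        Res r = new Res();
        r.setId("result1");
        // what a left join emits: one tuple per affiliation, null when unmatched
        Dataset<Tuple2<Res, String>> joined = spark.createDataset(
            Arrays.asList(new Tuple2<>(r, "orgA"), new Tuple2<>(r, "orgB"), new Tuple2<>(r, null)),
            Encoders.tuple(Encoders.bean(Res.class), Encoders.STRING()));
        joined
            .groupByKey((MapFunction<Tuple2<Res, String>, String>) t2 -> t2._1().getId(), Encoders.STRING())
            .mapGroups((MapGroupsFunction<String, Tuple2<Res, String>, Res>) (id, it) -> {
                Res out = null;
                List<String> affs = new ArrayList<>();
                while (it.hasNext()) {
                    Tuple2<Res, String> t = it.next();
                    out = t._1();
                    if (t._2() != null) // guard for the unmatched left-join rows
                        affs.add(t._2());
                }
                out.setAffiliations(affs);
                return out;
            }, Encoders.bean(Res.class))
            .show(false);
        spark.stop();
    }
}
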
@@ -242,7 +276,7 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
private static eu.dnetlib.dhp.eosc.model.Organization mapOrganization(Organization org) {
if (isToBeDumpedOrg(org))
if (isToBeRemovedOrg(org))
return null;
eu.dnetlib.dhp.eosc.model.Organization organization = new eu.dnetlib.dhp.eosc.model.Organization();
@@ -298,7 +332,7 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
return organization;
}
private static boolean isToBeDumpedOrg(Organization org) {
private static boolean isToBeRemovedOrg(Organization org) {
if (Boolean.TRUE.equals(org.getDataInfo().getDeletedbyinference()))
return true;
if (!Optional.ofNullable(org.getLegalname()).isPresent()

View File

@@ -112,7 +112,7 @@ public class SparkUpdateProjectInfo implements Serializable {
Dataset<Project> project = Utils.readPath(spark, inputPath + "/project", Project.class);
Dataset<String> projectIds = result
.joinWith(resultProject, result.col("id").equalTo(resultProject.col("resultId")), "left")
.joinWith(resultProject, result.col("id").equalTo(resultProject.col("resultId")))
.flatMap(
(FlatMapFunction<Tuple2<Result, ResultProject>, String>) t2 -> t2
._2()
@@ -136,8 +136,7 @@ public class SparkUpdateProjectInfo implements Serializable {
result
.joinWith(
resultProject, result.col("id").equalTo(resultProject.col("resultId")),
"left")
resultProject, result.col("id").equalTo(resultProject.col("resultId")))
.map(
(MapFunction<Tuple2<Result, ResultProject>, ResultProject>) t2 -> t2._2(),
Encoders.bean(ResultProject.class))
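
Both edits in this file tighten a left join into an inner join: results without a matching ResultProject previously surfaced as tuples with a null _2(), which every downstream lambda had to guard against; with the inner join they are dropped up front. A small local-mode sketch of the difference, using plain string datasets in place of Result and ResultProject:

// Local-mode sketch: the same joinWith with and without "left". The left join
// keeps unmatched results as (r2, null); the inner join drops them, so later
// lambdas no longer need null guards on _2().
import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

public class JoinModeSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate();
        Dataset<String> results = spark.createDataset(Arrays.asList("r1", "r2"), Encoders.STRING());
        Dataset<String> withProject = spark.createDataset(Arrays.asList("r1"), Encoders.STRING());

        Dataset<Tuple2<String, String>> left = results
            .joinWith(withProject, results.col("value").equalTo(withProject.col("value")), "left");
        Dataset<Tuple2<String, String>> inner = results
            .joinWith(withProject, results.col("value").equalTo(withProject.col("value")));

        left.show();  // (r1, r1) and (r2, null)
        inner.show(); // (r1, r1) only
        spark.stop();
    }
}
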

View File

@@ -19,6 +19,12 @@
"paramLongName": "isSparkSessionManaged",
"paramDescription": "the path used to store temporary output files",
"paramRequired": false
},
{
"paramName": "wp",
"paramLongName": "workingPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
}
]
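
With paramRequired set to true, the job should now fail fast when --workingPath is missing. A sketch of how a definition like this is typically consumed, following the ArgumentApplicationParser usage implied by the imports in the first file of this commit; the class name and resource path below are hypothetical:

// Sketch of how the new mandatory parameter would be read at start-up.
import org.apache.commons.io.IOUtils;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;

public class WorkingPathSketch {
    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils
            .toString(
                WorkingPathSketch.class
                    .getResourceAsStream("/eu/dnetlib/dhp/eosc/input_parameters.json")); // hypothetical path
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args); // should fail fast if --workingPath is absent

        String workingPath = parser.get("workingPath"); // keyed by "paramLongName"
        System.out.println("workingPath: " + workingPath);
    }
}
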