removed unneeded variable
This commit is contained in:
parent 98d28bab5c
commit 7400cd019d
@@ -8,11 +8,13 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 import java.io.Serializable;
 import java.util.*;
+import java.util.function.Consumer;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.*;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,7 +76,6 @@ public class SparkOrganizationRelation implements Serializable {
         Dataset<Relation> relationDataset = Utils.readPath(spark, inputPath, Relation.class);
 
         relationDataset.createOrReplaceTempView("relation");
-        Set<String> organizationSet = organizationMap.keySet();
 
         List<eu.dnetlib.dhp.schema.dump.oaf.graph.Relation> relList = new ArrayList<>();
 
@@ -95,15 +96,9 @@ public class SparkOrganizationRelation implements Serializable {
             }, Encoders.bean(MergedRels.class))
             .filter(Objects::nonNull)
             .collectAsList()
-            .forEach(mergedRels -> {
-                String oId = mergedRels.getOrganizationId();
-                organizationSet.remove(oId);
-                organizationMap
-                    .get(oId)
-                    .forEach(community -> addRelations(relList, community, mergedRels.getRepresentativeId()));
-            });
+            .forEach(getMergedRelsConsumer(organizationMap, relList));
 
-        organizationSet
+        organizationMap.keySet()
             .forEach(
                 oId -> organizationMap
                     .get(oId)
@@ -118,6 +113,17 @@ public class SparkOrganizationRelation implements Serializable {
 
     }
 
+    @NotNull
+    private static Consumer<MergedRels> getMergedRelsConsumer(OrganizationMap organizationMap, List<eu.dnetlib.dhp.schema.dump.oaf.graph.Relation> relList) {
+        return mergedRels -> {
+            String oId = mergedRels.getOrganizationId();
+            organizationMap
+                .get(oId)
+                .forEach(community -> addRelations(relList, community, mergedRels.getRepresentativeId()));
+            organizationMap.remove(oId);
+        };
+    }
+
     private static void addRelations(List<eu.dnetlib.dhp.schema.dump.oaf.graph.Relation> relList, String community,
         String organization) {
 
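Note on the refactor shown above: the inline lambda passed to forEach is extracted into a factory method returning a Consumer<MergedRels>, which is why the local organizationSet variable can be dropped and the keys read directly from organizationMap. Below is a minimal, self-contained sketch of that same "extract lambda into a Consumer factory" pattern; it is illustrative only, and the names (ConsumerExtractionSketch, buildConsumer, registry, items, results) are hypothetical, not part of this repository.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Illustrative sketch of extracting an inline forEach lambda into a method
// that returns a Consumer, mirroring the structure of the commit above.
public class ConsumerExtractionSketch {

    public static void main(String[] args) {
        Map<String, String> registry = new HashMap<>();
        registry.put("a", "alpha");
        registry.put("b", "beta");
        registry.put("d", "delta");

        List<String> results = new ArrayList<>();
        List<String> items = new ArrayList<>(List.of("a", "b", "c"));

        // Before the refactor the lambda body would sit inline here; after it,
        // the behaviour lives in a named factory method and no separate key-set
        // variable needs to be kept alongside the map.
        items.forEach(buildConsumer(registry, results));

        // Keys that were never consumed are read straight from the map.
        registry.keySet().forEach(k -> results.add("unmatched: " + k));

        results.forEach(System.out::println);
    }

    // Factory method returning the Consumer that was previously an inline lambda.
    private static Consumer<String> buildConsumer(Map<String, String> registry, List<String> results) {
        return key -> {
            String value = registry.get(key);
            if (value != null) {
                results.add(key + " -> " + value);
                registry.remove(key); // analogous to organizationMap.remove(oId) in the commit
            }
        };
    }
}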