forked from D-Net/dnet-hadoop
changed method signature from set of mapkey entries to String representing path on file system where to find the map
This commit is contained in:
parent
931fa2ff00
commit
ae1b7fbfdb
|
@ -1,10 +1,17 @@
|
||||||
|
/**
|
||||||
|
* This class splits the dumped results according to the research community - research initiative/infrastructure
|
||||||
|
* they are related to. The information about the community is found in the element "context.id" in the result.
|
||||||
|
* Since the context that can be found in the result can be associated not only to communities, a community Map
|
||||||
|
* is provided. It will guide the splitting process.
|
||||||
|
* Note: the repartition(1) just before writing the results related to a community. This is a choice due
|
||||||
|
* to uploading constraints (just one file for each community) As soon as a better solution will be in place
|
||||||
|
* remove the repartition
|
||||||
|
*/
|
||||||
package eu.dnetlib.dhp.oa.graph.dump.community;
|
package eu.dnetlib.dhp.oa.graph.dump.community;
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
@ -19,19 +26,19 @@ import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;
|
||||||
|
|
||||||
public class CommunitySplit implements Serializable {
|
public class CommunitySplit implements Serializable {
|
||||||
|
|
||||||
public void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, CommunityMap communityMap) {
|
public void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, String communityMapPath) {
|
||||||
SparkConf conf = new SparkConf();
|
SparkConf conf = new SparkConf();
|
||||||
runWithSparkSession(
|
runWithSparkSession(
|
||||||
conf,
|
conf,
|
||||||
isSparkSessionManaged,
|
isSparkSessionManaged,
|
||||||
spark -> {
|
spark -> {
|
||||||
Utils.removeOutputDir(spark, outputPath);
|
Utils.removeOutputDir(spark, outputPath);
|
||||||
execSplit(spark, inputPath, outputPath, communityMap.keySet());// , inputClazz);
|
execSplit(spark, inputPath, outputPath, Utils.getCommunityMap(spark, communityMapPath).keySet());
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void execSplit(SparkSession spark, String inputPath, String outputPath,
|
private static void execSplit(SparkSession spark, String inputPath, String outputPath,
|
||||||
Set<String> communities) {// }, Class<R> inputClazz) {
|
Set<String> communities) {
|
||||||
|
|
||||||
Dataset<CommunityResult> result = Utils
|
Dataset<CommunityResult> result = Utils
|
||||||
.readPath(spark, inputPath + "/publication", CommunityResult.class)
|
.readPath(spark, inputPath + "/publication", CommunityResult.class)
|
||||||
|
|
Loading…
Reference in New Issue