1
0
Fork 0

gzipped output

This commit is contained in:
Michele Artini 2020-12-10 11:59:28 +01:00
parent 27e96767e0
commit 94bfed1c84
3 changed files with 15 additions and 18 deletions

View File

@ -30,9 +30,8 @@ public class CheckDuplictedIdsJob {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
CheckDuplictedIdsJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
.toString(CheckDuplictedIdsJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
parser.parseArgument(args);
final SparkConf conf = new SparkConf();
@ -59,8 +58,8 @@ public class CheckDuplictedIdsJob {
.map(o -> ClusterUtils.incrementAccumulator(o, total), Encoders.tuple(Encoders.STRING(), Encoders.LONG()))
.write()
.mode(SaveMode.Overwrite)
.json(countPath);
;
.option("compression", "gzip")
.json(countPath);;
}

View File

@ -42,9 +42,8 @@ public class PartitionEventsByDsIdJob {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
PartitionEventsByDsIdJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/od_partitions_params.json")));
.toString(PartitionEventsByDsIdJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/od_partitions_params.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
@ -67,13 +66,12 @@ public class PartitionEventsByDsIdJob {
final Set<String> validOpendoarIds = new HashSet<>();
if (!opendoarIds.trim().equals("-")) {
validOpendoarIds
.addAll(
Arrays
.stream(opendoarIds.split(","))
.map(String::trim)
.filter(StringUtils::isNotBlank)
.map(s -> OPENDOAR_NSPREFIX + DigestUtils.md5Hex(s))
.collect(Collectors.toSet()));
.addAll(Arrays
.stream(opendoarIds.split(","))
.map(String::trim)
.filter(StringUtils::isNotBlank)
.map(s -> OPENDOAR_NSPREFIX + DigestUtils.md5Hex(s))
.collect(Collectors.toSet()));
}
log.info("validOpendoarIds: {}", validOpendoarIds);
@ -84,13 +82,12 @@ public class PartitionEventsByDsIdJob {
.filter((FilterFunction<Event>) e -> StringUtils.isNotBlank(e.getMap().getTargetDatasourceId()))
.filter((FilterFunction<Event>) e -> e.getMap().getTargetDatasourceId().startsWith(OPENDOAR_NSPREFIX))
.filter((FilterFunction<Event>) e -> validOpendoarIds.contains(e.getMap().getTargetDatasourceId()))
.map(
(MapFunction<Event, ShortEventMessageWithGroupId>) e -> messageFromNotification(e),
Encoders.bean(ShortEventMessageWithGroupId.class))
.map((MapFunction<Event, ShortEventMessageWithGroupId>) e -> messageFromNotification(e), Encoders.bean(ShortEventMessageWithGroupId.class))
.coalesce(1)
.write()
.partitionBy("group")
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(partitionPath);
});

View File

@ -67,6 +67,7 @@ public class ClusterUtils {
.map(o -> ClusterUtils.incrementAccumulator(o, acc), Encoders.bean(clazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(path);
}