diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java index 5ca865e8fc..951afb6c5e 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java @@ -30,9 +30,8 @@ public class CheckDuplictedIdsJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString( - CheckDuplictedIdsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); + .toString(CheckDuplictedIdsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); @@ -59,8 +58,8 @@ public class CheckDuplictedIdsJob { .map(o -> ClusterUtils.incrementAccumulator(o, total), Encoders.tuple(Encoders.STRING(), Encoders.LONG())) .write() .mode(SaveMode.Overwrite) - .json(countPath); - ; + .option("compression", "gzip") + .json(countPath);; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java index e9644122f3..1e060d824f 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java @@ -42,9 +42,8 @@ public class PartitionEventsByDsIdJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString( - PartitionEventsByDsIdJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/od_partitions_params.json"))); + .toString(PartitionEventsByDsIdJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/od_partitions_params.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -67,13 +66,12 @@ public class PartitionEventsByDsIdJob { final Set validOpendoarIds = new HashSet<>(); if (!opendoarIds.trim().equals("-")) { validOpendoarIds - .addAll( - Arrays - .stream(opendoarIds.split(",")) - .map(String::trim) - .filter(StringUtils::isNotBlank) - .map(s -> OPENDOAR_NSPREFIX + DigestUtils.md5Hex(s)) - .collect(Collectors.toSet())); + .addAll(Arrays + .stream(opendoarIds.split(",")) + .map(String::trim) + .filter(StringUtils::isNotBlank) + .map(s -> OPENDOAR_NSPREFIX + DigestUtils.md5Hex(s)) + .collect(Collectors.toSet())); } log.info("validOpendoarIds: {}", validOpendoarIds); @@ -84,13 +82,12 @@ public class PartitionEventsByDsIdJob { .filter((FilterFunction) e -> StringUtils.isNotBlank(e.getMap().getTargetDatasourceId())) .filter((FilterFunction) e -> e.getMap().getTargetDatasourceId().startsWith(OPENDOAR_NSPREFIX)) .filter((FilterFunction) e -> validOpendoarIds.contains(e.getMap().getTargetDatasourceId())) - .map( - (MapFunction) e -> messageFromNotification(e), - Encoders.bean(ShortEventMessageWithGroupId.class)) + .map((MapFunction) e -> messageFromNotification(e), Encoders.bean(ShortEventMessageWithGroupId.class)) .coalesce(1) .write() .partitionBy("group") .mode(SaveMode.Overwrite) + .option("compression", "gzip") .json(partitionPath); }); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java index d8b8dd8079..04985a6ab9 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java @@ -67,6 +67,7 @@ public class ClusterUtils { .map(o -> ClusterUtils.incrementAccumulator(o, acc), Encoders.bean(clazz)) .write() .mode(SaveMode.Overwrite) + .option("compression", "gzip") .json(path); }