This commit is contained in:
Claudio Atzori 2020-12-07 21:53:13 +01:00
parent 2fcc24b36e
commit fba11eef2a
1 changed files with 20 additions and 11 deletions

View File

@ -18,6 +18,8 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SaveMode;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -40,8 +42,9 @@ public class PartitionEventsByDsIdJob {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString(PartitionEventsByDsIdJob.class .toString(
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/od_partitions_params.json"))); PartitionEventsByDsIdJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/od_partitions_params.json")));
parser.parseArgument(args); parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional final Boolean isSparkSessionManaged = Optional
@ -63,21 +66,27 @@ public class PartitionEventsByDsIdJob {
final Set<String> validOpendoarIds = new HashSet<>(); final Set<String> validOpendoarIds = new HashSet<>();
if (!opendoarIds.trim().equals("-")) { if (!opendoarIds.trim().equals("-")) {
validOpendoarIds.addAll(Arrays.stream(opendoarIds.split(",")) validOpendoarIds
.map(String::trim) .addAll(
.filter(StringUtils::isNotBlank) Arrays
.map(s -> OPENDOAR_NSPREFIX + DigestUtils.md5Hex(s)) .stream(opendoarIds.split(","))
.collect(Collectors.toSet())); .map(String::trim)
.filter(StringUtils::isNotBlank)
.map(s -> OPENDOAR_NSPREFIX + DigestUtils.md5Hex(s))
.collect(Collectors.toSet()));
} }
log.info("validOpendoarIds: {}", validOpendoarIds);
runWithSparkSession(conf, isSparkSessionManaged, spark -> { runWithSparkSession(conf, isSparkSessionManaged, spark -> {
ClusterUtils ClusterUtils
.readPath(spark, eventsPath, Event.class) .readPath(spark, eventsPath, Event.class)
.filter(e -> StringUtils.isNotBlank(e.getMap().getTargetDatasourceId())) .filter((FilterFunction<Event>) e -> StringUtils.isNotBlank(e.getMap().getTargetDatasourceId()))
.filter(e -> e.getMap().getTargetDatasourceId().startsWith(OPENDOAR_NSPREFIX)) .filter((FilterFunction<Event>) e -> e.getMap().getTargetDatasourceId().startsWith(OPENDOAR_NSPREFIX))
.filter(e -> validOpendoarIds.contains(e.getMap().getTargetDatasourceId())) .filter((FilterFunction<Event>) e -> validOpendoarIds.contains(e.getMap().getTargetDatasourceId()))
.map(e -> messageFromNotification(e), Encoders.bean(ShortEventMessageWithGroupId.class)) .map(
(MapFunction<Event, ShortEventMessageWithGroupId>) e -> messageFromNotification(e),
Encoders.bean(ShortEventMessageWithGroupId.class))
.coalesce(1) .coalesce(1)
.write() .write()
.partitionBy("group") .partitionBy("group")