diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java index da2c5bb78a..65d5e6f946 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java @@ -4,8 +4,13 @@ package eu.dnetlib.dhp.broker.oa; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -29,15 +34,14 @@ import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; public class PartitionEventsByDsIdJob { private static final Logger log = LoggerFactory.getLogger(PartitionEventsByDsIdJob.class); - private static final String OPENDOAR_NSPREFIX = "opendoar____::"; + private static final String OPENDOAR_NSPREFIX = "10|opendoar____::"; public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString( - PartitionEventsByDsIdJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); + .toString(PartitionEventsByDsIdJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/od_partitions_params.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -54,13 +58,25 @@ public class PartitionEventsByDsIdJob { final String partitionPath = parser.get("workingPath") + "/eventsByOpendoarId"; log.info("partitionPath: {}", partitionPath); + final String opendoarIds = parser.get("opendoarIds"); + log.info("opendoarIds: {}", opendoarIds); + + final Set validOpendoarIds = new HashSet<>(); + if (!opendoarIds.trim().equals("-")) { + validOpendoarIds.addAll(Arrays.stream(opendoarIds.split(",")) + .map(String::trim) + .filter(StringUtils::isNotBlank) + .map(s -> OPENDOAR_NSPREFIX + DigestUtils.md5Hex(s)) + .collect(Collectors.toSet())); + } + runWithSparkSession(conf, isSparkSessionManaged, spark -> { ClusterUtils .readPath(spark, eventsPath, Event.class) .filter(e -> StringUtils.isNotBlank(e.getMap().getTargetDatasourceId())) - .filter(e -> e.getMap().getTargetDatasourceId().contains(OPENDOAR_NSPREFIX)) - .limit(10000) + .filter(e -> e.getMap().getTargetDatasourceId().startsWith(OPENDOAR_NSPREFIX)) + .filter(e -> validOpendoarIds.contains(e.getMap().getTargetDatasourceId())) .map(e -> messageFromNotification(e), Encoders.bean(ShortEventMessageWithGroupId.class)) .coalesce(1) .write() diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/od_partitions_params.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/od_partitions_params.json new file mode 100644 index 0000000000..10ba926abf --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/od_partitions_params.json @@ -0,0 +1,14 @@ +[ + { + "paramName": "o", + "paramLongName": "workingPath", + "paramDescription": "the path where the temporary data will be stored", + "paramRequired": true + }, + { + "paramName": "list", + "paramLongName": "opendoarIds", + "paramDescription": "the opendoar IDs whitelist (comma separated)", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/config-default.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/opendoarPartition/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/config-default.xml rename to dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/opendoarPartition/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/opendoarPartition/oozie_app/workflow.xml similarity index 61% rename from dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml rename to dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/opendoarPartition/oozie_app/workflow.xml index 8bae626f17..dba3c9f73b 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/opendoarPartition/oozie_app/workflow.xml @@ -1,60 +1,13 @@ - + - graphInputPath - the path where the graph is stored + opendoarIds + the opendoar IDs whitelist (comma separated) workingPath the path where the the generated data will be stored - - - datasourceIdWhitelist - - - a white list (comma separeted, - for empty list) of datasource ids - - - datasourceTypeWhitelist - - - a white list (comma separeted, - for empty list) of datasource types - - - datasourceIdBlacklist - - - a black list (comma separeted, - for empty list) of datasource ids - - - esEventIndexName - the elasticsearch index name for events - - - esNotificationsIndexName - the elasticsearch index name for notifications - - - esIndexHost - the elasticsearch host - - - maxIndexedEventsForDsAndTopic - the max number of events for each couple (ds/topic) - - - brokerApiBaseUrl - the url of the broker service api - - - brokerDbUrl - the url of the broker database - - - brokerDbUser - the user of the broker database - - - brokerDbPassword - the password of the broker database sparkDriverMemory @@ -111,13 +64,13 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + yarn cluster @@ -134,8 +87,8 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - --graphPath${graphInputPath} --workingPath${workingPath} + --opendoarIds${opendoarIds}