diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/ShortEventMessageWithGroupId.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/ShortEventMessageWithGroupId.java new file mode 100644 index 000000000..1a1e9764b --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/ShortEventMessageWithGroupId.java @@ -0,0 +1,25 @@ + +package eu.dnetlib.dhp.broker.model; + +import java.io.Serializable; + +import eu.dnetlib.broker.api.ShortEventMessage; + +public class ShortEventMessageWithGroupId extends ShortEventMessage implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 4704889388757626630L; + + private String group; + + public String getGroup() { + return group; + } + + public void setGroup(final String group) { + this.group = group; + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java index 8a9009f32..d5c53ea36 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java @@ -14,6 +14,7 @@ import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.spark.SparkConf; import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.TypedColumn; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,6 +75,7 @@ public class GenerateStatsJob { .agg(aggr) .map(t -> t._2, Encoders.bean(DatasourceStats.class)) .write() + .mode(SaveMode.Overwrite) .jdbc(dbUrl, "oa_datasource_stats_temp", connectionProperties); log.info("*** updateStats"); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java index 0748624f7..da2c5bb78 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java @@ -20,12 +20,11 @@ import org.slf4j.LoggerFactory; import com.google.gson.Gson; -import eu.dnetlib.broker.api.ShortEventMessage; import eu.dnetlib.broker.objects.OaBrokerEventPayload; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.broker.model.Event; +import eu.dnetlib.dhp.broker.model.ShortEventMessageWithGroupId; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; -import scala.Tuple2; public class PartitionEventsByDsIdJob { @@ -61,13 +60,11 @@ public class PartitionEventsByDsIdJob { .readPath(spark, eventsPath, Event.class) .filter(e -> StringUtils.isNotBlank(e.getMap().getTargetDatasourceId())) .filter(e -> e.getMap().getTargetDatasourceId().contains(OPENDOAR_NSPREFIX)) - .map( - e -> new Tuple2<>( - StringUtils.substringAfter(e.getMap().getTargetDatasourceId(), OPENDOAR_NSPREFIX), - messageFromNotification(e)), - Encoders.tuple(Encoders.STRING(), Encoders.bean(ShortEventMessage.class))) + .limit(10000) + .map(e -> messageFromNotification(e), Encoders.bean(ShortEventMessageWithGroupId.class)) + .coalesce(1) .write() - .partitionBy("_1") + .partitionBy("group") .mode(SaveMode.Overwrite) .json(partitionPath); @@ -77,7 +74,6 @@ public class PartitionEventsByDsIdJob { } private static void renameSubDirs(final String path) throws IOException { - final String prefix = "_1="; final FileSystem fs = FileSystem.get(new Configuration()); log.info("** Renaming subdirs of " + path); @@ -85,8 +81,8 @@ public class PartitionEventsByDsIdJob { if (fileStatus.isDirectory()) { final Path oldPath = fileStatus.getPath(); final String oldName = oldPath.getName(); - if (oldName.startsWith(prefix)) { - final Path newPath = new Path(path + "/" + StringUtils.substringAfter(oldName, prefix)); + if (oldName.contains("=")) { + final Path newPath = new Path(path + "/" + StringUtils.substringAfter(oldName, "=")); log.info(" * " + oldPath.getName() + " -> " + newPath.getName()); fs.rename(oldPath, newPath); } @@ -94,18 +90,19 @@ public class PartitionEventsByDsIdJob { } } - private static ShortEventMessage messageFromNotification(final Event e) { + private static ShortEventMessageWithGroupId messageFromNotification(final Event e) { final Gson gson = new Gson(); final OaBrokerEventPayload payload = gson.fromJson(e.getPayload(), OaBrokerEventPayload.class); - final ShortEventMessage res = new ShortEventMessage(); + final ShortEventMessageWithGroupId res = new ShortEventMessageWithGroupId(); res.setOriginalId(payload.getResult().getOriginalId()); res.setTitle(payload.getResult().getTitles().stream().filter(StringUtils::isNotBlank).findFirst().orElse(null)); res.setTopic(e.getTopic()); res.setTrust(payload.getTrust()); res.generateMessageFromObject(payload.getHighlight()); + res.setGroup(StringUtils.substringAfter(e.getMap().getTargetDatasourceId(), OPENDOAR_NSPREFIX)); return res; } diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml index 4184b71bd..14e33b091 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml @@ -44,6 +44,18 @@ brokerApiBaseUrl the url of the broker service api + + brokerDbUrl + the url of the broker database + + + brokerDbUser + the user of the broker database + + + brokerDbPassword + the password of the broker database + sparkDriverMemory memory for driver process @@ -509,8 +521,11 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - --graphPath${graphInputPath} --workingPath${workingPath} + --dbUrl${brokerDbUrl} + --dbUser${brokerDbUser} + --dbPassword${brokerDbPassword} + --brokerApiBaseUrl${brokerApiBaseUrl} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml index 407b9f42f..8bae626f1 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml @@ -111,18 +111,18 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + yarn cluster - GenerateStatsJob - eu.dnetlib.dhp.broker.oa.GenerateStatsJob + PartitionEventsByDsIdJob + eu.dnetlib.dhp.broker.oa.PartitionEventsByDsIdJob dhp-broker-events-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -134,11 +134,8 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 + --graphPath${graphInputPath} --workingPath${workingPath} - --dbUrl${brokerDbUrl} - --dbUser${brokerDbUser} - --dbPassword${brokerDbPassword} - --brokerApiBaseUrl${brokerApiBaseUrl}