update eventId generator

This commit is contained in:
Michele Artini 2020-07-18 09:40:36 +02:00
parent cc5d13da85
commit 346a1d2b5a
2 changed files with 11 additions and 151 deletions

View File

@@ -34,7 +34,10 @@ public class EventFactory {
final MappedFields map = createMapFromResult(updateInfo); final MappedFields map = createMapFromResult(updateInfo);
final String eventId = calculateEventId( final String eventId = calculateEventId(
updateInfo.getTopicPath(), updateInfo.getTarget().getOpenaireId(), updateInfo.getHighlightValueAsString()); updateInfo.getTopicPath(), updateInfo.getTargetDs().getOpenaireId(), updateInfo
.getTarget()
.getOpenaireId(),
updateInfo.getHighlightValueAsString());
res.setEventId(eventId); res.setEventId(eventId);
res.setProducerId(PRODUCER_ID); res.setProducerId(PRODUCER_ID);
@@ -93,11 +96,13 @@ public class EventFactory {
return map; return map;
} }
/**
 * Builds an event identifier: the literal prefix "event-" followed by dash-separated
 * prefixes of the MD5 hex digests of the inputs.
 *
 * The lines below are a side-by-side diff rendering of this method. The left column is
 * the previous three-argument form (topic, publicationId, value), which keeps 6, 8 and 8
 * hex characters respectively. The right column is the updated four-argument form, which
 * adds dsId and keeps 4, 4, 7 and 5 hex characters for topic, dsId, publicationId and
 * value respectively.
 *
 * NOTE(review): DigestUtils is presumably Apache Commons Codec; confirm against the
 * file's imports. Behaviour for null arguments is not visible from here.
 */
private static String calculateEventId(final String topic, final String publicationId, final String value) { private static String calculateEventId(final String topic, final String dsId, final String publicationId,
final String value) {
return "event-" return "event-"
+ DigestUtils.md5Hex(topic).substring(0, 6) + "-" + DigestUtils.md5Hex(topic).substring(0, 4) + "-"
+ DigestUtils.md5Hex(publicationId).substring(0, 8) + "-" + DigestUtils.md5Hex(dsId).substring(0, 4) + "-"
+ DigestUtils.md5Hex(value).substring(0, 8); + DigestUtils.md5Hex(publicationId).substring(0, 7) + "-"
+ DigestUtils.md5Hex(value).substring(0, 5);
} }
private static long calculateExpiryDate(final long now) { private static long calculateExpiryDate(final long now) {

View File

@@ -64,157 +64,12 @@
</configuration> </configuration>
</global> </global>
<start to="join_entities_step0"/> <start to="generate_events"/>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<!--
  Workflow action "join_entities_step0": submits the Spark application named JoinStep0
  (main class eu.dnetlib.dhp.broker.oa.JoinStep0Job, packaged in the dhp-broker-events jar)
  with master yarn in cluster mode.
  Executor cores/memory and driver memory come from workflow properties; the number of
  Spark SQL shuffle partitions is fixed at 3840.
  The job receives the graphPath (from graphInputPath) and workingPath arguments.
  On success the workflow moves to "join_entities_step1"; on error it moves to "Kill".
-->
<action name="join_entities_step0">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>JoinStep0</name>
<class>eu.dnetlib.dhp.broker.oa.JoinStep0Job</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="join_entities_step1"/>
<error to="Kill"/>
</action>
<!--
  Workflow action "join_entities_step1": submits the Spark application named JoinStep1
  (main class eu.dnetlib.dhp.broker.oa.JoinStep1Job, packaged in the dhp-broker-events jar)
  with master yarn in cluster mode.
  Executor cores/memory and driver memory come from workflow properties; the number of
  Spark SQL shuffle partitions is fixed at 3840.
  The job receives the graphPath (from graphInputPath) and workingPath arguments.
  On success the workflow moves to "join_entities_step2"; on error it moves to "Kill".
-->
<action name="join_entities_step1">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>JoinStep1</name>
<class>eu.dnetlib.dhp.broker.oa.JoinStep1Job</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="join_entities_step2"/>
<error to="Kill"/>
</action>
<!--
  Workflow action "join_entities_step2": submits the Spark application named JoinStep2
  (main class eu.dnetlib.dhp.broker.oa.JoinStep2Job, packaged in the dhp-broker-events jar)
  with master yarn in cluster mode.
  Executor cores/memory and driver memory come from workflow properties; the number of
  Spark SQL shuffle partitions is fixed at 3840.
  The job receives the graphPath (from graphInputPath) and workingPath arguments.
  On success the workflow moves to "join_entities_step3"; on error it moves to "Kill".
-->
<action name="join_entities_step2">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>JoinStep2</name>
<class>eu.dnetlib.dhp.broker.oa.JoinStep2Job</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="join_entities_step3"/>
<error to="Kill"/>
</action>
<!--
  Workflow action "join_entities_step3": submits the Spark application named JoinStep3
  (main class eu.dnetlib.dhp.broker.oa.JoinStep3Job, packaged in the dhp-broker-events jar)
  with master yarn in cluster mode.
  Executor cores/memory and driver memory come from workflow properties; the number of
  Spark SQL shuffle partitions is fixed at 3840.
  The job receives the graphPath (from graphInputPath) and workingPath arguments.
  On success the workflow moves to "join_entities_step4"; on error it moves to "Kill".
-->
<action name="join_entities_step3">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>JoinStep3</name>
<class>eu.dnetlib.dhp.broker.oa.JoinStep3Job</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="join_entities_step4"/>
<error to="Kill"/>
</action>
<!--
  Workflow action "join_entities_step4": submits the Spark application named JoinStep4
  (main class eu.dnetlib.dhp.broker.oa.JoinStep4Job, packaged in the dhp-broker-events jar)
  with master yarn in cluster mode.
  Executor cores/memory and driver memory come from workflow properties; the number of
  Spark SQL shuffle partitions is fixed at 3840.
  The job receives the graphPath (from graphInputPath) and workingPath arguments.
  On success the workflow moves to "prepare_groups"; on error it moves to "Kill".
-->
<action name="join_entities_step4">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>JoinStep4</name>
<class>eu.dnetlib.dhp.broker.oa.JoinStep4Job</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="prepare_groups"/>
<error to="Kill"/>
</action>
<!--
  Workflow action "prepare_groups": submits the Spark application named PrepareGroupsJob
  (main class eu.dnetlib.dhp.broker.oa.PrepareGroupsJob, packaged in the dhp-broker-events jar)
  with master yarn in cluster mode.
  Executor cores/memory and driver memory come from workflow properties; the number of
  Spark SQL shuffle partitions is fixed at 3840.
  The job receives the graphPath (from graphInputPath) and workingPath arguments.
  On success the workflow moves to "generate_events"; on error it moves to "Kill".
-->
<action name="prepare_groups">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareGroupsJob</name>
<class>eu.dnetlib.dhp.broker.oa.PrepareGroupsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="generate_events"/>
<error to="Kill"/>
</action>
<action name="generate_events"> <action name="generate_events">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>