72 lines
2.0 KiB
Java
72 lines
2.0 KiB
Java
package eu.dnetlib.lbs.utils;
|
|
|
|
import java.util.Date;
|
|
import java.util.function.Function;
|
|
import java.util.stream.Collectors;
|
|
|
|
import org.apache.commons.codec.digest.DigestUtils;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
|
import com.google.gson.Gson;
|
|
|
|
import eu.dnetlib.lbs.elasticsearch.Event;
|
|
import eu.dnetlib.lbs.events.input.EventMessage;
|
|
|
|
public class JsonMessageToEventFunction implements Function<String, Event> {
|
|
|
|
private final Gson gson = new Gson();
|
|
|
|
private static final Log log = LogFactory.getLog(JsonMessageToEventFunction.class);
|
|
|
|
@Override
|
|
public Event apply(final String json) {
|
|
try {
|
|
final EventMessage msg = gson.fromJson(json, EventMessage.class);
|
|
if (msg == null || msg.hasNulls()) {
|
|
log.warn("Message is null or some required field is null");
|
|
return null;
|
|
}
|
|
|
|
final long now = new Date().getTime();
|
|
|
|
final Event event = new Event();
|
|
event.setEventId("event-" + DigestUtils.md5Hex(msg.getTopic()).substring(0, 6) + "-" + calculatePayloadHash(msg.getPayload()));
|
|
event.setProducerId(msg.getProducerId());
|
|
event.setPayload(msg.getPayload());
|
|
event.setMap(msg.getMap()
|
|
.entrySet()
|
|
.stream()
|
|
.filter(e -> e.getValue().asObject() != null)
|
|
.collect(Collectors.toMap(
|
|
e -> e.getKey(),
|
|
e -> e.getValue().asObject())));
|
|
event.setTopic(msg.getTopic());
|
|
event.setCreationDate(now);
|
|
event.setExpiryDate(calculateExpiryDate(now, msg.getTthDays()));
|
|
event.setInstantMessage(msg.getTthDays() == 0);
|
|
return event;
|
|
} catch (final Throwable e) {
|
|
log.warn("Error converting json: " + json, e);
|
|
return null;
|
|
}
|
|
}
|
|
|
|
private Long calculateExpiryDate(final long now, final int ttlDays) {
|
|
if (ttlDays < 0) {
|
|
return null;
|
|
} else if (ttlDays == 0) {
|
|
return now;
|
|
} else {
|
|
final long duration = ttlDays * 24 * 60 * 60 * 1000;
|
|
return now + duration;
|
|
}
|
|
}
|
|
|
|
private static String calculatePayloadHash(final String payload) {
|
|
// TODO : the payload should be normalized before hashing it
|
|
return DigestUtils.md5Hex(payload);
|
|
}
|
|
|
|
}
|