|
|
|
@ -20,12 +20,11 @@ import org.slf4j.LoggerFactory;
|
|
|
|
|
|
|
|
|
|
import com.google.gson.Gson;
|
|
|
|
|
|
|
|
|
|
import eu.dnetlib.broker.api.ShortEventMessage;
|
|
|
|
|
import eu.dnetlib.broker.objects.OaBrokerEventPayload;
|
|
|
|
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
|
|
|
|
import eu.dnetlib.dhp.broker.model.Event;
|
|
|
|
|
import eu.dnetlib.dhp.broker.model.ShortEventMessageWithGroupId;
|
|
|
|
|
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
|
|
|
|
import scala.Tuple2;
|
|
|
|
|
|
|
|
|
|
public class PartitionEventsByDsIdJob {
|
|
|
|
|
|
|
|
|
@ -61,13 +60,11 @@ public class PartitionEventsByDsIdJob {
|
|
|
|
|
.readPath(spark, eventsPath, Event.class)
|
|
|
|
|
.filter(e -> StringUtils.isNotBlank(e.getMap().getTargetDatasourceId()))
|
|
|
|
|
.filter(e -> e.getMap().getTargetDatasourceId().contains(OPENDOAR_NSPREFIX))
|
|
|
|
|
.map(
|
|
|
|
|
e -> new Tuple2<>(
|
|
|
|
|
StringUtils.substringAfter(e.getMap().getTargetDatasourceId(), OPENDOAR_NSPREFIX),
|
|
|
|
|
messageFromNotification(e)),
|
|
|
|
|
Encoders.tuple(Encoders.STRING(), Encoders.bean(ShortEventMessage.class)))
|
|
|
|
|
.limit(10000)
|
|
|
|
|
.map(e -> messageFromNotification(e), Encoders.bean(ShortEventMessageWithGroupId.class))
|
|
|
|
|
.coalesce(1)
|
|
|
|
|
.write()
|
|
|
|
|
.partitionBy("_1")
|
|
|
|
|
.partitionBy("group")
|
|
|
|
|
.mode(SaveMode.Overwrite)
|
|
|
|
|
.json(partitionPath);
|
|
|
|
|
|
|
|
|
@ -77,7 +74,6 @@ public class PartitionEventsByDsIdJob {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static void renameSubDirs(final String path) throws IOException {
|
|
|
|
|
final String prefix = "_1=";
|
|
|
|
|
final FileSystem fs = FileSystem.get(new Configuration());
|
|
|
|
|
|
|
|
|
|
log.info("** Renaming subdirs of " + path);
|
|
|
|
@ -85,8 +81,8 @@ public class PartitionEventsByDsIdJob {
|
|
|
|
|
if (fileStatus.isDirectory()) {
|
|
|
|
|
final Path oldPath = fileStatus.getPath();
|
|
|
|
|
final String oldName = oldPath.getName();
|
|
|
|
|
if (oldName.startsWith(prefix)) {
|
|
|
|
|
final Path newPath = new Path(path + "/" + StringUtils.substringAfter(oldName, prefix));
|
|
|
|
|
if (oldName.contains("=")) {
|
|
|
|
|
final Path newPath = new Path(path + "/" + StringUtils.substringAfter(oldName, "="));
|
|
|
|
|
log.info(" * " + oldPath.getName() + " -> " + newPath.getName());
|
|
|
|
|
fs.rename(oldPath, newPath);
|
|
|
|
|
}
|
|
|
|
@ -94,18 +90,19 @@ public class PartitionEventsByDsIdJob {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static ShortEventMessage messageFromNotification(final Event e) {
|
|
|
|
|
private static ShortEventMessageWithGroupId messageFromNotification(final Event e) {
|
|
|
|
|
final Gson gson = new Gson();
|
|
|
|
|
|
|
|
|
|
final OaBrokerEventPayload payload = gson.fromJson(e.getPayload(), OaBrokerEventPayload.class);
|
|
|
|
|
|
|
|
|
|
final ShortEventMessage res = new ShortEventMessage();
|
|
|
|
|
final ShortEventMessageWithGroupId res = new ShortEventMessageWithGroupId();
|
|
|
|
|
|
|
|
|
|
res.setOriginalId(payload.getResult().getOriginalId());
|
|
|
|
|
res.setTitle(payload.getResult().getTitles().stream().filter(StringUtils::isNotBlank).findFirst().orElse(null));
|
|
|
|
|
res.setTopic(e.getTopic());
|
|
|
|
|
res.setTrust(payload.getTrust());
|
|
|
|
|
res.generateMessageFromObject(payload.getHighlight());
|
|
|
|
|
res.setGroup(StringUtils.substringAfter(e.getMap().getTargetDatasourceId(), OPENDOAR_NSPREFIX));
|
|
|
|
|
|
|
|
|
|
return res;
|
|
|
|
|
}
|
|
|
|
|