fixed conflicts

This commit is contained in:
Miriam Baglioni 2020-09-24 15:35:01 +02:00
commit de6c4d46d8
33 changed files with 1386 additions and 75 deletions

View File

@ -19,7 +19,7 @@
<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_annotation_type_member_declaration" value="do not insert"/> <setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_annotation_type_member_declaration" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_throws" value="do not insert"/> <setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_throws" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.parentheses_positions_in_switch_statement" value="common_lines"/> <setting id="org.eclipse.jdt.core.formatter.parentheses_positions_in_switch_statement" value="common_lines"/>
<setting id="org.eclipse.jdt.core.formatter.comment.format_javadoc_comments" value="true"/> <setting id="org.eclipse.jdt.core.formatter.comment.format_javadoc_comments" value="false"/>
<setting id="org.eclipse.jdt.core.formatter.indentation.size" value="4"/> <setting id="org.eclipse.jdt.core.formatter.indentation.size" value="4"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_postfix_operator" value="do not insert"/> <setting id="org.eclipse.jdt.core.formatter.insert_space_after_postfix_operator" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.parentheses_positions_in_enum_constant_declaration" value="common_lines"/> <setting id="org.eclipse.jdt.core.formatter.parentheses_positions_in_enum_constant_declaration" value="common_lines"/>

View File

@ -24,8 +24,10 @@ public class Instance implements Serializable {
private String type; private String type;
private List<String> url; private List<String> url;
private String publicationdate;// dateofacceptance; private String publicationdate;// dateofacceptance;
private String refereed; // peer-review status private String refereed; // peer-review status

View File

@ -31,6 +31,10 @@
<artifactId>elasticsearch-hadoop</artifactId> <artifactId>elasticsearch-hadoop</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency> <dependency>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>

View File

@ -0,0 +1,31 @@
package eu.dnetlib.dhp.broker.model;
import java.io.Serializable;
public class ConditionParams implements Serializable {
/**
*
*/
private static final long serialVersionUID = 2719901844537516110L;
private String value;
private String otherValue;
public String getValue() {
return value;
}
public void setValue(final String value) {
this.value = value;
}
public String getOtherValue() {
return otherValue;
}
public void setOtherValue(final String otherValue) {
this.otherValue = otherValue;
}
}

View File

@ -2,7 +2,6 @@
package eu.dnetlib.dhp.broker.model; package eu.dnetlib.dhp.broker.model;
import java.text.ParseException; import java.text.ParseException;
import java.util.Date;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -19,16 +18,12 @@ public class EventFactory {
private final static String PRODUCER_ID = "OpenAIRE"; private final static String PRODUCER_ID = "OpenAIRE";
private static final int TTH_DAYS = 365;
private final static String[] DATE_PATTERNS = { private final static String[] DATE_PATTERNS = {
"yyyy-MM-dd" "yyyy-MM-dd"
}; };
public static Event newBrokerEvent(final UpdateInfo<?> updateInfo) { public static Event newBrokerEvent(final UpdateInfo<?> updateInfo) {
final long now = new Date().getTime();
final Event res = new Event(); final Event res = new Event();
final MappedFields map = createMapFromResult(updateInfo); final MappedFields map = createMapFromResult(updateInfo);
@ -44,8 +39,8 @@ public class EventFactory {
res.setPayload(updateInfo.asBrokerPayload().toJSON()); res.setPayload(updateInfo.asBrokerPayload().toJSON());
res.setMap(map); res.setMap(map);
res.setTopic(updateInfo.getTopicPath()); res.setTopic(updateInfo.getTopicPath());
res.setCreationDate(now); res.setCreationDate(0l);
res.setExpiryDate(calculateExpiryDate(now)); res.setExpiryDate(Long.MAX_VALUE);
res.setInstantMessage(false); res.setInstantMessage(false);
return res; return res;
@ -96,7 +91,9 @@ public class EventFactory {
return map; return map;
} }
private static String calculateEventId(final String topic, final String dsId, final String publicationId, private static String calculateEventId(final String topic,
final String dsId,
final String publicationId,
final String value) { final String value) {
return "event-" return "event-"
+ DigestUtils.md5Hex(topic).substring(0, 4) + "-" + DigestUtils.md5Hex(topic).substring(0, 4) + "-"
@ -105,10 +102,6 @@ public class EventFactory {
+ DigestUtils.md5Hex(value).substring(0, 5); + DigestUtils.md5Hex(value).substring(0, 5);
} }
private static long calculateExpiryDate(final long now) {
return now + TTH_DAYS * 24 * 60 * 60 * 1000;
}
private static long parseDateTolong(final String date) { private static long parseDateTolong(final String date) {
if (StringUtils.isBlank(date)) { if (StringUtils.isBlank(date)) {
return -1; return -1;

View File

@ -0,0 +1,37 @@
package eu.dnetlib.dhp.broker.model;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@JsonIgnoreProperties(ignoreUnknown = true)
public class MapCondition implements Serializable {
/**
*
*/
private static final long serialVersionUID = -7137490975452466813L;
private String field;
private List<ConditionParams> listParams = new ArrayList<>();
public String getField() {
return field;
}
public void setField(final String field) {
this.field = field;
}
public List<ConditionParams> getListParams() {
return listParams;
}
public void setListParams(final List<ConditionParams> listParams) {
this.listParams = listParams;
}
}

View File

@ -0,0 +1,93 @@
package eu.dnetlib.dhp.broker.model;
import java.io.Serializable;
public class Notification implements Serializable {
/**
*
*/
private static final long serialVersionUID = -1770420972526995727L;
private String notificationId;
private String subscriptionId;
private String producerId;
private String eventId;
private String topic;
private Long date;
private String payload;
private MappedFields map;
public String getNotificationId() {
return notificationId;
}
public void setNotificationId(final String notificationId) {
this.notificationId = notificationId;
}
public String getSubscriptionId() {
return subscriptionId;
}
public void setSubscriptionId(final String subscriptionId) {
this.subscriptionId = subscriptionId;
}
public String getProducerId() {
return producerId;
}
public void setProducerId(final String producerId) {
this.producerId = producerId;
}
public String getEventId() {
return eventId;
}
public void setEventId(final String eventId) {
this.eventId = eventId;
}
public String getTopic() {
return topic;
}
public void setTopic(final String topic) {
this.topic = topic;
}
public String getPayload() {
return payload;
}
public void setPayload(final String payload) {
this.payload = payload;
}
public MappedFields getMap() {
return map;
}
public void setMap(final MappedFields map) {
this.map = map;
}
public Long getDate() {
return date;
}
public void setDate(final Long date) {
this.date = date;
}
}

View File

@ -0,0 +1,74 @@
package eu.dnetlib.dhp.broker.model;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.ObjectMapper;
@JsonIgnoreProperties(ignoreUnknown = true)
public class Subscription implements Serializable {
/**
*
*/
private static final long serialVersionUID = 1051702214740830010L;
private String subscriptionId;
private String subscriber;
private String topic;
private String conditions;
public String getSubscriptionId() {
return subscriptionId;
}
public void setSubscriptionId(final String subscriptionId) {
this.subscriptionId = subscriptionId;
}
public String getSubscriber() {
return subscriber;
}
public void setSubscriber(final String subscriber) {
this.subscriber = subscriber;
}
public String getTopic() {
return topic;
}
public void setTopic(final String topic) {
this.topic = topic;
}
public String getConditions() {
return conditions;
}
public void setConditions(final String conditions) {
this.conditions = conditions;
}
public Map<String, List<ConditionParams>> conditionsAsMap() {
final ObjectMapper mapper = new ObjectMapper();
try {
final List<MapCondition> list = mapper
.readValue(
getConditions(), mapper.getTypeFactory().constructCollectionType(List.class, MapCondition.class));
return list
.stream()
.filter(mc -> !mc.getListParams().isEmpty())
.collect(Collectors.toMap(MapCondition::getField, MapCondition::getListParams));
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -3,11 +3,16 @@ package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.IOException;
import java.util.Optional; import java.util.Optional;
import java.util.Properties;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.TypedColumn; import org.apache.spark.sql.TypedColumn;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -28,8 +33,8 @@ public class GenerateStatsJob {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString( .toString(
IndexOnESJob.class GenerateStatsJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/stats_params.json")));
parser.parseArgument(args); parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional final Boolean isSparkSessionManaged = Optional
@ -43,21 +48,50 @@ public class GenerateStatsJob {
final String eventsPath = parser.get("workingPath") + "/events"; final String eventsPath = parser.get("workingPath") + "/events";
log.info("eventsPath: {}", eventsPath); log.info("eventsPath: {}", eventsPath);
final String statsPath = parser.get("workingPath") + "/stats"; final String dbUrl = parser.get("dbUrl");
log.info("stats: {}", statsPath); log.info("dbUrl: {}", dbUrl);
final String dbUser = parser.get("dbUser");
log.info("dbUser: {}", dbUser);
final String dbPassword = parser.get("dbPassword");
log.info("dbPassword: {}", "***");
final String brokerApiBaseUrl = parser.get("brokerApiBaseUrl");
log.info("brokerApiBaseUrl: {}", brokerApiBaseUrl);
final TypedColumn<Event, DatasourceStats> aggr = new StatsAggregator().toColumn(); final TypedColumn<Event, DatasourceStats> aggr = new StatsAggregator().toColumn();
final Properties connectionProperties = new Properties();
connectionProperties.put("user", dbUser);
connectionProperties.put("password", dbPassword);
runWithSparkSession(conf, isSparkSessionManaged, spark -> { runWithSparkSession(conf, isSparkSessionManaged, spark -> {
final Dataset<DatasourceStats> stats = ClusterUtils ClusterUtils
.readPath(spark, eventsPath, Event.class) .readPath(spark, eventsPath, Event.class)
.groupByKey(e -> e.getMap().getTargetDatasourceId(), Encoders.STRING()) .groupByKey(e -> e.getTopic() + "@@@" + e.getMap().getTargetDatasourceId(), Encoders.STRING())
.agg(aggr) .agg(aggr)
.map(t -> t._2, Encoders.bean(DatasourceStats.class)); .map(t -> t._2, Encoders.bean(DatasourceStats.class))
.write()
.jdbc(dbUrl, "oa_datasource_stats_temp", connectionProperties);
log.info("*** updateStats");
updateStats(brokerApiBaseUrl);
log.info("*** ALL done.");
ClusterUtils.save(stats, statsPath, DatasourceStats.class, null);
}); });
} }
private static String updateStats(final String brokerApiBaseUrl) throws IOException {
final String url = brokerApiBaseUrl + "/api/openaireBroker/stats/update";
final HttpGet req = new HttpGet(url);
try (final CloseableHttpClient client = HttpClients.createDefault()) {
try (final CloseableHttpResponse response = client.execute(req)) {
return IOUtils.toString(response.getEntity().getContent());
}
}
}
} }

View File

@ -0,0 +1,126 @@
package eu.dnetlib.dhp.broker.oa;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.util.LongAccumulator;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.EventGroup;
import eu.dnetlib.dhp.broker.oa.util.aggregators.subset.EventSubsetAggregator;
public class IndexEventSubsetJob {
private static final Logger log = LoggerFactory.getLogger(IndexEventSubsetJob.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
IndexEventSubsetJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_event_subset.json")));
parser.parseArgument(args);
final SparkConf conf = new SparkConf();
final String eventsPath = parser.get("workingPath") + "/events";
log.info("eventsPath: {}", eventsPath);
final String index = parser.get("index");
log.info("index: {}", index);
final String indexHost = parser.get("esHost");
log.info("indexHost: {}", indexHost);
final int maxEventsForTopic = NumberUtils.toInt(parser.get("maxEventsForTopic"));
log.info("maxEventsForTopic: {}", maxEventsForTopic);
final String brokerApiBaseUrl = parser.get("brokerApiBaseUrl");
log.info("brokerApiBaseUrl: {}", brokerApiBaseUrl);
final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
final TypedColumn<Event, EventGroup> aggr = new EventSubsetAggregator(maxEventsForTopic).toColumn();
final LongAccumulator total = spark.sparkContext().longAccumulator("total_indexed");
final long now = new Date().getTime();
final Dataset<Event> subset = ClusterUtils
.readPath(spark, eventsPath, Event.class)
.groupByKey(e -> e.getTopic() + '@' + e.getMap().getTargetDatasourceId(), Encoders.STRING())
.agg(aggr)
.map(t -> t._2, Encoders.bean(EventGroup.class))
.flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class));
final JavaRDD<String> inputRdd = subset
.map(e -> prepareEventForIndexing(e, now, total), Encoders.STRING())
.javaRDD();
final Map<String, String> esCfg = new HashMap<>();
// esCfg.put("es.nodes", "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54");
esCfg.put("es.index.auto.create", "false");
esCfg.put("es.nodes", indexHost);
esCfg.put("es.mapping.id", "eventId"); // THE PRIMARY KEY
esCfg.put("es.batch.write.retry.count", "8");
esCfg.put("es.batch.write.retry.wait", "60s");
esCfg.put("es.batch.size.entries", "200");
esCfg.put("es.nodes.wan.only", "true");
log.info("*** Start indexing");
JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg);
log.info("*** End indexing");
log.info("*** Deleting old events");
final String message = deleteOldEvents(brokerApiBaseUrl, now - 1000);
log.info("*** Deleted events: " + message);
}
private static String deleteOldEvents(final String brokerApiBaseUrl, final long l) throws Exception {
final String url = brokerApiBaseUrl + "/api/events/byCreationDate/0/" + l;
final HttpDelete req = new HttpDelete(url);
try (final CloseableHttpClient client = HttpClients.createDefault()) {
try (final CloseableHttpResponse response = client.execute(req)) {
return IOUtils.toString(response.getEntity().getContent());
}
}
}
private static String prepareEventForIndexing(final Event e, final long creationDate, final LongAccumulator acc)
throws JsonProcessingException {
acc.add(1);
e.setCreationDate(creationDate);
e.setExpiryDate(Long.MAX_VALUE);
return new ObjectMapper().writeValueAsString(e);
}
}

View File

@ -0,0 +1,238 @@
package eu.dnetlib.dhp.broker.oa;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.ConditionParams;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.model.MappedFields;
import eu.dnetlib.dhp.broker.model.Notification;
import eu.dnetlib.dhp.broker.model.Subscription;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.NotificationGroup;
import eu.dnetlib.dhp.broker.oa.util.SubscriptionUtils;
public class IndexNotificationsJob {
private static final Logger log = LoggerFactory.getLogger(IndexNotificationsJob.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
IndexNotificationsJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json")));
parser.parseArgument(args);
final SparkConf conf = new SparkConf();
final String eventsPath = parser.get("workingPath") + "/events";
log.info("eventsPath: {}", eventsPath);
final String index = parser.get("index");
log.info("index: {}", index);
final String indexHost = parser.get("esHost");
log.info("indexHost: {}", indexHost);
final String brokerApiBaseUrl = parser.get("brokerApiBaseUrl");
log.info("brokerApiBaseUrl: {}", brokerApiBaseUrl);
final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
final LongAccumulator total = spark.sparkContext().longAccumulator("total_indexed");
final long startTime = new Date().getTime();
final List<Subscription> subscriptions = listSubscriptions(brokerApiBaseUrl);
log.info("Number of subscriptions: " + subscriptions.size());
if (subscriptions.size() > 0) {
final Dataset<Notification> notifications = ClusterUtils
.readPath(spark, eventsPath, Event.class)
.map(e -> generateNotifications(e, subscriptions, startTime), Encoders.bean(NotificationGroup.class))
.flatMap(g -> g.getData().iterator(), Encoders.bean(Notification.class));
final JavaRDD<String> inputRdd = notifications
.map(n -> prepareForIndexing(n, total), Encoders.STRING())
.javaRDD();
final Map<String, String> esCfg = new HashMap<>();
// esCfg.put("es.nodes", "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54");
esCfg.put("es.index.auto.create", "false");
esCfg.put("es.nodes", indexHost);
esCfg.put("es.mapping.id", "notificationId"); // THE PRIMARY KEY
esCfg.put("es.batch.write.retry.count", "8");
esCfg.put("es.batch.write.retry.wait", "60s");
esCfg.put("es.batch.size.entries", "200");
esCfg.put("es.nodes.wan.only", "true");
log.info("*** Start indexing");
JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg);
log.info("*** End indexing");
log.info("*** Deleting old notifications");
final String message = deleteOldNotifications(brokerApiBaseUrl, startTime - 1000);
log.info("*** Deleted notifications: " + message);
log.info("*** sendNotifications (emails, ...)");
sendNotifications(brokerApiBaseUrl, startTime - 1000);
log.info("*** ALL done.");
}
}
private static NotificationGroup generateNotifications(final Event e,
final List<Subscription> subscriptions,
final long date) {
final List<Notification> list = subscriptions
.stream()
.filter(
s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic()))
.filter(s -> verifyConditions(e.getMap(), s.conditionsAsMap()))
.map(s -> generateNotification(s, e, date))
.collect(Collectors.toList());
return new NotificationGroup(list);
}
private static Notification generateNotification(final Subscription s, final Event e, final long date) {
final Notification n = new Notification();
n.setNotificationId("ntf-" + DigestUtils.md5Hex(s.getSubscriptionId() + "@@@" + e.getEventId()));
n.setSubscriptionId(s.getSubscriptionId());
n.setEventId(e.getEventId());
n.setProducerId(e.getProducerId());
n.setTopic(e.getTopic());
n.setPayload(e.getPayload());
n.setMap(e.getMap());
n.setDate(date);
return n;
}
private static boolean verifyConditions(final MappedFields map,
final Map<String, List<ConditionParams>> conditions) {
if (conditions.containsKey("targetDatasourceName")
&& !SubscriptionUtils
.verifyExact(map.getTargetDatasourceName(), conditions.get("targetDatasourceName").get(0).getValue())) {
return false;
}
if (conditions.containsKey("trust")
&& !SubscriptionUtils
.verifyFloatRange(
map.getTrust(), conditions.get("trust").get(0).getValue(),
conditions.get("trust").get(0).getOtherValue())) {
return false;
}
if (conditions.containsKey("targetDateofacceptance") && !conditions
.get("targetDateofacceptance")
.stream()
.anyMatch(
c -> SubscriptionUtils
.verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) {
return false;
}
if (conditions.containsKey("targetResultTitle")
&& !conditions
.get("targetResultTitle")
.stream()
.anyMatch(c -> SubscriptionUtils.verifySimilar(map.getTargetResultTitle(), c.getValue()))) {
return false;
}
if (conditions.containsKey("targetAuthors")
&& !conditions
.get("targetAuthors")
.stream()
.allMatch(c -> SubscriptionUtils.verifyListSimilar(map.getTargetAuthors(), c.getValue()))) {
return false;
}
if (conditions.containsKey("targetSubjects")
&& !conditions
.get("targetSubjects")
.stream()
.allMatch(c -> SubscriptionUtils.verifyListExact(map.getTargetSubjects(), c.getValue()))) {
return false;
}
return true;
}
private static List<Subscription> listSubscriptions(final String brokerApiBaseUrl) throws Exception {
final String url = brokerApiBaseUrl + "/api/subscriptions";
final HttpGet req = new HttpGet(url);
final ObjectMapper mapper = new ObjectMapper();
try (final CloseableHttpClient client = HttpClients.createDefault()) {
try (final CloseableHttpResponse response = client.execute(req)) {
final String s = IOUtils.toString(response.getEntity().getContent());
return mapper
.readValue(s, mapper.getTypeFactory().constructCollectionType(List.class, Subscription.class));
}
}
}
private static String deleteOldNotifications(final String brokerApiBaseUrl, final long l) throws Exception {
final String url = brokerApiBaseUrl + "/api/notifications/byDate/0/" + l;
final HttpDelete req = new HttpDelete(url);
try (final CloseableHttpClient client = HttpClients.createDefault()) {
try (final CloseableHttpResponse response = client.execute(req)) {
return IOUtils.toString(response.getEntity().getContent());
}
}
}
private static String sendNotifications(final String brokerApiBaseUrl, final long l) throws IOException {
final String url = brokerApiBaseUrl + "/api/openaireBroker/notifications/send/" + l;
final HttpGet req = new HttpGet(url);
try (final CloseableHttpClient client = HttpClients.createDefault()) {
try (final CloseableHttpResponse response = client.execute(req)) {
return IOUtils.toString(response.getEntity().getContent());
}
}
}
private static String prepareForIndexing(final Notification n, final LongAccumulator acc)
throws JsonProcessingException {
acc.add(1);
return new ObjectMapper().writeValueAsString(n);
}
}

View File

@ -20,6 +20,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event; import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
@Deprecated
public class IndexOnESJob { public class IndexOnESJob {
private static final Logger log = LoggerFactory.getLogger(IndexOnESJob.class); private static final Logger log = LoggerFactory.getLogger(IndexOnESJob.class);

View File

@ -0,0 +1,113 @@
package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.IOException;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import eu.dnetlib.broker.api.ShortEventMessage;
import eu.dnetlib.broker.objects.OaBrokerEventPayload;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import scala.Tuple2;
public class PartitionEventsByDsIdJob {
private static final Logger log = LoggerFactory.getLogger(PartitionEventsByDsIdJob.class);
private static final String OPENDOAR_NSPREFIX = "opendoar____::";
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
PartitionEventsByDsIdJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final SparkConf conf = new SparkConf();
final String eventsPath = parser.get("workingPath") + "/events";
log.info("eventsPath: {}", eventsPath);
final String partitionPath = parser.get("workingPath") + "/eventsByOpendoarId";
log.info("partitionPath: {}", partitionPath);
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
ClusterUtils
.readPath(spark, eventsPath, Event.class)
.filter(e -> StringUtils.isNotBlank(e.getMap().getTargetDatasourceId()))
.filter(e -> e.getMap().getTargetDatasourceId().contains(OPENDOAR_NSPREFIX))
.map(
e -> new Tuple2<>(
StringUtils.substringAfter(e.getMap().getTargetDatasourceId(), OPENDOAR_NSPREFIX),
messageFromNotification(e)),
Encoders.tuple(Encoders.STRING(), Encoders.bean(ShortEventMessage.class)))
.write()
.partitionBy("_1")
.mode(SaveMode.Overwrite)
.json(partitionPath);
});
renameSubDirs(partitionPath);
}
private static void renameSubDirs(final String path) throws IOException {
final String prefix = "_1=";
final FileSystem fs = FileSystem.get(new Configuration());
log.info("** Renaming subdirs of " + path);
for (final FileStatus fileStatus : fs.listStatus(new Path(path))) {
if (fileStatus.isDirectory()) {
final Path oldPath = fileStatus.getPath();
final String oldName = oldPath.getName();
if (oldName.startsWith(prefix)) {
final Path newPath = new Path(path + "/" + StringUtils.substringAfter(oldName, prefix));
log.info(" * " + oldPath.getName() + " -> " + newPath.getName());
fs.rename(oldPath, newPath);
}
}
}
}
private static ShortEventMessage messageFromNotification(final Event e) {
final Gson gson = new Gson();
final OaBrokerEventPayload payload = gson.fromJson(e.getPayload(), OaBrokerEventPayload.class);
final ShortEventMessage res = new ShortEventMessage();
res.setOriginalId(payload.getResult().getOriginalId());
res.setTitle(payload.getResult().getTitles().stream().filter(StringUtils::isNotBlank).findFirst().orElse(null));
res.setTopic(e.getTopic());
res.setTrust(payload.getTrust());
res.generateMessageFromObject(payload.getHighlight());
return res;
}
}

View File

@ -0,0 +1,44 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import eu.dnetlib.dhp.broker.model.Notification;
public class NotificationGroup implements Serializable {
/**
*
*/
private static final long serialVersionUID = 720996471281158977L;
private List<Notification> data = new ArrayList<>();
public NotificationGroup() {
}
public NotificationGroup(final List<Notification> data) {
this.data = data;
}
public List<Notification> getData() {
return data;
}
public void setData(final List<Notification> data) {
this.data = data;
}
public NotificationGroup addElement(final Notification elem) {
data.add(elem);
return this;
}
public NotificationGroup addGroup(final NotificationGroup group) {
data.addAll(group.getData());
return this;
}
}

View File

@ -0,0 +1,49 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.text.ParseException;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.commons.lang3.time.DateUtils;
public class SubscriptionUtils {
private static final long ONE_DAY = 86_400_000;
public static boolean verifyListSimilar(final List<String> list, final String value) {
return list.stream().anyMatch(s -> verifySimilar(s, value));
}
public static boolean verifyListExact(final List<String> list, final String value) {
return list.stream().anyMatch(s -> verifyExact(s, value));
}
public static boolean verifySimilar(final String s1, final String s2) {
for (final String part : s2.split("\\W+")) {
if (!StringUtils.containsIgnoreCase(s1, part)) {
return false;
}
}
return true;
}
public static boolean verifyFloatRange(final float trust, final String min, final String max) {
return trust >= NumberUtils.toFloat(min, 0) && trust <= NumberUtils.toFloat(max, 1);
}
public static boolean verifyDateRange(final long date, final String min, final String max) {
try {
return date >= DateUtils.parseDate(min, "yyyy-MM-dd").getTime()
&& date < DateUtils.parseDate(max, "yyyy-MM-dd").getTime() + ONE_DAY;
} catch (final ParseException e) {
return false;
}
}
public static boolean verifyExact(final String s1, final String s2) {
return StringUtils.equalsIgnoreCase(s1, s2);
}
}

View File

@ -2,8 +2,6 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.stats; package eu.dnetlib.dhp.broker.oa.util.aggregators.stats;
import java.io.Serializable; import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
public class DatasourceStats implements Serializable { public class DatasourceStats implements Serializable {
@ -15,7 +13,8 @@ public class DatasourceStats implements Serializable {
private String id; private String id;
private String name; private String name;
private String type; private String type;
private Map<String, Long> topics = new HashMap<>(); private String topic;
private long size = 0l;
public String getId() { public String getId() {
return id; return id;
@ -41,21 +40,24 @@ public class DatasourceStats implements Serializable {
this.type = type; this.type = type;
} }
public Map<String, Long> getTopics() { public String getTopic() {
return topics; return topic;
} }
public void setTopics(final Map<String, Long> topics) { public void setTopic(final String topic) {
this.topics = topics; this.topic = topic;
} }
public void incrementTopic(final String topic, final long inc) { public long getSize() {
if (topics.containsKey(topic)) { return size;
topics.put(topic, topics.get(topic) + inc); }
} else {
topics.put(topic, inc);
}
public void setSize(final long size) {
this.size = size;
}
public void incrementSize(final long inc) {
this.size = this.size + inc;
} }
} }

View File

@ -25,7 +25,8 @@ public class StatsAggregator extends Aggregator<Event, DatasourceStats, Datasour
stats.setId(e.getMap().getTargetDatasourceId()); stats.setId(e.getMap().getTargetDatasourceId());
stats.setName(e.getMap().getTargetDatasourceName()); stats.setName(e.getMap().getTargetDatasourceName());
stats.setType(e.getMap().getTargetDatasourceType()); stats.setType(e.getMap().getTargetDatasourceType());
stats.incrementTopic(e.getTopic(), 1l); stats.setTopic(e.getTopic());
stats.incrementSize(1l);
return stats; return stats;
} }
@ -35,8 +36,9 @@ public class StatsAggregator extends Aggregator<Event, DatasourceStats, Datasour
stats0.setId(stats1.getId()); stats0.setId(stats1.getId());
stats0.setName(stats1.getName()); stats0.setName(stats1.getName());
stats0.setType(stats1.getType()); stats0.setType(stats1.getType());
stats0.setTopic(stats1.getTopic());
} }
stats1.getTopics().entrySet().forEach(e -> stats0.incrementTopic(e.getKey(), e.getValue())); stats0.incrementSize(stats1.getSize());
return stats0; return stats0;
} }

View File

@ -0,0 +1,67 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.subset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.oa.util.EventGroup;
public class EventSubsetAggregator extends Aggregator<Event, EventGroup, EventGroup> {
/**
*
*/
private static final long serialVersionUID = -678071078823059805L;
private final int maxEventsForTopic;
public EventSubsetAggregator(final int maxEventsForTopic) {
this.maxEventsForTopic = maxEventsForTopic;
}
@Override
public EventGroup zero() {
return new EventGroup();
}
@Override
public EventGroup reduce(final EventGroup g, final Event e) {
if (g.getData().size() < maxEventsForTopic) {
g.getData().add(e);
}
return g;
}
@Override
public EventGroup merge(final EventGroup g0, final EventGroup g1) {
final int missing = maxEventsForTopic - g0.getData().size();
if (missing > 0) {
if (g1.getData().size() < missing) {
g0.getData().addAll(g1.getData());
} else {
g0.getData().addAll(g1.getData().subList(0, missing));
}
}
return g0;
}
@Override
public EventGroup finish(final EventGroup g) {
return g;
}
@Override
public Encoder<EventGroup> outputEncoder() {
return Encoders.bean(EventGroup.class);
}
@Override
public Encoder<EventGroup> bufferEncoder() {
return Encoders.bean(EventGroup.class);
}
}

View File

@ -25,13 +25,25 @@
<description>a black list (comma separeted, - for empty list) of datasource ids</description> <description>a black list (comma separeted, - for empty list) of datasource ids</description>
</property> </property>
<property> <property>
<name>esIndexName</name> <name>esEventIndexName</name>
<description>the elasticsearch index name</description> <description>the elasticsearch index name for events</description>
</property>
<property>
<name>esNotificationsIndexName</name>
<description>the elasticsearch index name for notifications</description>
</property> </property>
<property> <property>
<name>esIndexHost</name> <name>esIndexHost</name>
<description>the elasticsearch host</description> <description>the elasticsearch host</description>
</property> </property>
<property>
<name>maxIndexedEventsForDsAndTopic</name>
<description>the max number of events for each couple (ds/topic)</description>
</property>
<property>
<name>brokerApiBaseUrl</name>
<description>the url of the broker service api</description>
</property>
<property> <property>
<name>sparkDriverMemory</name> <name>sparkDriverMemory</name>
<description>memory for driver process</description> <description>memory for driver process</description>
@ -423,16 +435,16 @@
<arg>--datasourceTypeWhitelist</arg><arg>${datasourceTypeWhitelist}</arg> <arg>--datasourceTypeWhitelist</arg><arg>${datasourceTypeWhitelist}</arg>
<arg>--datasourceIdBlacklist</arg><arg>${datasourceIdBlacklist}</arg> <arg>--datasourceIdBlacklist</arg><arg>${datasourceIdBlacklist}</arg>
</spark> </spark>
<ok to="index_es"/> <ok to="index_event_subset"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="index_es"> <action name="index_event_subset">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>IndexOnESJob</name> <name>IndexEventSubsetOnESJob</name>
<class>eu.dnetlib.dhp.broker.oa.IndexOnESJob</class> <class>eu.dnetlib.dhp.broker.oa.IndexEventSubsetJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar> <jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
@ -445,8 +457,36 @@
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--index</arg><arg>${esIndexName}</arg> <arg>--index</arg><arg>${esEventIndexName}</arg>
<arg>--esHost</arg><arg>${esIndexHost}</arg> <arg>--esHost</arg><arg>${esIndexHost}</arg>
<arg>--maxEventsForTopic</arg><arg>${maxIndexedEventsForDsAndTopic}</arg>
<arg>--brokerApiBaseUrl</arg><arg>${brokerApiBaseUrl}</arg>
</spark>
<ok to="index_notifications"/>
<error to="Kill"/>
</action>
<action name="index_notifications">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>IndexNotificationsOnESJob</name>
<class>eu.dnetlib.dhp.broker.oa.IndexNotificationsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.dynamicAllocation.maxExecutors="8"
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--index</arg><arg>${esNotificationsIndexName}</arg>
<arg>--esHost</arg><arg>${esIndexHost}</arg>
<arg>--brokerApiBaseUrl</arg><arg>${brokerApiBaseUrl}</arg>
</spark> </spark>
<ok to="stats"/> <ok to="stats"/>
<error to="Kill"/> <error to="Kill"/>

View File

@ -0,0 +1,32 @@
[
{
"paramName": "o",
"paramLongName": "workingPath",
"paramDescription": "the workinh path",
"paramRequired": true
},
{
"paramName": "idx",
"paramLongName": "index",
"paramDescription": "the ES index",
"paramRequired": true
},
{
"paramName": "es",
"paramLongName": "esHost",
"paramDescription": "the ES host",
"paramRequired": true
},
{
"paramName": "n",
"paramLongName": "maxEventsForTopic",
"paramDescription": "the max number of events for each couple (ds/topic)",
"paramRequired": true
},
{
"paramName": "broker",
"paramLongName": "brokerApiBaseUrl",
"paramDescription": "the url of the broker service api",
"paramRequired": true
}
]

View File

@ -0,0 +1,26 @@
[
{
"paramName": "o",
"paramLongName": "workingPath",
"paramDescription": "the workinh path",
"paramRequired": true
},
{
"paramName": "idx",
"paramLongName": "index",
"paramDescription": "the ES index",
"paramRequired": true
},
{
"paramName": "es",
"paramLongName": "esHost",
"paramDescription": "the ES host",
"paramRequired": true
},
{
"paramName": "broker",
"paramLongName": "brokerApiBaseUrl",
"paramDescription": "the url of the broker service api",
"paramRequired": true
}
]

View File

@ -0,0 +1,18 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@ -0,0 +1,137 @@
<workflow-app name="create broker events - partial" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>graphInputPath</name>
<description>the path where the graph is stored</description>
</property>
<property>
<name>workingPath</name>
<description>the path where the the generated data will be stored</description>
</property>
<property>
<name>datasourceIdWhitelist</name>
<value>-</value>
<description>a white list (comma separeted, - for empty list) of datasource ids</description>
</property>
<property>
<name>datasourceTypeWhitelist</name>
<value>-</value>
<description>a white list (comma separeted, - for empty list) of datasource types</description>
</property>
<property>
<name>datasourceIdBlacklist</name>
<value>-</value>
<description>a black list (comma separeted, - for empty list) of datasource ids</description>
</property>
<property>
<name>esEventIndexName</name>
<description>the elasticsearch index name for events</description>
</property>
<property>
<name>esNotificationsIndexName</name>
<description>the elasticsearch index name for notifications</description>
</property>
<property>
<name>esIndexHost</name>
<description>the elasticsearch host</description>
</property>
<property>
<name>maxIndexedEventsForDsAndTopic</name>
<description>the max number of events for each couple (ds/topic)</description>
</property>
<property>
<name>brokerApiBaseUrl</name>
<description>the url of the broker service api</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="index_notifications"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="index_notifications">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>IndexNotificationsOnESJob</name>
<class>eu.dnetlib.dhp.broker.oa.IndexNotificationsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.dynamicAllocation.maxExecutors="8"
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--index</arg><arg>${esNotificationsIndexName}</arg>
<arg>--esHost</arg><arg>${esIndexHost}</arg>
<arg>--brokerApiBaseUrl</arg><arg>${brokerApiBaseUrl}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -8,6 +8,53 @@
<property> <property>
<name>workingPath</name> <name>workingPath</name>
<description>the path where the the generated data will be stored</description> <description>the path where the the generated data will be stored</description>
</property>
<property>
<name>datasourceIdWhitelist</name>
<value>-</value>
<description>a white list (comma separeted, - for empty list) of datasource ids</description>
</property>
<property>
<name>datasourceTypeWhitelist</name>
<value>-</value>
<description>a white list (comma separeted, - for empty list) of datasource types</description>
</property>
<property>
<name>datasourceIdBlacklist</name>
<value>-</value>
<description>a black list (comma separeted, - for empty list) of datasource ids</description>
</property>
<property>
<name>esEventIndexName</name>
<description>the elasticsearch index name for events</description>
</property>
<property>
<name>esNotificationsIndexName</name>
<description>the elasticsearch index name for notifications</description>
</property>
<property>
<name>esIndexHost</name>
<description>the elasticsearch host</description>
</property>
<property>
<name>maxIndexedEventsForDsAndTopic</name>
<description>the max number of events for each couple (ds/topic)</description>
</property>
<property>
<name>brokerApiBaseUrl</name>
<description>the url of the broker service api</description>
</property>
<property>
<name>brokerDbUrl</name>
<description>the url of the broker database</description>
</property>
<property>
<name>brokerDbUser</name>
<description>the user of the broker database</description>
</property>
<property>
<name>brokerDbPassword</name>
<description>the password of the broker database</description>
</property> </property>
<property> <property>
<name>sparkDriverMemory</name> <name>sparkDriverMemory</name>
@ -64,23 +111,23 @@
</configuration> </configuration>
</global> </global>
<start to="index_es"/> <start to="stats"/>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<action name="index_es"> <action name="stats">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>IndexOnESJob</name> <name>GenerateStatsJob</name>
<class>eu.dnetlib.dhp.broker.oa.IndexOnESJob</class> <class>eu.dnetlib.dhp.broker.oa.GenerateStatsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar> <jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.dynamicAllocation.maxExecutors="8"
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -88,8 +135,10 @@
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--index</arg><arg>${esIndexName}</arg> <arg>--dbUrl</arg><arg>${brokerDbUrl}</arg>
<arg>--esHost</arg><arg>${esIndexHost}</arg> <arg>--dbUser</arg><arg>${brokerDbUser}</arg>
<arg>--dbPassword</arg><arg>${brokerDbPassword}</arg>
<arg>--brokerApiBaseUrl</arg><arg>${brokerApiBaseUrl}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>

View File

@ -0,0 +1,32 @@
[
{
"paramName": "wp",
"paramLongName": "workingPath",
"paramDescription": "the working path",
"paramRequired": true
},
{
"paramName": "dburl",
"paramLongName": "dbUrl",
"paramDescription": "the broker database url",
"paramRequired": true
},
{
"paramName": "u",
"paramLongName": "dbUser",
"paramDescription": "the broker database user",
"paramRequired": true
},
{
"paramName": "p",
"paramLongName": "dbPassword",
"paramDescription": "the broker database password",
"paramRequired": true
},
{
"paramName": "broker",
"paramLongName": "brokerApiBaseUrl",
"paramDescription": "the url of the broker service api",
"paramRequired": true
}
]

View File

@ -0,0 +1,52 @@
package eu.dnetlib.dhp.broker.oa.util;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Arrays;
import org.junit.jupiter.api.Test;
class SubscriptionUtilsTest {
@Test
void testVerifyListSimilar() {
assertTrue(SubscriptionUtils.verifyListSimilar(Arrays.asList("Michele Artini", "Claudio Atzori"), "artini"));
assertFalse(SubscriptionUtils.verifyListSimilar(Arrays.asList("Michele Artini", "Claudio Atzori"), "bardi"));
}
@Test
void testVerifyListExact() {
assertTrue(SubscriptionUtils.verifyListExact(Arrays.asList("Java", "Perl"), "perl"));
assertFalse(SubscriptionUtils.verifyListExact(Arrays.asList("Java", "Perl"), "C"));
}
@Test
void testVerifySimilar() {
assertTrue(SubscriptionUtils.verifySimilar("Java Programming", "java"));
assertFalse(SubscriptionUtils.verifySimilar("Java Programming", "soap"));
}
@Test
void testVerifyFloatRange() {
assertTrue(SubscriptionUtils.verifyFloatRange(0.5f, "0.4", "0.6"));
assertFalse(SubscriptionUtils.verifyFloatRange(0.8f, "0.4", "0.6"));
assertTrue(SubscriptionUtils.verifyFloatRange(0.5f, "", ""));
}
@Test
void testVerifyDateRange() {
final long date = 1282738478000l; // 25 August 2010
assertTrue(SubscriptionUtils.verifyDateRange(date, "2010-01-01", "2011-01-01"));
assertFalse(SubscriptionUtils.verifyDateRange(date, "2020-01-01", "2021-01-01"));
}
@Test
void testVerifyExact() {
assertTrue(SubscriptionUtils.verifyExact("Java Programming", "java programming"));
assertFalse(SubscriptionUtils.verifyExact("Java Programming", "soap programming"));
}
}

View File

@ -373,6 +373,7 @@ public class ResultMapper implements Serializable {
private static Instance getGraphInstance(eu.dnetlib.dhp.schema.oaf.Instance i) { private static Instance getGraphInstance(eu.dnetlib.dhp.schema.oaf.Instance i) {
Instance instance = new Instance(); Instance instance = new Instance();
setCommonValue(i, instance); setCommonValue(i, instance);
return instance; return instance;

View File

@ -577,7 +577,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
final String lissn = StringUtils.isNotBlank(arr[2]) ? arr[2].trim() : null; final String lissn = StringUtils.isNotBlank(arr[2]) ? arr[2].trim() : null;
if (issn != null || eissn != null || lissn != null) { if (issn != null || eissn != null || lissn != null) {
return journal(name, issn, eissn, eissn, null, null, null, null, null, null, null, info); return journal(name, issn, eissn, lissn, null, null, null, null, null, null, null, info);
} }
} }
} }

View File

@ -85,7 +85,7 @@ SELECT
dc.officialname AS collectedfromname, dc.officialname AS collectedfromname,
d.typology||'@@@dnet:datasource_typologies' AS datasourcetype, d.typology||'@@@dnet:datasource_typologies' AS datasourcetype,
'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction, 'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction,
d.issn || ' @@@ ' || d.eissn || ' @@@ ' || d.lissn AS journal concat_ws(' @@@ ', d.issn, d.eissn, d.lissn) AS journal
FROM dsm_datasources d FROM dsm_datasources d

View File

@ -73,12 +73,16 @@ public class MigrateDbEntitiesApplicationTest {
final Datasource ds = (Datasource) list.get(0); final Datasource ds = (Datasource) list.get(0);
assertValidId(ds.getId()); assertValidId(ds.getId());
assertValidId(ds.getCollectedfrom().get(0).getKey()); assertValidId(ds.getCollectedfrom().get(0).getKey());
assertEquals(ds.getOfficialname().getValue(), getValueAsString("officialname", fields)); assertEquals(getValueAsString("officialname", fields), ds.getOfficialname().getValue());
assertEquals(ds.getEnglishname().getValue(), getValueAsString("englishname", fields)); assertEquals(getValueAsString("englishname", fields), ds.getEnglishname().getValue());
assertEquals(ds.getContactemail().getValue(), getValueAsString("contactemail", fields)); assertEquals(getValueAsString("contactemail", fields), ds.getContactemail().getValue());
assertEquals(ds.getWebsiteurl().getValue(), getValueAsString("websiteurl", fields)); assertEquals(getValueAsString("websiteurl", fields), ds.getWebsiteurl().getValue());
assertEquals(ds.getNamespaceprefix().getValue(), getValueAsString("namespaceprefix", fields)); assertEquals(getValueAsString("namespaceprefix", fields), ds.getNamespaceprefix().getValue());
assertEquals(ds.getCollectedfrom().get(0).getValue(), getValueAsString("collectedfromname", fields)); assertEquals(getValueAsString("collectedfromname", fields), ds.getCollectedfrom().get(0).getValue());
assertEquals(getValueAsString("officialname", fields), ds.getJournal().getName());
assertEquals("2579-5449", ds.getJournal().getIssnPrinted());
assertEquals("2597-6540", ds.getJournal().getIssnOnline());
assertEquals(null, ds.getJournal().getIssnLinking());
} }
@Test @Test
@ -92,9 +96,11 @@ public class MigrateDbEntitiesApplicationTest {
final Project p = (Project) list.get(0); final Project p = (Project) list.get(0);
assertValidId(p.getId()); assertValidId(p.getId());
assertValidId(p.getCollectedfrom().get(0).getKey()); assertValidId(p.getCollectedfrom().get(0).getKey());
assertEquals(p.getAcronym().getValue(), getValueAsString("acronym", fields)); assertEquals(getValueAsString("acronym", fields), p.getAcronym().getValue());
assertEquals(p.getTitle().getValue(), getValueAsString("title", fields)); assertEquals(getValueAsString("title", fields), p.getTitle().getValue());
assertEquals(p.getCollectedfrom().get(0).getValue(), getValueAsString("collectedfromname", fields)); assertEquals(getValueAsString("collectedfromname", fields), p.getCollectedfrom().get(0).getValue());
assertEquals(getValueAsFloat("fundedamount", fields), p.getFundedamount());
assertEquals(getValueAsFloat("totalcost", fields), p.getTotalcost());
} }
@Test @Test
@ -110,14 +116,14 @@ public class MigrateDbEntitiesApplicationTest {
final Organization o = (Organization) list.get(0); final Organization o = (Organization) list.get(0);
assertValidId(o.getId()); assertValidId(o.getId());
assertValidId(o.getCollectedfrom().get(0).getKey()); assertValidId(o.getCollectedfrom().get(0).getKey());
assertEquals(o.getLegalshortname().getValue(), getValueAsString("legalshortname", fields)); assertEquals(getValueAsString("legalshortname", fields), o.getLegalshortname().getValue());
assertEquals(o.getLegalname().getValue(), getValueAsString("legalname", fields)); assertEquals(getValueAsString("legalname", fields), o.getLegalname().getValue());
assertEquals(o.getWebsiteurl().getValue(), getValueAsString("websiteurl", fields)); assertEquals(getValueAsString("websiteurl", fields), o.getWebsiteurl().getValue());
assertEquals(o.getCountry().getClassid(), getValueAsString("country", fields).split("@@@")[0]); assertEquals(getValueAsString("country", fields).split("@@@")[0], o.getCountry().getClassid());
assertEquals(o.getCountry().getClassname(), getValueAsString("country", fields).split("@@@")[0]); assertEquals(getValueAsString("country", fields).split("@@@")[0], o.getCountry().getClassname());
assertEquals(o.getCountry().getSchemeid(), getValueAsString("country", fields).split("@@@")[1]); assertEquals(getValueAsString("country", fields).split("@@@")[1], o.getCountry().getSchemeid());
assertEquals(o.getCountry().getSchemename(), getValueAsString("country", fields).split("@@@")[1]); assertEquals(getValueAsString("country", fields).split("@@@")[1], o.getCountry().getSchemename());
assertEquals(o.getCollectedfrom().get(0).getValue(), getValueAsString("collectedfromname", fields)); assertEquals(getValueAsString("collectedfromname", fields), o.getCollectedfrom().get(0).getValue());
} }
@Test @Test
@ -322,12 +328,20 @@ public class MigrateDbEntitiesApplicationTest {
} }
private String getValueAsString(final String name, final List<TypedField> fields) { private String getValueAsString(final String name, final List<TypedField> fields) {
return getValueAs(name, fields);
}
private Float getValueAsFloat(final String name, final List<TypedField> fields) {
return new Float(getValueAs(name, fields).toString());
}
private <T> T getValueAs(final String name, final List<TypedField> fields) {
return fields return fields
.stream() .stream()
.filter(f -> f.getField().equals(name)) .filter(f -> f.getField().equals(name))
.map(TypedField::getValue) .map(TypedField::getValue)
.filter(Objects::nonNull) .filter(Objects::nonNull)
.map(o -> o.toString()) .map(o -> (T) o)
.findFirst() .findFirst()
.get(); .get();
} }

View File

@ -142,12 +142,12 @@
{ {
"field": "totalcost", "field": "totalcost",
"type": "double", "type": "double",
"value": null "value": 157846
}, },
{ {
"field": "fundedamount", "field": "fundedamount",
"type": "double", "type": "double",
"value": null "value": 157846
}, },
{ {
"field": "collectedfromid", "field": "collectedfromid",

View File

@ -663,7 +663,7 @@
<mockito-core.version>3.3.3</mockito-core.version> <mockito-core.version>3.3.3</mockito-core.version>
<mongodb.driver.version>3.4.2</mongodb.driver.version> <mongodb.driver.version>3.4.2</mongodb.driver.version>
<vtd.version>[2.12,3.0)</vtd.version> <vtd.version>[2.12,3.0)</vtd.version>
<dnet.openaire.broker.common>3.1.0</dnet.openaire.broker.common> <dnet.openaire.broker.common>3.1.1</dnet.openaire.broker.common>
<solr.version>7.5.0</solr.version> <solr.version>7.5.0</solr.version>
<okhttp.version>4.7.2</okhttp.version> <okhttp.version>4.7.2</okhttp.version>
<common.compress.version>1.1</common.compress.version> <common.compress.version>1.1</common.compress.version>