diff --git a/.gitignore b/.gitignore
new file mode 100644
index 00000000..2d773071
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,27 @@
+.DS_Store
+.idea
+*.iws
+*.ipr
+*.iml
+*.ipr
+*.iws
+*~
+.vscode
+.classpath
+/*/.classpath
+/*/*/.classpath
+.metadata
+/*/.metadata
+/*/*/.metadata
+.project
+.settings
+/*/*/target
+/*/target
+/target
+/*/*/build
+/*/build
+/build
+spark-warehouse
+/**/job-override.properties
+/**/*.log
+
diff --git a/dhp-broker-application/.factorypath b/dhp-broker-application/.factorypath
new file mode 100644
index 00000000..9ec9ea83
--- /dev/null
+++ b/dhp-broker-application/.factorypath
@@ -0,0 +1,151 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/dhp-broker-application/.svn/wc.db-journal b/dhp-broker-application/.svn/wc.db-journal
deleted file mode 100644
index e69de29b..00000000
diff --git a/dhp-broker-application/literatureBrokerService.iml b/dhp-broker-application/literatureBrokerService.iml
deleted file mode 100644
index 0d098172..00000000
--- a/dhp-broker-application/literatureBrokerService.iml
+++ /dev/null
@@ -1,184 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/dhp-broker-application/pom.xml b/dhp-broker-application/pom.xml
index 35f1d977..997eb7a4 100644
--- a/dhp-broker-application/pom.xml
+++ b/dhp-broker-application/pom.xml
@@ -2,22 +2,21 @@
- 4.0.0
-
- eu.dnetlib
- literatureBrokerService
- 0.0.1-SNAPSHOT
- jar
-
- literatureBrokerService
- Literature Broker Service
- org.springframework.boot
- spring-boot-starter-parent
- 2.3.0.M4
+ eu.dnetlib
+ dnet-applications
+ 3.0.0-SNAPSHOT
+ 4.0.0
+
+ dhp-broker-application
+ jar
+
+ dhp-broker-application
+ D-Net Broker Service (new Hadoop implementation)
+
UTF-8
UTF-8
@@ -72,26 +71,10 @@
h2
-
+
- javax.validation
- validation-api
-
-
-
- org.apache.commons
- commons-lang3
-
-
-
- commons-codec
- commons-codec
-
-
-
- commons-io
- commons-io
- 2.5
+ com.google.code.gson
+ gson
@@ -101,11 +84,7 @@
1.4.7
-
-
- com.google.code.gson
- gson
-
+
diff --git a/dhp-broker-application/src/main/java/eu/dnetlib/lbs/openaire/OpenaireBrokerController.java b/dhp-broker-application/src/main/java/eu/dnetlib/lbs/openaire/OpenaireBrokerController.java
index e342db4c..cbf7dd51 100644
--- a/dhp-broker-application/src/main/java/eu/dnetlib/lbs/openaire/OpenaireBrokerController.java
+++ b/dhp-broker-application/src/main/java/eu/dnetlib/lbs/openaire/OpenaireBrokerController.java
@@ -76,12 +76,12 @@ public class OpenaireBrokerController extends AbstractLbsController {
public List findDatasourcesWithEvents() {
final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
- .withQuery(QueryBuilders.matchAllQuery())
- .withSearchType(SearchType.DEFAULT)
- .addAggregation(AggregationBuilders.nested("nested", "map")
- // .path("map")
- .subAggregation(AggregationBuilders.terms("by_map").field("map.target_datasource_name").size(1000).minDocCount(1)))
- .build();
+ .withQuery(QueryBuilders.matchAllQuery())
+ .withSearchType(SearchType.DEFAULT)
+ .addAggregation(AggregationBuilders.nested("nested", "map")
+ // .path("map")
+ .subAggregation(AggregationBuilders.terms("by_map").field("map.target_datasource_name").size(1000).minDocCount(1)))
+ .build();
final SearchHits hits = esOperations.search(searchQuery, Event.class, IndexCoordinates.of(props.getEventsIndexName()));
@@ -90,9 +90,9 @@ public class OpenaireBrokerController extends AbstractLbsController {
final Aggregation aggByMap = ((ParsedNested) aggregations.asMap().get("nested")).getAggregations().asMap().get("by_map");
return ((ParsedStringTerms) aggByMap).getBuckets()
- .stream()
- .map(b -> new BrowseEntry(b.getKeyAsString(), b.getDocCount()))
- .collect(Collectors.toList());
+ .stream()
+ .map(b -> new BrowseEntry(b.getKeyAsString(), b.getDocCount()))
+ .collect(Collectors.toList());
}
@ApiOperation("Return the topics of the events of a datasource")
@@ -100,19 +100,19 @@ public class OpenaireBrokerController extends AbstractLbsController {
public List findTopicsForDatasource(@RequestParam final String ds) {
final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
- .withQuery(QueryBuilders.nestedQuery("map", QueryBuilders.matchQuery("map.target_datasource_name", ds), ScoreMode.None))
- .withSearchType(SearchType.DEFAULT)
- .addAggregation(AggregationBuilders.terms("topic").field("topic").size(1000).minDocCount(1))
- .build();
+ .withQuery(QueryBuilders.nestedQuery("map", QueryBuilders.matchQuery("map.target_datasource_name", ds), ScoreMode.None))
+ .withSearchType(SearchType.DEFAULT)
+ .addAggregation(AggregationBuilders.terms("topic").field("topic").size(1000).minDocCount(1))
+ .build();
final SearchHits hits = esOperations.search(searchQuery, Event.class, IndexCoordinates.of(props.getEventsIndexName()));
final Aggregations aggregations = hits.getAggregations();
return ((ParsedStringTerms) aggregations.asMap().get("topic")).getBuckets()
- .stream()
- .map(b -> new BrowseEntry(b.getKeyAsString(), b.getDocCount()))
- .collect(Collectors.toList());
+ .stream()
+ .map(b -> new BrowseEntry(b.getKeyAsString(), b.getDocCount()))
+ .collect(Collectors.toList());
}
@ApiOperation("Return a page of events of a datasource (by topic)")
@@ -120,21 +120,21 @@ public class OpenaireBrokerController extends AbstractLbsController {
public EventsPage showEvents(@RequestParam final String ds, @RequestParam final String topic, @PathVariable final int nPage, @PathVariable final int size) {
final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
- .withQuery(QueryBuilders.boolQuery()
- .must(QueryBuilders.matchQuery("topic", topic))
- .must(QueryBuilders.nestedQuery("map", QueryBuilders.matchQuery("map.target_datasource_name", ds), ScoreMode.None)))
- .withSearchType(SearchType.DEFAULT)
- .withFields("payload")
- .withPageable(PageRequest.of(nPage, size))
- .build();
+ .withQuery(QueryBuilders.boolQuery()
+ .must(QueryBuilders.matchQuery("topic", topic))
+ .must(QueryBuilders.nestedQuery("map", QueryBuilders.matchQuery("map.target_datasource_name", ds), ScoreMode.None)))
+ .withSearchType(SearchType.DEFAULT)
+ .withFields("payload")
+ .withPageable(PageRequest.of(nPage, size))
+ .build();
final SearchHits page = esOperations.search(searchQuery, Event.class, IndexCoordinates.of(props.getEventsIndexName()));
final List list = page.stream()
- .map(SearchHit::getContent)
- .map(Event::getPayload)
- .map(OpenAireEventPayload::fromJSON)
- .collect(Collectors.toList());
+ .map(SearchHit::getContent)
+ .map(Event::getPayload)
+ .map(OpenAireEventPayload::fromJSON)
+ .collect(Collectors.toList());
return new EventsPage(ds, topic, nPage, overrideGetTotalPage(page, size), page.getTotalHits(), list);
}
@@ -152,21 +152,21 @@ public class OpenaireBrokerController extends AbstractLbsController {
ElasticSearchQueryUtils.addMapConditionForDates(mapQuery, "map.target_dateofacceptance", qObj.getDates());
final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
- .withQuery(QueryBuilders.boolQuery()
- .must(QueryBuilders.matchQuery("topic", qObj.getTopic()))
- .must(QueryBuilders.nestedQuery("map", mapQuery, ScoreMode.None)))
- .withSearchType(SearchType.DEFAULT)
- .withFields("payload")
- .withPageable(PageRequest.of(nPage, size))
- .build();
+ .withQuery(QueryBuilders.boolQuery()
+ .must(QueryBuilders.matchQuery("topic", qObj.getTopic()))
+ .must(QueryBuilders.nestedQuery("map", mapQuery, ScoreMode.None)))
+ .withSearchType(SearchType.DEFAULT)
+ .withFields("payload")
+ .withPageable(PageRequest.of(nPage, size))
+ .build();
final SearchHits page = esOperations.search(searchQuery, Event.class, IndexCoordinates.of(props.getEventsIndexName()));
final List list = page.stream()
- .map(SearchHit::getContent)
- .map(Event::getPayload)
- .map(OpenAireEventPayload::fromJSON)
- .collect(Collectors.toList());
+ .map(SearchHit::getContent)
+ .map(Event::getPayload)
+ .map(OpenAireEventPayload::fromJSON)
+ .collect(Collectors.toList());
return new EventsPage(qObj.getDatasource(), qObj.getTopic(), nPage, overrideGetTotalPage(page, size), page.getTotalHits(), list);
}
@@ -186,8 +186,8 @@ public class OpenaireBrokerController extends AbstractLbsController {
public Map> subscriptions(@RequestParam final String email) {
final Iterable iter = subscriptionRepo.findBySubscriber(email);
return StreamSupport.stream(iter.spliterator(), false)
- .map(this::subscriptionDesc)
- .collect(Collectors.groupingBy(SimpleSubscriptionDesc::getDatasource));
+ .map(this::subscriptionDesc)
+ .collect(Collectors.groupingBy(SimpleSubscriptionDesc::getDatasource));
}
@ApiOperation("Return a page of notifications")
@@ -200,19 +200,19 @@ public class OpenaireBrokerController extends AbstractLbsController {
final Subscription sub = optSub.get();
final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
- .withQuery(QueryBuilders.matchQuery("subscriptionId", subscrId))
- .withSearchType(SearchType.DEFAULT)
- .withFields("payload")
- .withPageable(PageRequest.of(nPage, size))
- .build();
+ .withQuery(QueryBuilders.matchQuery("subscriptionId", subscrId))
+ .withSearchType(SearchType.DEFAULT)
+ .withFields("payload")
+ .withPageable(PageRequest.of(nPage, size))
+ .build();
final SearchHits page = esOperations.search(searchQuery, Notification.class, IndexCoordinates.of(props.getNotificationsIndexName()));
final List list = page.stream()
- .map(SearchHit::getContent)
- .map(Notification::getPayload)
- .map(OpenAireEventPayload::fromJSON)
- .collect(Collectors.toList());
+ .map(SearchHit::getContent)
+ .map(Notification::getPayload)
+ .map(OpenAireEventPayload::fromJSON)
+ .collect(Collectors.toList());
return new EventsPage(extractDatasource(sub), sub.getTopic(), nPage, overrideGetTotalPage(page, size), page.getTotalHits(), list);
} else {
@@ -233,13 +233,13 @@ public class OpenaireBrokerController extends AbstractLbsController {
final AbstractElasticsearchTemplate esTemplate = (AbstractElasticsearchTemplate) esOperations;
final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
- .withQuery(QueryBuilders.matchQuery("subscriptionId", subscrId))
- .withSearchType(SearchType.DEFAULT)
- .withFields("payload")
- .build();
+ .withQuery(QueryBuilders.matchQuery("subscriptionId", subscrId))
+ .withSearchType(SearchType.DEFAULT)
+ .withFields("payload")
+ .build();
final SearchScrollHits scroll =
- ElasticSearchScrollUtils.startScroll(esTemplate, IndexCoordinates.of(props.getNotificationsIndexName()), searchQuery, Notification.class);
+ ElasticSearchScrollUtils.startScroll(esTemplate, IndexCoordinates.of(props.getNotificationsIndexName()), searchQuery, Notification.class);
if (scroll.hasSearchHits()) {
final List values = calculateEventPayloads(scroll);
@@ -278,10 +278,10 @@ public class OpenaireBrokerController extends AbstractLbsController {
final Gson gson = new Gson();
return scroll.stream()
- .map(SearchHit::getContent)
- .map(Notification::getPayload)
- .map(s -> gson.fromJson(s, OpenAireEventPayload.class))
- .collect(Collectors.toList());
+ .map(SearchHit::getContent)
+ .map(Notification::getPayload)
+ .map(s -> gson.fromJson(s, OpenAireEventPayload.class))
+ .collect(Collectors.toList());
} else {
return new ArrayList<>();
}
@@ -290,17 +290,18 @@ public class OpenaireBrokerController extends AbstractLbsController {
private SimpleSubscriptionDesc subscriptionDesc(final Subscription s) {
return new SimpleSubscriptionDesc(s.getSubscriptionId(), extractDatasource(s), s.getTopic(), s.getCreationDate(), s.getLastNotificationDate(),
- OpenaireBrokerController.this.notificationRepository.countBySubscriptionId(s.getSubscriptionId()));
+ OpenaireBrokerController.this.notificationRepository.countBySubscriptionId(s.getSubscriptionId()));
}
private String extractDatasource(final Subscription sub) {
- return sub.getConditionsAsList().stream()
- .filter(c -> c.getField().equals("target_datasource_name"))
- .map(MapCondition::getListParams)
- .filter(l -> !l.isEmpty())
- .map(l -> l.get(0).getValue())
- .findFirst()
- .get();
+ return sub.getConditionsAsList()
+ .stream()
+ .filter(c -> c.getField().equals("target_datasource_name"))
+ .map(MapCondition::getListParams)
+ .filter(l -> !l.isEmpty())
+ .map(l -> l.get(0).getValue())
+ .findFirst()
+ .get();
}
private long overrideGetTotalPage(final SearchHits> page, final int size) {
diff --git a/pom.xml b/pom.xml
index e0264610..fdaf498a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,9 +1,11 @@
-
+
4.0.0
- eu.dnetlib.dhp
- dhp
- 1.2.4-SNAPSHOT
+ eu.dnetlib
+ dnet-applications
+ 3.0.0-SNAPSHOT
pom
@@ -17,6 +19,13 @@
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.3.1.RELEASE
+
+
+
dhp-broker-application
@@ -56,38 +65,46 @@
true
-
- cloudera
- Cloudera Repository
- https://repository.cloudera.com/artifactory/cloudera-repos
-
- true
-
-
- false
-
-
+
+
+ javax.validation
+ validation-api
+
+
+
+ org.apache.commons
+ commons-lang3
+ 3.10
+
+
+
+ commons-codec
+ commons-codec
+
+
+
+ commons-io
+ commons-io
+
+
org.junit.jupiter
junit-jupiter
- ${junit-jupiter.version}
test
org.mockito
mockito-core
- ${mockito-core.version}
test
org.mockito
mockito-junit-jupiter
- ${mockito-core.version}
test
@@ -101,16 +118,8 @@
org.apache.commons
commons-lang3
- ${dhp.commons.lang.version}
-
- com.google.guava
- guava
- ${dhp.guava.version}
-
-
-
commons-codec
commons-codec
@@ -148,26 +157,12 @@
1.1.6
-
- org.elasticsearch
- elasticsearch-hadoop
- 7.6.0
-
-
-
-
- org.apache.oozie
- oozie-client
- ${dhp.oozie.version}
- provided
-
-
-
- slf4j-simple
- org.slf4j
-
-
+
+ org.elasticsearch
+ elasticsearch-hadoop
+ 7.6.0
+
@@ -244,19 +239,7 @@
maven-dependency-plugin
3.0.0
-
-
- net.revelc.code.formatter
- formatter-maven-plugin
- 2.11.0
-
-
- eu.dnetlib.dhp
- dhp-code-style
- ${project.version}
-
-
-
+
@@ -268,73 +251,11 @@
org.apache.maven.plugins
maven-project-info-reports-plugin
-
- net.revelc.code.formatter
- formatter-maven-plugin
-
-
-
- format
-
-
- eclipse/formatter_dnet.xml
-
-
-
-
-
- net.revelc.code
- impsort-maven-plugin
- 1.4.1
-
- java.,javax.,org.,com.
- java,*
-
- **/thrift/*.java
-
-
-
-
- sort-imports
-
- sort
-
-
-
-
org.apache.maven.plugins
maven-release-plugin
2.5.3
-
- org.jacoco
- jacoco-maven-plugin
- 0.7.9
-
-
- **/schemas/*
- **/com/cloudera/**/*
- **/org/apache/avro/io/**/*
-
-
-
-
- default-prepare-agent
-
- prepare-agent
-
-
-
- default-report
- prepare-package
-
- report
-
-
-
-
-
@@ -374,15 +295,5 @@
UTF-8
UTF-8
3.6.0
- 2.22.2
- 2.0.1
- 2.9.6
- 3.5
- 11.0.2
- 2.11.12
- 5.6.1
- 3.3.3
- 3.4.2
- [2.12,3.0)