diff --git a/apps/dhp-broker-public-application/pom.xml b/apps/dhp-broker-public-application/pom.xml index 8735f92d..dfe102f6 100644 --- a/apps/dhp-broker-public-application/pom.xml +++ b/apps/dhp-broker-public-application/pom.xml @@ -1,5 +1,7 @@ - + eu.dnetlib.dhp @@ -22,7 +24,40 @@ dnet-broker-apps-common ${project.version} + + org.apache.hadoop + hadoop-client + 2.6.0-cdh5.9.2 + + + org.slf4j + slf4j-log4j12 + + + javax.servlet + servlet-api + + + com.google.guava + guava + + + + + + + + true + + + false + + cloudera + Cloudera Repository + https://repository.cloudera.com/artifactory/cloudera-repos + + diff --git a/apps/dhp-broker-public-application/src/main/java/eu/dnetlib/broker/BrokerConfiguration.java b/apps/dhp-broker-public-application/src/main/java/eu/dnetlib/broker/BrokerConfiguration.java index 69711375..ed3988c3 100644 --- a/apps/dhp-broker-public-application/src/main/java/eu/dnetlib/broker/BrokerConfiguration.java +++ b/apps/dhp-broker-public-application/src/main/java/eu/dnetlib/broker/BrokerConfiguration.java @@ -42,12 +42,7 @@ public class BrokerConfiguration extends AbstractElasticsearchConfiguration { @Autowired private ElasticSearchProperties elasticSearchProperties; - public static final String TAG_EVENTS = "Events"; - public static final String TAG_SUBSCRIPTIONS = "Subscriptions"; - public static final String TAG_NOTIFICATIONS = "Notifications"; - public static final String TAG_TOPIC_TYPES = "Topic Types"; - public static final String TAG_OPENAIRE = "OpenAIRE"; - public static final String TAG_MATCHING = "Subscription-Event Matching"; + public static final String OA_PUBLIC_APIS = "Openaire Broker Public API"; @Override @Bean @@ -67,10 +62,9 @@ public class BrokerConfiguration extends AbstractElasticsearchConfiguration { return new Docket(DocumentationType.SWAGGER_2) .select() .apis(RequestHandlerSelectors.any()) - .paths(p -> p.startsWith("/api/")) + .paths(p -> p.startsWith("/")) .build() - .tags(new Tag(TAG_EVENTS, "Events management"), new Tag(TAG_SUBSCRIPTIONS, "Subscriptions management"), new Tag(TAG_NOTIFICATIONS, - "Notifications management"), new Tag(TAG_TOPIC_TYPES, "Topic types management"), new Tag(TAG_OPENAIRE, "OpenAIRE use case")) + .tags(new Tag(OA_PUBLIC_APIS, OA_PUBLIC_APIS)) .apiInfo(new ApiInfo( "Literature Broker Service", "APIs documentation", diff --git a/apps/dhp-broker-public-application/src/main/java/eu/dnetlib/broker/oa/controllers/OpenairePublicController.java b/apps/dhp-broker-public-application/src/main/java/eu/dnetlib/broker/oa/controllers/OpenairePublicController.java index b1da3108..4f117821 100644 --- a/apps/dhp-broker-public-application/src/main/java/eu/dnetlib/broker/oa/controllers/OpenairePublicController.java +++ b/apps/dhp-broker-public-application/src/main/java/eu/dnetlib/broker/oa/controllers/OpenairePublicController.java @@ -1,5 +1,8 @@ package eu.dnetlib.broker.oa.controllers; +import java.io.BufferedReader; +import java.io.FileNotFoundException; +import java.io.InputStreamReader; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -15,10 +18,15 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.lucene.search.join.ScoreMode; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.index.query.QueryBuilders; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Profile; import org.springframework.data.domain.PageRequest; import org.springframework.data.elasticsearch.core.ElasticsearchOperations; @@ -49,7 +57,7 @@ import io.swagger.annotations.ApiOperation; @Profile("openaire") @RestController @RequestMapping("/") -@Api(tags = BrokerConfiguration.TAG_OPENAIRE) +@Api(tags = BrokerConfiguration.OA_PUBLIC_APIS) public class OpenairePublicController extends AbstractLbsController { @Autowired @@ -61,6 +69,12 @@ public class OpenairePublicController extends AbstractLbsController { @Autowired private ElasticSearchProperties props; + @Value("${lbs.hadoop.namenode.url}") + private String namenode; + + @Value("${lbs.hadoop.opendoar.events.path}") + private String opendoarEventsPath; + private static final long SCROLL_TIMEOUT_IN_MILLIS = 5 * 60 * 1000; private static final Log log = LogFactory.getLog(OpenairePublicController.class); @@ -98,36 +112,6 @@ public class OpenairePublicController extends AbstractLbsController { } } - @ApiOperation("Returns notifications by opendorar Id (for example: 301) using scrolls (first page)") - @GetMapping("/scroll/notifications/byOpenDoarId/{opendoarId}") - public ScrollPage prepareScrollNotificationsByOpendoarId(@PathVariable final String opendoarId) { - - final ElasticsearchRestTemplate esTemplate = (ElasticsearchRestTemplate) esOperations; - - final String dsId = calculateDsIdFromOpenDoarId(opendoarId); - - final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder() - .withQuery(QueryBuilders.nestedQuery("map", QueryBuilders.termQuery("map.targetDatasourceId", dsId), ScoreMode.None)) - .withSearchType(SearchType.DEFAULT) - .withFields("topic", "payload") - .build(); - - final SearchScrollHits scroll = - esTemplate.searchScrollStart(SCROLL_TIMEOUT_IN_MILLIS, searchQuery, Notification.class, IndexCoordinates.of(props.getNotificationsIndexName())); - if (scroll.hasSearchHits()) { - final List values = calculateNotificationMessages(scroll); - return new ScrollPage<>(scroll.getScrollId(), values.isEmpty() || scroll.getScrollId() == null, values); - } else { - esTemplate.searchScrollClear(Arrays.asList(scroll.getScrollId())); - return new ScrollPage<>(null, true, new ArrayList<>()); - } - - } - - private String calculateDsIdFromOpenDoarId(final String opendoarId) { - return "10|opendoar____::" + DigestUtils.md5Hex(opendoarId); - } - @ApiOperation("Returns notifications using scrolls (other pages)") @GetMapping("/scroll/notifications/{scrollId}") public ScrollPage scrollNotifications(@PathVariable final String scrollId) { @@ -182,6 +166,61 @@ public class OpenairePublicController extends AbstractLbsController { } + @ApiOperation("Returns events as file by opendoarId") + @GetMapping(value = "/file/events/opendoar/{id}", produces = "application/gzip") + public void opendoarEventsAsFile(final HttpServletResponse res, @PathVariable final String id) { + + res.setHeader("Content-Disposition", "attachment; filename=dump.json.gz"); + + final Configuration conf = new Configuration(); + conf.addResource(getClass().getResourceAsStream("/core-site.xml")); + conf.addResource(getClass().getResourceAsStream("/ocean-hadoop-conf.xml")); + + final Path pathDir = new Path(opendoarEventsPath + "/" + DigestUtils.md5Hex(id)); + + try (final FileSystem fs = FileSystem.get(conf); + final ServletOutputStream out = res.getOutputStream(); + final GZIPOutputStream gzOut = new GZIPOutputStream(out)) { + boolean first = true; + + IOUtils.write("[\n", gzOut); + + try { + for (final FileStatus fileStatus : fs.listStatus(pathDir)) { + if (fileStatus.isFile()) { + final Path path = fileStatus.getPath(); + if (path.getName().endsWith(".json")) { + try (final FSDataInputStream fis = fs.open(path); + final InputStreamReader isr = new InputStreamReader(fis); + final BufferedReader br = new BufferedReader(isr)) { + + String line = br.readLine(); + while (line != null) { + if (first) { + first = false; + } else { + IOUtils.write(",\n", gzOut); + } + + IOUtils.write(line, gzOut); + + line = br.readLine(); + } + } + } + } + } + } catch (final FileNotFoundException e) { + log.warn("File not found - " + e.getMessage()); + } + IOUtils.write("\n]\n", gzOut); + gzOut.flush(); + } catch (final Throwable e) { + log.error("Error accessing hdfs file", e); + throw new RuntimeException(e); + } + } + private List calculateNotificationMessages(final SearchScrollHits scroll) { if (scroll.getSearchHits().size() > 0) { return scroll.stream() diff --git a/apps/dhp-broker-public-application/src/main/resources/application.properties b/apps/dhp-broker-public-application/src/main/resources/application.properties index 44cc1c39..8a63cdbc 100644 --- a/apps/dhp-broker-public-application/src/main/resources/application.properties +++ b/apps/dhp-broker-public-application/src/main/resources/application.properties @@ -19,6 +19,8 @@ spring.jpa.properties.hibernate.format_sql = false lbs.database.url = ${spring.datasource.url} +lbs.hadoop.opendoar.events.path = /user/michele.artini/broker_events/eventsByOpendoarId + # for development server #lbs.elastic.clusterNodes = broker1-dev-dnet.d4science.org:9300 #lbs.elastic.homepage = http://broker1-dev-dnet.d4science.org:9200/_plugin/hq diff --git a/apps/dhp-broker-public-application/src/main/resources/core-site.xml b/apps/dhp-broker-public-application/src/main/resources/core-site.xml new file mode 100644 index 00000000..e9fde9e7 --- /dev/null +++ b/apps/dhp-broker-public-application/src/main/resources/core-site.xml @@ -0,0 +1,145 @@ + + + + + + fs.defaultFS + hdfs://nameservice1 + + + fs.trash.interval + 1 + + + io.compression.codecs + org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec,org.apache.hadoop.io.compress.SnappyCodec,org.apache.hadoop.io.compress.Lz4Codec + + + hadoop.security.authentication + simple + + + hadoop.security.authorization + false + + + hadoop.rpc.protection + authentication + + + hadoop.security.auth_to_local + DEFAULT + + + hadoop.proxyuser.oozie.hosts + * + + + hadoop.proxyuser.oozie.groups + * + + + hadoop.proxyuser.mapred.hosts + * + + + hadoop.proxyuser.mapred.groups + * + + + hadoop.proxyuser.flume.hosts + * + + + hadoop.proxyuser.flume.groups + * + + + hadoop.proxyuser.HTTP.hosts + * + + + hadoop.proxyuser.HTTP.groups + * + + + hadoop.proxyuser.hive.hosts + * + + + hadoop.proxyuser.hive.groups + * + + + hadoop.proxyuser.hue.hosts + * + + + hadoop.proxyuser.hue.groups + * + + + hadoop.proxyuser.httpfs.hosts + * + + + hadoop.proxyuser.httpfs.groups + * + + + hadoop.proxyuser.hdfs.groups + * + + + hadoop.proxyuser.hdfs.hosts + * + + + hadoop.proxyuser.yarn.hosts + * + + + hadoop.proxyuser.yarn.groups + * + + + hadoop.security.group.mapping + org.apache.hadoop.security.ShellBasedUnixGroupsMapping + + + hadoop.security.instrumentation.requires.admin + false + + + net.topology.script.file.name + /etc/hadoop/conf.cloudera.yarn2/topology.py + + + io.file.buffer.size + 65536 + + + hadoop.ssl.enabled + false + + + hadoop.ssl.require.client.cert + false + true + + + hadoop.ssl.keystores.factory.class + org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory + true + + + hadoop.ssl.server.conf + ssl-server.xml + true + + + hadoop.ssl.client.conf + ssl-client.xml + true + + diff --git a/apps/dhp-broker-public-application/src/main/resources/ocean-hadoop-conf.xml b/apps/dhp-broker-public-application/src/main/resources/ocean-hadoop-conf.xml new file mode 100644 index 00000000..aa5388fa --- /dev/null +++ b/apps/dhp-broker-public-application/src/main/resources/ocean-hadoop-conf.xml @@ -0,0 +1,101 @@ + + + + + + dfs.nameservices + nameservice1 + + + dfs.client.failover.proxy.provider.nameservice1 + org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider + + + dfs.ha.automatic-failover.enabled.nameservice1 + true + + + ha.zookeeper.quorum + iis-cdh5-test-m1.ocean.icm.edu.pl:2181,iis-cdh5-test-m2.ocean.icm.edu.pl:2181,iis-cdh5-test-m3.ocean.icm.edu.pl:2181 + + + dfs.ha.namenodes.nameservice1 + namenode528,namenode434 + + + dfs.namenode.rpc-address.nameservice1.namenode528 + iis-cdh5-test-m1.ocean.icm.edu.pl:8020 + + + dfs.namenode.servicerpc-address.nameservice1.namenode528 + iis-cdh5-test-m1.ocean.icm.edu.pl:8022 + + + dfs.namenode.http-address.nameservice1.namenode528 + iis-cdh5-test-m1.ocean.icm.edu.pl:50070 + + + dfs.namenode.https-address.nameservice1.namenode528 + iis-cdh5-test-m1.ocean.icm.edu.pl:50470 + + + dfs.namenode.rpc-address.nameservice1.namenode434 + iis-cdh5-test-m2.ocean.icm.edu.pl:8020 + + + dfs.namenode.servicerpc-address.nameservice1.namenode434 + iis-cdh5-test-m2.ocean.icm.edu.pl:8022 + + + dfs.namenode.http-address.nameservice1.namenode434 + iis-cdh5-test-m2.ocean.icm.edu.pl:50070 + + + dfs.namenode.https-address.nameservice1.namenode434 + iis-cdh5-test-m2.ocean.icm.edu.pl:50470 + + + dfs.replication + 3 + + + dfs.blocksize + 134217728 + + + dfs.client.use.datanode.hostname + false + + + fs.permissions.umask-mode + 022 + + + dfs.namenode.acls.enabled + false + + + dfs.client.use.legacy.blockreader + false + + + dfs.client.read.shortcircuit + false + + + dfs.domain.socket.path + /var/run/hdfs-sockets/dn + + + dfs.client.read.shortcircuit.skip.checksum + false + + + dfs.client.domain.socket.data.traffic + false + + + dfs.datanode.hdfs-blocks-metadata.enabled + true + + diff --git a/apps/dhp-broker-public-application/src/main/resources/static/index.html b/apps/dhp-broker-public-application/src/main/resources/static/index.html new file mode 100644 index 00000000..3712e776 --- /dev/null +++ b/apps/dhp-broker-public-application/src/main/resources/static/index.html @@ -0,0 +1,7 @@ + + + +OpenAIRE Broker Public API + + +