Move API analytics thread pool initialisation to plugin configuration

One pool should then be available to all controllers avoiding errors with
mod_wsgi under Apache as the number of threads grows infinitely.
This commit is contained in:
Alex Sadleir 2015-02-16 17:46:46 +11:00
parent 803b3f7f81
commit d2d5a5fc48
3 changed files with 47 additions and 38 deletions

View File

@ -1 +1,8 @@
# this is a namespace package
try:
import pkg_resources
pkg_resources.declare_namespace(__name__)
except ImportError:
import pkgutil
__path__ = pkgutil.extend_path(__path__, __name__)
# #

View File

@ -8,8 +8,7 @@ import urllib2
import logging import logging
import ckan.logic as logic import ckan.logic as logic
import hashlib import hashlib
import Queue import plugin
import threading
from pylons import config from pylons import config
from webob.multidict import UnicodeMultiDict from webob.multidict import UnicodeMultiDict
@ -28,42 +27,8 @@ class GAController(BaseController):
return render('summary.html') return render('summary.html')
class AnalyticsPostThread(threading.Thread):
"""Threaded Url POST"""
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
def run(self):
while True:
# grabs host from queue
data_dict = self.queue.get()
data = urllib.urlencode(data_dict)
log.debug("Sending API event to Google Analytics: " + data)
# send analytics
urllib2.urlopen(
"http://www.google-analytics.com/collect",
data,
# timeout in seconds
# https://docs.python.org/2/library/urllib2.html#urllib2.urlopen
10)
# signals to queue job is done
self.queue.task_done()
class GAApiController(ApiController): class GAApiController(ApiController):
# intercept API calls to record via google analytics # intercept API calls to record via google analytics
analytics_queue = Queue.Queue()
def __init__(self):
# spawn a pool of 5 threads, and pass them queue instance
for i in range(5):
t = AnalyticsPostThread(self.analytics_queue)
t.setDaemon(True)
t.start()
def _post_analytics( def _post_analytics(
self, user, request_obj_type, request_function, request_id): self, user, request_obj_type, request_function, request_id):
if config.get('googleanalytics.id'): if config.get('googleanalytics.id'):
@ -80,7 +45,7 @@ class GAApiController(ApiController):
"ea": request_obj_type+request_function, "ea": request_obj_type+request_function,
"el": request_id, "el": request_id,
} }
self.analytics_queue.put(data_dict) plugin.GoogleAnalyticsPlugin.analytics_queue.put(data_dict)
def action(self, logic_function, ver=None): def action(self, logic_function, ver=None):
try: try:

View File

@ -10,12 +10,40 @@ import ckan.plugins as p
import gasnippet import gasnippet
from routes.mapper import SubMapper, Mapper as _Mapper from routes.mapper import SubMapper, Mapper as _Mapper
log = logging.getLogger('ckanext.googleanalytics') import urllib2
import threading
import Queue
log = logging.getLogger('ckanext.googleanalytics')
class GoogleAnalyticsException(Exception): class GoogleAnalyticsException(Exception):
pass pass
class AnalyticsPostThread(threading.Thread):
"""Threaded Url POST"""
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
def run(self):
while True:
# grabs host from queue
data_dict = self.queue.get()
data = urllib.urlencode(data_dict)
log.debug("Sending API event to Google Analytics: " + data)
# send analytics
urllib2.urlopen(
"http://www.google-analytics.com/collect",
data,
# timeout in seconds
# https://docs.python.org/2/library/urllib2.html#urllib2.urlopen
10)
# signals to queue job is done
self.queue.task_done()
class GoogleAnalyticsPlugin(p.SingletonPlugin): class GoogleAnalyticsPlugin(p.SingletonPlugin):
p.implements(p.IConfigurable, inherit=True) p.implements(p.IConfigurable, inherit=True)
@ -24,6 +52,8 @@ class GoogleAnalyticsPlugin(p.SingletonPlugin):
p.implements(p.IConfigurer, inherit=True) p.implements(p.IConfigurer, inherit=True)
p.implements(p.ITemplateHelpers) p.implements(p.ITemplateHelpers)
analytics_queue = Queue.Queue()
def configure(self, config): def configure(self, config):
'''Load config settings for this extension from config file. '''Load config settings for this extension from config file.
@ -56,6 +86,13 @@ class GoogleAnalyticsPlugin(p.SingletonPlugin):
if not converters.asbool(config.get('ckan.legacy_templates', 'false')): if not converters.asbool(config.get('ckan.legacy_templates', 'false')):
p.toolkit.add_resource('fanstatic_library', 'ckanext-googleanalytics') p.toolkit.add_resource('fanstatic_library', 'ckanext-googleanalytics')
# spawn a pool of 5 threads, and pass them queue instance
for i in range(5):
t = AnalyticsPostThread(self.analytics_queue)
t.setDaemon(True)
t.start()
def update_config(self, config): def update_config(self, config):
'''Change the CKAN (Pylons) environment configuration. '''Change the CKAN (Pylons) environment configuration.