# ckanext-googleanalytics/ckanext/googleanalytics/commands.py

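"""Paster commands for ckanext-googleanalytics.

InitDB sets up the extension's local stats tables.  LoadAnalytics pulls
page-view counts from the Google Analytics API and stores them, either in
the extension's own tables or in CKAN's internal tracking_summary table.
"""
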
from __future__ import print_function
from __future__ import absolute_import
import os
import re
import logging
import datetime
import time

from pylons import config as pylonsconfig
from ckan.lib.cli import CkanCommand
import ckan.model as model

from . import dbutil

log = logging.getLogger("ckanext.googleanalytics")

PACKAGE_URL = "/dataset/"  # XXX get from routes...
DEFAULT_RESOURCE_URL_TAG = "/downloads/"

RESOURCE_URL_REGEX = re.compile("/dataset/[a-z0-9-_]+/resource/([a-z0-9-_]+)")
DATASET_EDIT_REGEX = re.compile("/dataset/edit/([a-z0-9-_]+)")
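
# Illustrative paths these patterns are meant to match, assuming CKAN's
# default URL layout:
#   /dataset/my-dataset/resource/some-resource-id  -> RESOURCE_URL_REGEX
#   /dataset/edit/my-dataset                       -> DATASET_EDIT_REGEX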


class InitDB(CkanCommand):
    """Initialise the local stats database tables
    """
    summary = __doc__.split("\n")[0]
    usage = __doc__
    max_args = 0
    min_args = 0

    def command(self):
        self._load_config()

        model.Session.remove()
        model.Session.configure(bind=model.meta.engine)
        dbutil.init_tables()
        log.info("Set up statistics tables in main database")


class LoadAnalytics(CkanCommand):
    """Parse data from Google Analytics API and store it
    in a local database

    Options:
        <credentials_file> internal [date]
            credentials_file: the OAuth credentials file
            internal: use CKAN's internal tracking tables
            date: start date for retrieving analytics data,
                  in YYYY-MM-DD format
    """
    summary = __doc__.split("\n")[0]
    usage = __doc__
    max_args = 3
    min_args = 1
    TEST_HOST = None
    CONFIG = None

    def command(self):
        if not self.CONFIG:
            self._load_config()
            self.CONFIG = pylonsconfig

        self.resource_url_tag = self.CONFIG.get(
            "googleanalytics_resource_prefix", DEFAULT_RESOURCE_URL_TAG
        )

        # funny dance we need to do to make sure we've got a
        # configured session
        model.Session.remove()
        model.Session.configure(bind=model.meta.engine)
        self.parse_and_save()

    def internal_save(self, packages_data, summary_date):
        engine = model.meta.engine

        # clear out any existing data for this date before adding new rows
        sql = """DELETE FROM tracking_summary
                 WHERE tracking_date = %s;"""
        engine.execute(sql, summary_date)

        for url, count in list(packages_data.items()):
            # If the URL matches a resource, mark it as one.
            # For resources we don't currently find the package ID.
            if RESOURCE_URL_REGEX.match(url):
                tracking_type = "resource"
            else:
                tracking_type = "page"
            sql = """INSERT INTO tracking_summary
                     (url, count, tracking_date, tracking_type)
                     VALUES (%s, %s, %s, %s);"""
            engine.execute(sql, url, count, summary_date, tracking_type)

        # get ids for dataset urls
        sql = """UPDATE tracking_summary t
                 SET package_id = COALESCE(
                     (SELECT id FROM package p WHERE t.url = %s || p.name)
                     , '~~not~found~~')
                 WHERE t.package_id IS NULL AND tracking_type = 'page';"""
        engine.execute(sql, PACKAGE_URL)

        # get ids for dataset edit urls which aren't captured otherwise
        sql = """UPDATE tracking_summary t
                 SET package_id = COALESCE(
                     (SELECT id FROM package p WHERE t.url = %s || p.name)
                     , '~~not~found~~')
                 WHERE t.package_id = '~~not~found~~'
                       AND tracking_type = 'page';"""
        engine.execute(sql, "%sedit/" % PACKAGE_URL)

        # update summary totals for resources; only freshly inserted rows
        # (running_total = 0) are recomputed
        sql = """UPDATE tracking_summary t1
                 SET running_total = (
                     SELECT sum(count)
                     FROM tracking_summary t2
                     WHERE t1.url = t2.url
                           AND t2.tracking_date <= t1.tracking_date
                     ) + t1.count
                     , recent_views = (
                     SELECT sum(count)
                     FROM tracking_summary t2
                     WHERE t1.url = t2.url
                           AND t2.tracking_date <= t1.tracking_date
                           AND t2.tracking_date >= t1.tracking_date - 14
                     ) + t1.count
                 WHERE t1.running_total = 0 AND tracking_type = 'resource';"""
        engine.execute(sql)

        # update summary totals for pages
        sql = """UPDATE tracking_summary t1
                 SET running_total = (
                     SELECT sum(count)
                     FROM tracking_summary t2
                     WHERE t1.package_id = t2.package_id
                           AND t2.tracking_date <= t1.tracking_date
                     ) + t1.count
                     , recent_views = (
                     SELECT sum(count)
                     FROM tracking_summary t2
                     WHERE t1.package_id = t2.package_id
                           AND t2.tracking_date <= t1.tracking_date
                           AND t2.tracking_date >= t1.tracking_date - 14
                     ) + t1.count
                 WHERE t1.running_total = 0 AND tracking_type = 'page'
                       AND t1.package_id IS NOT NULL
                       AND t1.package_id != '~~not~found~~';"""
        engine.execute(sql)
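
    # Note: the '~~not~found~~' sentinel and the "recompute rows where
    # running_total = 0" convention appear to mirror CKAN core's own
    # tracking update logic, which keeps this data compatible with the
    # tracking_summary table.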

    def bulk_import(self):
        if len(self.args) == 3:
            # Get summaries from the specified date
            start_date = datetime.datetime.strptime(self.args[2], "%Y-%m-%d")
        else:
            # No date given. Find the most recent date we have data for and
            # start from 2 days before then, in case new data is available.
            # If there is no data at all, use 2011-01-01 as the start date.
            engine = model.meta.engine
            sql = """SELECT tracking_date from tracking_summary
                     ORDER BY tracking_date DESC LIMIT 1;"""
            result = engine.execute(sql).fetchall()
            if result:
                start_date = result[0]["tracking_date"]
                start_date += datetime.timedelta(-2)
                # convert date to datetime
                combine = datetime.datetime.combine
                start_date = combine(start_date, datetime.time(0))
            else:
                start_date = datetime.datetime(2011, 1, 1)

        end_date = datetime.datetime.now()
        while start_date < end_date:
            stop_date = start_date + datetime.timedelta(1)
            packages_data = self.get_ga_data_new(
                start_date=start_date, end_date=stop_date
            )
            self.internal_save(packages_data, start_date)
            # sleep to rate limit requests
            time.sleep(0.25)
            # report the day just processed before advancing to the next one
            log.info("%s received %s" % (len(packages_data), start_date))
            print("%s received %s" % (len(packages_data), start_date))
            start_date = stop_date

    def get_ga_data_new(self, start_date=None, end_date=None):
        """Get raw data from Google Analytics for packages and
        resources.

        Returns a dictionary like::

            {'identifier': 3}
        """
        start_date = start_date.strftime("%Y-%m-%d")
        end_date = end_date.strftime("%Y-%m-%d")
        packages = {}

        # in the Core Reporting API a comma between filter expressions
        # means OR, so this matches dataset pages or resource downloads
        query = "ga:pagePath=~%s,ga:pagePath=~%s" % (
            PACKAGE_URL,
            self.resource_url_tag,
        )
        metrics = "ga:uniquePageviews"
        sort = "-ga:uniquePageviews"
        start_index = 1
        max_results = 10000

        # data retrieval is chunked: keep paging until a response comes
        # back with fewer than max_results rows
        completed = False
        while not completed:
            results = (
                self.service.data()
                .ga()
                .get(
                    ids="ga:%s" % self.profile_id,
                    filters=query,
                    dimensions="ga:pagePath",
                    start_date=start_date,
                    start_index=start_index,
                    max_results=max_results,
                    metrics=metrics,
                    sort=sort,
                    end_date=end_date,
                )
                .execute()
            )
            result_count = len(results.get("rows", []))
            if result_count < max_results:
                completed = True

            for result in results.get("rows", []):
                package = result[0]
                # drop the leading segment of the reported path
                # (e.g. a hostname prefix such as /www.mysite.com)
                package = "/" + "/".join(package.split("/")[2:])
                count = result[1]
                packages[package] = int(count)

            start_index += max_results

            # rate limiting
            time.sleep(0.2)
        return packages

    def parse_and_save(self):
        """Grab raw data from Google Analytics and save it to the database"""
        from .ga_auth import init_service, get_profile_id

        tokenfile = self.args[0]
        if not os.path.exists(tokenfile):
            raise Exception("Cannot find the token file %s" % self.args[0])

        try:
            self.service = init_service(self.args[0])
        except TypeError as e:
            raise Exception("Unable to create a service: {0}".format(e))
        self.profile_id = get_profile_id(self.service)

        if len(self.args) > 1:
            # the second argument must be "internal"; an optional third
            # argument gives the start date
            if self.args[1].lower() != "internal":
                raise Exception("Illegal argument %s" % self.args[1])
            self.bulk_import()
        else:
            query = "ga:pagePath=~%s,ga:pagePath=~%s" % (
                PACKAGE_URL,
                self.resource_url_tag,
            )
            packages_data = self.get_ga_data(query_filter=query)
            self.save_ga_data(packages_data)
            log.info("Saved %s records from google" % len(packages_data))

    def save_ga_data(self, packages_data):
        """Save the package and resource visit counts in packages_data
        to the database
        """
        for identifier, visits in list(packages_data.items()):
            recently = visits.get("recent", 0)
            ever = visits.get("ever", 0)
            matches = RESOURCE_URL_REGEX.match(identifier)
            if matches:
                resource_url = identifier[len(self.resource_url_tag) :]
                resource = (
                    model.Session.query(model.Resource)
                    .autoflush(True)
                    .filter_by(id=matches.group(1))
                    .first()
                )
                if not resource:
                    log.warning("Couldn't find resource %s" % resource_url)
                    continue
                dbutil.update_resource_visits(resource.id, recently, ever)
                log.info("Updated %s with %s visits" % (resource.id, visits))
            else:
                package_name = identifier[len(PACKAGE_URL) :]
                if "/" in package_name:
                    log.warning("%s not a valid package name" % package_name)
                    continue
                item = model.Package.by_name(package_name)
                if not item:
                    log.warning("Couldn't find package %s" % package_name)
                    continue
                dbutil.update_package_visits(item.id, recently, ever)
                log.info("Updated %s with %s visits" % (item.id, visits))
        model.Session.commit()

    def ga_query(
        self,
        query_filter=None,
        from_date=None,
        to_date=None,
        start_index=1,
        max_results=10000,
        metrics=None,
        sort=None,
    ):
        """Execute a query against Google Analytics
        """
        if not to_date:
            now = datetime.datetime.now()
            to_date = now.strftime("%Y-%m-%d")
        if isinstance(from_date, datetime.date):
            from_date = from_date.strftime("%Y-%m-%d")
        if not metrics:
            metrics = "ga:visits,ga:visitors,ga:newVisits,ga:uniquePageviews"
        if not sort:
            sort = "-ga:uniquePageviews"

        print("%s -> %s" % (from_date, to_date))

        results = (
            self.service.data()
            .ga()
            .get(
                ids="ga:" + self.profile_id,
                start_date=from_date,
                end_date=to_date,
                dimensions="ga:pagePath",
                metrics=metrics,
                sort=sort,
                start_index=start_index,
                filters=query_filter,
                max_results=max_results,
            )
            .execute()
        )
        return results
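
    # Example call (hypothetical values), fetching unique page views for
    # dataset pages since a given date:
    #   results = self.ga_query(
    #       query_filter="ga:pagePath=~/dataset/",
    #       from_date="2013-01-01",
    #       metrics="ga:uniquePageviews",
    #   )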

    def get_ga_data(self, query_filter=None, start_date=None, end_date=None):
        """Get raw data from Google Analytics for packages and
        resources, for both the last two weeks and for all time.

        Returns a dictionary like::

            {'identifier': {'recent':3, 'ever':6}}
        """
        now = datetime.datetime.now()
        recent_date = now - datetime.timedelta(14)
        recent_date = recent_date.strftime("%Y-%m-%d")
        floor_date = datetime.date(2005, 1, 1)
        packages = {}
        queries = ["ga:pagePath=~%s" % PACKAGE_URL]
        dates = {"recent": recent_date, "ever": floor_date}
        for date_name, date in list(dates.items()):
            for query in queries:
                results = self.ga_query(
                    query_filter=query,
                    metrics="ga:uniquePageviews",
                    from_date=date,
                )
                if "rows" in results:
                    for result in results.get("rows"):
                        package = result[0]
                        if not package.startswith(PACKAGE_URL):
                            # drop a leading hostname segment from the path
                            package = "/" + "/".join(package.split("/")[2:])
                        count = result[1]
                        # Sum the different representations of the same
                        # dataset, e.g. /mysite.com & /www.mysite.com
                        val = 0
                        if (
                            package in packages
                            and date_name in packages[package]
                        ):
                            val += packages[package][date_name]
                        packages.setdefault(package, {})[date_name] = (
                            int(count) + val
                        )
        return packages