# ckanext-googleanalytics/tests/mockgoogleanalytics.py
#
# 63 lines
# 1.8 KiB
# Python

import os
import BaseHTTPServer
import threading
# Absolute path of the directory containing this module; the mock handler
# below builds the paths of its XML fixture files relative to it.
here_dir = os.path.dirname(os.path.abspath(__file__))
class MockHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """Request handler that stands in for the Google Analytics HTTP API.

    Every request is answered with status 200 and no extra headers. The
    body is chosen by looking for marker substrings in the request path:

    * GET  ``feeds/accounts/default``              -> ``accountsfixture.xml``
    * GET  ``analytics/feeds/data`` + ``dataset``  -> ``packagefixture.xml``
    * GET  ``analytics/feeds/data`` + ``download`` -> ``downloadfixture.xml``
    * POST ``ClientLogin``                         -> ``Auth=blah``
    * anything else                                -> ``empty``

    Fixture files are looked up in ``here_dir``. The non-standard ``QUIT``
    method sets ``stop = True`` on the owning server object.

    Bodies are byte strings and fixtures are read in binary mode, so
    whatever is written to ``wfile`` is always raw bytes.
    """

    def _reply(self, content):
        """Send a bare 200 response whose body is the byte string *content*."""
        self.send_response(200)
        self.end_headers()
        self.wfile.write(content)

    def _fixture_for_get(self):
        """Return the fixture file name matching ``self.path``, or ``None``.

        ``None`` is returned both for paths that match no known endpoint and
        for data-feed paths that name neither ``dataset`` nor ``download``.
        """
        if "feeds/accounts/default" in self.path:
            return "accountsfixture.xml"
        if "analytics/feeds/data" in self.path:
            if "dataset" in self.path:
                return "packagefixture.xml"
            if "download" in self.path:
                return "downloadfixture.xml"
        return None

    def do_GET(self):
        """Serve the fixture selected by the request path, else ``empty``."""
        fixture_name = self._fixture_for_get()
        if fixture_name is None:
            # Also covers a data-feed request for an unknown report type,
            # which previously crashed on an unassigned ``fixture`` local.
            content = b"empty"
        else:
            # ``with`` guarantees the fixture file handle is closed.
            with open(os.path.join(here_dir, fixture_name), "rb") as fixture:
                content = fixture.read()
        self._reply(content)

    def do_POST(self):
        """Answer a ``ClientLogin`` POST with a fake auth token."""
        if "ClientLogin" in self.path:
            content = b"Auth=blah"
        else:
            content = b"empty"
        self._reply(content)

    def do_QUIT(self):
        """Acknowledge the request and flag the owning server to stop."""
        self.send_response(200)
        self.end_headers()
        self.server.stop = True
class ReusableServer(BaseHTTPServer.HTTPServer):
    """HTTP server with address reuse enabled and a flag-controlled loop."""

    # Truthy value: the base socket server enables address reuse on bind.
    allow_reuse_address = 1

    def serve_til_quit(self):
        """Handle requests one at a time until ``self.stop`` becomes true.

        The flag is reset to ``False`` on entry and re-checked before each
        blocking ``handle_request()`` call.
        """
        self.stop = False
        while True:
            if self.stop:
                break
            self.handle_request()
def runmockserver(host='localhost', port=6969):
    """Start the mock Google Analytics server in a background thread.

    A ``ReusableServer`` using ``MockHandler`` is bound to ``(host, port)``
    and its ``serve_til_quit`` loop is run on a daemon thread, so the
    thread does not keep the interpreter alive at exit.

    :param host: interface to bind to; defaults to ``'localhost'``.
    :param port: TCP port to listen on; defaults to ``6969``.
    :returns: the already-started ``threading.Thread`` running the server.
    """
    server_address = (host, port)
    httpd = ReusableServer(server_address, MockHandler)
    httpd_thread = threading.Thread(target=httpd.serve_til_quit)
    httpd_thread.setDaemon(True)
    httpd_thread.start()
    return httpd_thread