commit ba1d2d30195047acee68f834b554172a1bb45694
parent 5fbfd1786405e2b81bedc879a852cd6d57449ec8
Author: Dan Callaghan <djc@djc.id.au>
Date: Sat, 22 Nov 2008 14:08:12 +1000
use Paste instead of Colubrid for static serving
Diffstat:
11 files changed, 17 insertions(+), 2464 deletions(-)
diff --git a/app.py b/app.py
@@ -9,7 +9,6 @@ import cgi, re, datetime
from itertools import chain
import wsgiref.util
from genshi.template import TemplateLoader
-from colubrid.server import StaticExports
from recaptcha.client import captcha
import config
@@ -327,8 +326,22 @@ application = Constance
if __name__ == '__main__':
+ import optparse
+ parser = optparse.OptionParser(usage='%prog [OPTIONS...] CONFIG_FILENAME')
+ parser.set_defaults(port=8082)
+ parser.add_option('-p', '--port', type='int',
+ help='Port to server on (default: %default)')
+ options, args = parser.parse_args()
+ if not args:
+ parser.error('You must supply a CONFIG_FILENAME')
+
+ from paste.urlparser import StaticURLParser
+ from paste.urlmap import URLMap
+ application = URLMap()
+ application['/static'] = StaticURLParser(os.path.join(os.path.dirname(__file__), 'static'))
+ application['/'] = Constance
+
import wsgiref.simple_server
- application = StaticExports(application, {'/static': os.path.join(os.path.dirname(__file__), 'static')})
- server = wsgiref.simple_server.make_server('0.0.0.0', 8082, application)
- server.base_environ['constance.config_filename'] = sys.argv[1]
+ server = wsgiref.simple_server.make_server('0.0.0.0', options.port, application)
+ server.base_environ['constance.config_filename'] = args[0]
server.serve_forever()
diff --git a/lib/colubrid/__init__.py b/lib/colubrid/__init__.py
@@ -1,25 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
- Colubrid WSGI Toolkit
- ---------------------
-"""
-__version__ = '0.10'
-__author__ = 'Armin Ronacher <armin.ronacher@active-4.com>'
-__license__ = 'BSD LICENSE'
-
-#from colubrid.application import *
-#from colubrid.request import *
-#from colubrid.response import *
-#from colubrid.server import *
-#from colubrid import application
-#from colubrid import request
-#from colubrid import response
-#from colubrid import server
-
-#__all__ = (application.__all__ + request.__all__ + response.__all__ +
-# server.__all__)
-
-from colubrid.application import *
-from colubrid.request import Request
-from colubrid.response import HttpResponse
-from colubrid.server import execute
diff --git a/lib/colubrid/application.py b/lib/colubrid/application.py
@@ -1,371 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
- Colubrid Base Applications
- ==========================
-
- This file provides a list of colubrid application. Each of them inherits
- form BaseApplication and implements a full WSGI compatible web application.
-
- If you like to add your own you _have_ to inherit from BaseApplication or
- the Request object wont work properly.
-
- Since colubrid 0.10 you don't have to use those base application objects.
- If you're looking for an example check out the `customapp.py`
- application in the example folder.
-"""
-
-from __future__ import generators
-from colubrid.request import Request, ResponseRequest, RoutesRequest
-from colubrid.response import HttpResponse
-from colubrid.utils import fix_slash
-from colubrid.exceptions import HttpException, PageNotFound
-import re
-
-__all__ = ('BaseApplication', 'RegexApplication', 'ResolveRegexApplication',
- 'PathApplication', 'ObjectApplication', 'WebpyApplication',
- 'RoutesApplication')
-
-
-class RegexCompilerClass(type):
- """
- Metaclass that automatically compiles regular expressions in
- the 'urls' attribute.
- """
-
- def __new__(cls, name, bases, dct):
- result = type.__new__(cls, name, bases, dct)
- if type(bases[0]) == type:
- return result
- if not hasattr(result, 'urls'):
- raise TypeError('Regex application without url definition.')
- compiled_urls = []
- for args in result.urls:
- args = list(args)
- args[0] = re.compile(args[0])
- compiled_urls.append(tuple(args))
- result.urls = compiled_urls
- return result
-
-
-class RoutesMapperClass(type):
- """
- Metaclass that automatically creates a Routes mapper.
- """
-
- def __new__(cls, name, bases, dct):
- result = type.__new__(cls, name, bases, dct)
- if type(bases[0]) == type:
- return result
- if not hasattr(result, 'mapping'):
- raise TypeError('Route application without mapping.')
- from routes import Mapper
- mapper = Mapper()
- controllers = {}
- controller_map = {}
- for m in result.mapping:
- name = m[0].split('/', 1)[0]
- internal = str(id(m[1]))
- controllers[internal] = m[1]
- controller_map[m[1]] = internal
- kwargs = {}
- if len(m) >= 3 and not m[2] is None:
- kwargs['requirements'] = m[2]
- if len(m) == 4:
- kwargs.update(m[3])
- mapper.connect(name, m[0], controller=internal, **kwargs)
- mapper.create_regs(controllers.keys())
- result._routes_mapper = mapper
- result._routes_controllers = controllers
- result._controller_map = controller_map
- return result
-
-
-class BaseApplication(object):
- """
- Base class for Colubrid applications.
- """
-
- def __init__(self, environ, start_response, request_class=Request):
- charset = 'utf-8'
- if hasattr(self, 'charset'):
- charset = self.charset
- self.request = request_class(environ, start_response, charset)
-
- def process_http_exception(self, exc):
- """Default routine to process a HttpException."""
- return HttpResponse(exc.get_error_page(), exc.get_headers(), exc.code)
-
- def process_request(self):
- """Process a request. Must be overridden."""
- raise NotImplementedError()
-
- def __iter__(self):
- try:
- response = self.process_request()
- if isinstance(self.request, ResponseRequest):
- response = self.request
- else:
- assert isinstance(response, HttpResponse), \
- 'process_request() must return a HttpResponse instance'
- except HttpException, exc:
- response = self.process_http_exception(exc)
- return response(self.request)
-
-
-class RegexApplication(BaseApplication):
- """
- Application that maps URLs based on regular expressions.
- """
- __metaclass__ = RegexCompilerClass
-
- def process_request(self):
- """Process a single request."""
- path_info = self.request.environ.get('PATH_INFO', '/')[1:]
- if hasattr(self, 'slash_append') and self.slash_append:
- fix_slash(self.request.environ, True)
- for url, module in self.urls:
- matchobj = url.search(path_info)
- if not matchobj is None:
- args = matchobj.groups()
- if module in (True, False):
- return fix_slash(self.request.environ, module)
- elif not '.' in module:
- handler = getattr(self, module)
- else:
- parts = module.split('.')
- mname, fname = '.'.join(parts[:-1]), parts[-1]
- package = __import__(mname, '', '', [''])
- handler = getattr(package, fname)
- args = list(args)
- args.insert(0, self.request)
- args = tuple(args)
- return handler(*args)
- raise PageNotFound()
-
-
-class ResolveRegexApplication(BaseApplication):
- """
- Application that ...
- """
- __metaclass__ = RegexCompilerClass
-
- def process_request(self):
- """Process a single request."""
- path_info = self.request.environ.get('PATH_INFO', '/')[1:]
- if hasattr(self, 'slash_append') and self.slash_append:
- fix_slash(self.request.environ, True)
- for url, module in self.urls:
- matchobj = url.search(path_info)
- if not matchobj is None:
- args = matchobj.groups()
- new_args = []
- for pos, value in enumerate(args):
- search = '$%d' % (pos + 1)
- if search in module:
- module = module.replace(search, value.replace('.', '_'))
- else:
- new_args.append(value)
- args = tuple(new_args)
- if not '.' in module:
- if not hasattr(self, module):
- raise PageNotFound
- handler = getattr(self, module)
- else:
- parts = module.split('.')
- mname, fname = '.'.join(parts[:-1]), parts[-1]
- try:
- package = __import__(mname, '', '', [''])
- handler = getattr(package, fname)
- except (ImportError, AttributeError):
- raise PageNotFound
- args = list(args)
- args.insert(0, self.request)
- args = tuple(args)
- if handler in (True, False):
- return fix_slash(self.request.environ, handler)
- return handler(*args)
- raise PageNotFound()
-
-
-class WebpyApplication(BaseApplication):
- """
- Application compatible with web.py.
- """
- __metaclass__ = RegexCompilerClass
-
- def process_request(self):
- """Process a single request."""
- path_info = self.request.environ.get('PATH_INFO', '/')[1:]
- if hasattr(self, 'slash_append') and self.slash_append:
- fix_slash(self.request.environ, True)
- for url, cls in self.urls:
- matchobj = url.search(path_info)
- if not matchobj is None:
- cls = cls()
- cls.request = self.request
- handler = getattr(cls, self.request.environ['REQUEST_METHOD'])
- if handler in (True, False):
- return fix_slash(self.request.environ, handler)
- return handler(*matchobj.groups())
- raise PageNotFound()
-
-
-class PathApplication(BaseApplication):
- """
- Application that dispatches based on the first path element.
- """
-
- def process_request(self):
- """Process a single request."""
- path_info = self.request.environ.get('PATH_INFO', '/').strip('/')
- parts = path_info.strip('/').split('/')
- if not len(parts) or not parts[0]:
- handler = 'show_index'
- args = ()
- else:
- handler = 'show_%s' % parts[0]
- args = tuple(parts[1:])
- if hasattr(self, handler):
- return getattr(self, handler)(*args)
- fix_slash(self.request.environ, True)
- raise PageNotFound()
-
-
-class ObjectApplication(BaseApplication):
- """
- A rather complex application type.
- It uses python class structures to handler the user requests.
-
- an ObjectApplication might look like this:
-
- class HelloWorld(object):
- def index(self):
- self.request.write('Hello World!')
- def name(self, name="Nobody"):
- self.request.write('Hello %s!' % name)
-
- class AdminPanel(object):
- def index(self):
- pass
- def login(self):
- pass
-
- class DispatcherApplication(ObjectApplication):
- root = HelloWorld
- root.admin = AdminPanel
-
- app = DispatcherApplication
-
- Let's say that the application listens on localhost:
-
- http://localhost/ --> HelloWorld.index()
- http://localhost/name/ --> HelloWorld.name('Nobody')
- http://localhost/name/Max --> HelloWorld.name('Max')
- http://localhost/admin/ --> AdminPanel.index()
- http://localhost/admin/login --> AdminPanel.login()
- """
-
- def process_request(self):
- """Process a single request."""
- if not hasattr(self, 'root'):
- raise AttributeError, 'ObjectApplication requires a root object.'
-
- path = self.request.environ.get('PATH_INFO', '').strip('/')
- parts = path.split('/')
-
- # Resolve the path
- handler = self.root
- args = []
- for part in parts:
- if part.startswith('_'):
- raise PageNotFound
- node = getattr(handler, part, None)
- if node is None:
- if part:
- args.append(part)
- else:
- handler = node
-
- container = None
-
- # Find handler and make first container check
- import inspect
- if inspect.ismethod(handler):
- if handler.__name__ == 'index':
- # the handler is called index so it's the leaf of
- # itself. we don't want a slash, even if forced
- container = False
- else:
- index = getattr(handler, 'index', None)
- if not index is None:
- if not hasattr(index, 'container'):
- container = True
- handler = index
- else:
- raise PageNotFound()
-
- # update with hardcoded container information
- if container is None and hasattr(handler, 'container'):
- container = handler.container
-
- # Check for handler arguments and update container
- handler_args, varargs, _, defaults = inspect.getargspec(handler)
- if defaults is None:
- defaults = 0
- else:
- defaults = len(defaults)
-
- max_len = len(handler_args) - 1
- min_len = max_len - defaults
- cur_len = len(args)
- if varargs:
- max_len = -1
-
- # check if the number of arguments fits our handler
- if max_len == -1:
- if cur_len < min_len:
- raise PageNotFound
- elif min_len <= cur_len <= max_len:
- if container is None:
- container = cur_len < max_len
- else:
- raise PageNotFound()
-
- if container is None:
- container = False
- fix_slash(self.request.environ, container)
-
- # call handler
- parent = handler.im_class()
- if hasattr(self, 'request'):
- parent.request = self.request
- return handler(parent, *args)
-
-
-class RoutesApplication(BaseApplication):
- """
- Application that uses Routes (http://routes.groovie.org/) to
- dispatch URLs.
- """
- __metaclass__ = RoutesMapperClass
-
- def __init__(self, environ, start_response):
- def create_request(e, s, c):
- return RoutesRequest(self, e, s, c)
- super(RoutesApplication, self).__init__(environ, start_response,
- create_request)
- path = self.request.environ.get('PATH_INFO') or '/'
- match = self._routes_mapper.match(path)
- if match is None:
- raise PageNotFound()
-
- handler = self._routes_controllers[match['controller']]
- app = handler.im_class()
- app.request = self.request
-
- if match['action'] == 'index':
- del match['action']
- del match['controller']
-
- # XXX: can't return from __init__
- return handler(app, **match)
diff --git a/lib/colubrid/const.py b/lib/colubrid/const.py
@@ -1,53 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
- Colubrid Constants
-"""
-
-HTTP_STATUS_CODES = {
- 100: 'CONTINUE',
- 101: 'SWITCHING PROTOCOLS',
- 102: 'PROCESSING',
- 200: 'OK',
- 201: 'CREATED',
- 202: 'ACCEPTED',
- 203: 'NON-AUTHORITATIVE INFORMATION',
- 204: 'NO CONTENT',
- 205: 'RESET CONTENT',
- 206: 'PARTIAL CONTENT',
- 207: 'MULTI STATUS',
- 300: 'MULTIPLE CHOICES',
- 301: 'MOVED PERMANENTLY',
- 302: 'FOUND',
- 303: 'SEE OTHER',
- 304: 'NOT MODIFIED',
- 305: 'USE PROXY',
- 306: 'RESERVED',
- 307: 'TEMPORARY REDIRECT',
- 400: 'BAD REQUEST',
- 401: 'UNAUTHORIZED',
- 402: 'PAYMENT REQUIRED',
- 403: 'FORBIDDEN',
- 404: 'NOT FOUND',
- 405: 'METHOD NOT ALLOWED',
- 406: 'NOT ACCEPTABLE',
- 407: 'PROXY AUTHENTICATION REQUIRED',
- 408: 'REQUEST TIMEOUT',
- 409: 'CONFLICT',
- 410: 'GONE',
- 411: 'LENGTH REQUIRED',
- 412: 'PRECONDITION FAILED',
- 413: 'REQUEST ENTITY TOO LARGE',
- 414: 'REQUEST-URI TOO LONG',
- 415: 'UNSUPPORTED MEDIA TYPE',
- 416: 'REQUESTED RANGE NOT SATISFIABLE',
- 417: 'EXPECTATION FAILED',
- 500: 'INTERNAL SERVER ERROR',
- 501: 'NOT IMPLEMENTED',
- 502: 'BAD GATEWAY',
- 503: 'SERVICE UNAVAILABLE',
- 504: 'GATEWAY TIMEOUT',
- 505: 'HTTP VERSION NOT SUPPORTED',
- 506: 'VARIANT ALSO VARIES',
- 507: 'INSUFFICIENT STORAGE',
- 510: 'NOT EXTENDED'
-}
diff --git a/lib/colubrid/debug.py b/lib/colubrid/debug.py
@@ -1,984 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
- Colubrid Debugging Module
- =========================
-
- Adds debug support to colubrid applications.
-"""
-from __future__ import generators
-import os
-import sys
-import re
-import traceback
-import keyword
-import token
-import tokenize
-import string
-import pprint
-import inspect
-import threading
-import cgi
-from random import random
-from cStringIO import StringIO
-from xml.sax.saxutils import escape
-
-JAVASCRIPT = r'''
-function toggleBlock(handler) {
- if (handler.nodeName == 'H3') {
- var table = handler;
- do {
- table = table.nextSibling;
- if (typeof table == 'undefined') {
- return;
- }
- }
- while (table.nodeName != 'TABLE');
- }
-
- else if (handler.nodeName == 'DT') {
- var parent = handler.parentNode;
- var table = parent.getElementsByTagName('TABLE')[0];
- }
-
- var lines = table.getElementsByTagName("TR");
- for (var i = 0; i < lines.length; i++) {
- var line = lines[i];
- if (line.className == 'pre' || line.className == 'post') {
- line.style.display = (line.style.display == 'none') ? '' : 'none';
- }
- else if (line.parentNode.parentNode.className == 'vars' ||
- line.parentNode.parentNode.className == 'exec_code') {
- line.style.display = (line.style.display == 'none') ? '' : 'none';
- var input = line.getElementsByTagName('TEXTAREA');
- if (input.length) {
- input[0].focus();
- }
- }
- }
-}
-
-function initTB() {
- var tb = document.getElementById('wsgi-traceback');
- var handlers = tb.getElementsByTagName('H3');
- for (var i = 0; i < handlers.length; i++) {
- toggleBlock(handlers[i]);
- handlers[i].setAttribute('onclick', 'toggleBlock(this)');
- }
- handlers = tb.getElementsByTagName('DT');
- for (var i = 0; i < handlers.length; i++) {
- toggleBlock(handlers[i]);
- handlers[i].setAttribute('onclick', 'toggleBlock(this)');
- }
- var handlers = tb.getElementsByTagName('TEXTAREA');
- for (var i = 0; i < handlers.length; i++) {
- var hid = handlers[i].getAttribute('id');
- if (hid && hid.substr(0, 6) == 'input-') {
- var p = handlers[i].getAttribute('id').split('-');
- handlers[i].onkeyup = makeEnter(p[1], p[2]);
- }
- }
-}
-
-AJAX_ACTIVEX = ['Msxml2.XMLHTTP', 'Microsoft.XMLHTTP', 'Msxml2.XMLHTTP.4.0'];
-
-function ajaxConnect() {
- var con = null;
- try {
- con = new XMLHttpRequest();
- }
- catch (e) {
- if (typeof AJAX_ACTIVEX == 'string') {
- con = new ActiveXObject(AJAX_ACTIVEX);
- }
- else {
- for (var i=0; i < AJAX_ACTIVEX.length; i++) {
- var axid = AJAX_ACTIVEX[i];
- try {
- con = new ActiveXObject(axid);
- }
- catch (e) {}
- if (con) {
- AJAX_ACTIVEX = axid;
- break;
- }
- }
- }
- }
- return con;
-}
-
-function execCode(traceback, frame) {
- var input = document.getElementById('input-' + traceback + '-' +
- frame);
- var e = encodeURIComponent;
- var data = 'tb=' + e(traceback) + '&' +
- 'frame=' + e(frame) + '&' +
- 'code=' + e(input.value);
- writeToOutput(traceback, frame, '>>> ' + input.value);
- var con = ajaxConnect();
- con.onreadystatechange = function() {
- if (con.readyState == 4 && con.status == 200) {
- writeToOutput(traceback, frame, con.responseText);
- input.focus();
- input.value = '';
- }
- };
- con.open('GET', '__traceback__?' + data);
- con.send(data);
-}
-
-function makeEnter(traceback, frame) {
- return function(e) {
- var e = (e) ? e : window.event;
- var code = (e.keyCode) ? e.keyCode : e.which;
- if (code == 13) {
- var input = document.getElementById('input-' + traceback +
- '-' + frame);
- if (input.className == 'big') {
- if (input.value.substr(input.value.length - 2) != '\n\n') {
- return;
- }
- input.value = input.value.substr(0, input.value.length - 1);
- input.className = 'small';
- }
- if (input.value == 'clear\n') {
- clearOutput(traceback, frame);
- input.value = '';
- }
- else {
- execCode(traceback, frame);
- }
- }
- }
-}
-
-function writeToOutput(traceback, frame, text) {
- var output = document.getElementById('output-' + traceback + '-' +
- frame);
- if (text && text != '\n') {
- var node = document.createTextNode(text);
- output.appendChild(node);
- }
-}
-
-function clearOutput(traceback, frame) {
- var output = document.getElementById('output-' + traceback + '-' +
- frame);
- output.innerHTML = '';
-}
-
-function toggleExtend(traceback, frame) {
- var input = document.getElementById('input-' + traceback + '-' +
- frame);
- input.className = (input.className == 'small') ? 'big' : 'small';
- input.focus();
-}
-
-function change_tb() {
- interactive = document.getElementById('interactive');
- plain = document.getElementById('plain');
- interactive.style.display = ((interactive.style.display == 'block') | (interactive.style.display == '')) ? 'none' : 'block';
- plain.style.display = (plain.style.display == 'block') ? 'none' : 'block';
-}
-'''
-
-STYLESHEET = '''
-body {
- font-size:0.9em;
-}
-
-* {
- margin:0;
- padding:0;
-}
-
-#wsgi-traceback {
- margin: 1em;
- border: 1px solid #5F9CC4;
- background-color: #F6F6F6;
-}
-
-.footer {
- margin: 1em;
- text-align: right;
- font-style: italic;
-}
-
-h1 {
- background-color: #3F7CA4;
- font-size:1.2em;
- color:#FFFFFF;
- padding:0.3em;
- margin:0 0 0.2em 0;
-}
-
-h2 {
- background-color:#5F9CC4;
- font-size:1em;
- color:#FFFFFF;
- padding:0.3em;
- margin:0.4em 0 0.2em 0;
-}
-
-h2.tb {
- cursor:pointer;
-}
-
-h3 {
- font-size:1em;
- cursor:pointer;
-}
-
-h3.fn {
- margin-top: 0.5em;
-}
-
-h3.fn:hover:before {
- content: "\\21D2 ";
-}
-
-h3.indent {
- margin:0 0.7em 0 0.7em;
- font-weight:normal;
-}
-
-p.text {
- padding:0.1em 0.5em 0.1em 0.5em;
-}
-
-p.important {
- font-weight: bold;
-}
-
-div.frame {
- margin:0 1em 0 1em;
-}
-
-table.code {
- margin:0.5em 0.7em 0.3em 0.7em;
- background-color:#E0E0E0;
- width:100%;
- font-size:0.9em;
- border:1px solid #C9C9C9;
- border-collapse:collapse;
-}
-
-table.code td.lineno {
- width:42px;
- text-align:right;
- padding:0 5px 0 0;
- color:#444444;
- border-right:1px solid #888888;
-}
-
-table.code td.code {
- background-color:#EFEFEF;
- padding:0 0 0 5px;
- white-space:pre;
-}
-
-table.code tr.cur td.code {
- background-color: #FAFAFA;
- padding: 1px 0 1px 5px;
- white-space: pre;
-}
-
-pre.plain {
- margin:0.5em 1em 1em 1em;
- padding:0.5em;
- border:1px solid #999999;
- background-color: #FFFFFF;
- line-height: 120%;
- font-family: monospace;
-}
-
-table.exec_code {
- width:100%;
- margin:0 1em 0 1em;
-}
-
-table.exec_code td.input {
- width:100%;
-}
-
-table.exec_code textarea.small {
- width:100%;
- height:1.5em;
- border:1px solid #999999;
-}
-
-table.exec_code textarea.big {
- width:100%;
- height:5em;
- border:1px solid #999999;
-}
-
-table.exec_code input {
- height:1.5em;
- border:1px solid #999999;
- background-color:#FFFFFF;
-}
-
-table.exec_code td.extend {
- width:70px;
- padding:0 5px 0 5px;
-}
-
-table.exec_code td.output pre {
- font-family: monospace;
- white-space: pre-wrap; /* css-3 should we be so lucky... */
- white-space: -moz-pre-wrap; /* Mozilla, since 1999 */
- white-space: -pre-wrap; /* Opera 4-6 ?? */
- white-space: -o-pre-wrap; /* Opera 7 ?? */
- word-wrap: break-word; /* Internet Explorer 5.5+ */
- _white-space: pre; /* IE only hack to re-specify in addition to word-wrap */
-}
-
-table.vars {
- margin:0 1.5em 0 1.5em;
- border-collapse:collapse;
- font-size: 0.9em;
-}
-
-table.vars td {
- font-family: 'Bitstream Vera Sans Mono', 'Courier New', monospace;
- padding: 0.3em;
- border: 1px solid #ddd;
- vertical-align: top;
- background-color: white;
-}
-
-table.vars .name {
- font-style: italic;
-}
-
-table.vars .value {
- color: #555;
-}
-
-table.vars th {
- padding: 0.2em;
- border: 1px solid #ddd;
- background-color: #f2f2f2;
- text-align: left;
-}
-
-#plain {
- display: none;
-}
-
-dl dt {
- padding: 0.2em 0 0.2em 1em;
- font-weight: bold;
- cursor: pointer;
- background-color: #ddd;
-}
-
-dl dt:hover {
- background-color: #bbb; color: white;
-}
-
-dl dd {
- padding: 0 0 0 2em;
- background-color: #eee;
-}
-
-span.p-kw {
- font-weight:bold;
-}
-
-span.p-cmt {
- color:#8CBF83;
-}
-
-span.p-str {
- color:#DEA39B;
-}
-
-span.p-num {
- color:#D2A2D6;
-}
-
-span.p-op {
- color:#0000AA;
-}
-'''
-
-
-def get_uid():
- return str(random()).encode('base64')[3:11]
-
-
-def get_frame_info(tb, context_lines=7):
- """
- Return a dict of informations about a given traceback.
- """
- # line numbers / function / variables
- lineno = tb.tb_lineno
- function = tb.tb_frame.f_code.co_name
- variables = tb.tb_frame.f_locals
-
- # get filename
- fn = tb.tb_frame.f_globals.get('__file__')
- if not fn:
- fn = os.path.realpath(inspect.getsourcefile(tb) or
- inspect.getfile(tb))
- if fn[-4:] in ('.pyc', '.pyo'):
- fn = fn[:-1]
-
- # module name
- modname = tb.tb_frame.f_globals.get('__name__')
-
- # get loader
- loader = tb.tb_frame.f_globals.get('__loader__')
-
- # sourcecode
- try:
- if not loader is None:
- source = loader.get_source(modname)
- else:
- source = file(fn).read()
- except:
- source = ''
- pre_context, post_context = [], []
- context_line, context_lineno = None, None
- else:
- parser = PythonParser(source)
- parser.parse()
- parsed_source = parser.get_html_output()
- lbound = max(0, lineno - context_lines - 1)
- ubound = lineno + context_lines
- try:
- context_line = parsed_source[lineno - 1]
- pre_context = parsed_source[lbound:lineno - 1]
- post_context = parsed_source[lineno:ubound]
- except IndexError:
- context_line = None
- pre_context = post_context = [], []
- context_lineno = lbound
-
- return {
- 'tb': tb,
- 'filename': fn,
- 'loader': loader,
- 'function': function,
- 'lineno': lineno,
- 'vars': variables,
- 'pre_context': pre_context,
- 'context_line': context_line,
- 'post_context': post_context,
- 'context_lineno': context_lineno,
- 'source': source
- }
-
-
-def debug_info(request, context=None, evalex=True):
- """
- Return debug info for the request
- """
- if context is None:
- context = Namespace()
-
- req_vars = []
- for item in dir(request):
- attr = getattr(request, item)
- if not (item.startswith("_") or inspect.isroutine(attr)):
- req_vars.append((item, attr))
- req_vars.sort()
-
- context.req_vars = req_vars
- return DebugRender(context, evalex).render()
-
-
-def get_current_thread():
- return threading.currentThread()
-
-
-class Namespace(object):
- def __init__(self, **kwds):
- self.__dict__.update(kwds)
-
-
-class ThreadedStream(object):
- _orig = None
-
- def __init__(self):
- self._buffer = {}
-
- def install(cls, environ):
- if cls._orig or not environ['wsgi.multithread']:
- return
- cls._orig = sys.stdout
- sys.stdout = cls()
- install = classmethod(install)
-
- def can_interact(cls):
- return not cls._orig is None
- can_interact = classmethod(can_interact)
-
- def push(self):
- tid = get_current_thread()
- self._buffer[tid] = StringIO()
-
- def release(self):
- tid = get_current_thread()
- if tid in self._buffer:
- result = self._buffer[tid].getvalue()
- del self._buffer[tid]
- else:
- result = ''
- return result
-
- def write(self, d):
- tid = get_current_thread()
- if tid in self._buffer:
- self._buffer[tid].write(d)
- else:
- self._orig.write(d)
-
-
-class EvalContext(object):
-
- def __init__(self, frm):
- self.locals = frm.f_locals
- self.globals = frm.f_globals
-
- def exec_expr(self, s):
- sys.stdout.push()
- try:
- try:
- code = compile(s, '<stdin>', 'single', 0, 1)
- exec code in self.globals, self.locals
- except:
- etype, value, tb = sys.exc_info()
- tb = tb.tb_next
- msg = ''.join(traceback.format_exception(etype, value, tb))
- sys.stdout.write(msg)
- finally:
- output = sys.stdout.release()
- return output
-
-
-class PythonParser(object):
- """
- Simple python sourcecode highlighter.
- Usage::
-
- p = PythonParser(source)
- p.parse()
- for line in p.get_html_output():
- print line
- """
-
- _KEYWORD = token.NT_OFFSET + 1
- _TEXT = token.NT_OFFSET + 2
- _classes = {
- token.NUMBER: 'num',
- token.OP: 'op',
- token.STRING: 'str',
- tokenize.COMMENT: 'cmt',
- token.NAME: 'id',
- token.ERRORTOKEN: 'error',
- _KEYWORD: 'kw',
- _TEXT: 'txt',
- }
-
- def __init__(self, raw):
- self.raw = raw.expandtabs(8).strip()
- self.out = StringIO()
-
- def parse(self):
- self.lines = [0, 0]
- pos = 0
- while 1:
- pos = string.find(self.raw, '\n', pos) + 1
- if not pos: break
- self.lines.append(pos)
- self.lines.append(len(self.raw))
-
- self.pos = 0
- text = StringIO(self.raw)
- try:
- tokenize.tokenize(text.readline, self)
- except StandardError:
- pass
-
- def get_html_output(self):
- """ Return line generator. """
- def html_splitlines(lines):
- # this cool function was taken from trac.
- # http://projects.edgewall.com/trac/
- open_tag_re = re.compile(r'<(\w+)(\s.*)?[^/]?>')
- close_tag_re = re.compile(r'</(\w+)>')
- open_tags = []
- for line in lines:
- for tag in open_tags:
- line = tag.group(0) + line
- open_tags = []
- for tag in open_tag_re.finditer(line):
- open_tags.append(tag)
- open_tags.reverse()
- for ctag in close_tag_re.finditer(line):
- for otag in open_tags:
- if otag.group(1) == ctag.group(1):
- open_tags.remove(otag)
- break
- for tag in open_tags:
- line += '</%s>' % tag.group(1)
- yield line
-
- return list(html_splitlines(self.out.getvalue().splitlines()))
-
-
- def __call__(self, toktype, toktext, (srow,scol), (erow,ecol), line):
- oldpos = self.pos
- newpos = self.lines[srow] + scol
- self.pos = newpos + len(toktext)
-
- if toktype in [token.NEWLINE, tokenize.NL]:
- self.out.write('\n')
- return
-
- if newpos > oldpos:
- self.out.write(self.raw[oldpos:newpos])
-
- if toktype in [token.INDENT, token.DEDENT]:
- self.pos = newpos
- return
-
- if token.LPAR <= toktype and toktype <= token.OP:
- toktype = token.OP
- elif toktype == token.NAME and keyword.iskeyword(toktext):
- toktype = self._KEYWORD
- clsname = self._classes.get(toktype, 'txt')
-
- self.out.write('<span class="code-item p-%s">' % clsname)
- self.out.write(escape(toktext))
- self.out.write('</span>')
-
-
-class DebugRender(object):
-
- def __init__(self, context, evalex):
- self.c = context
- self.evalex = evalex
-
- def render(self):
- return '\n'.join([
- self.header(),
- self.traceback(),
- self.request_information(),
- self.footer()
- ])
-
- def header(self):
- data = [
- '<script type="text/javascript">%s</script>' % JAVASCRIPT,
- '<style type="text/css">%s</style>' % STYLESHEET,
- '<div id="wsgi-traceback">'
- ]
-
- if hasattr(self.c, 'exception_type'):
- title = escape(self.c.exception_type)
- exc = escape(self.c.exception_value)
- data += [
- '<h1>%s</h1>' % title,
- '<p class="text important">%s</p>' % exc
- ]
-
- if hasattr(self.c, 'last_frame'):
- data += [
- '<p class="text important">%s in %s, line %s</p>' % (
- self.c.last_frame['filename'], self.c.last_frame['function'],
- self.c.last_frame['lineno'])
- ]
-
- return '\n'.join(data)
-
- def render_code(self, frame):
- def render_line(mode, lineno, code):
- return ''.join([
- '<tr class="%s">' % mode,
- '<td class="lineno">%i</td>' % lineno,
- '<td class="code">%s</td></tr>' % code
- ])
-
- tmp = ['<table class="code">']
- lineno = frame['context_lineno']
- if not lineno is None:
- lineno += 1
- for l in frame['pre_context']:
- tmp.append(render_line('pre', lineno, l))
- lineno += 1
- tmp.append(render_line('cur', lineno, frame['context_line']))
- lineno += 1
- for l in frame['post_context']:
- tmp.append(render_line('post', lineno, l))
- lineno += 1
- else:
- tmp.append(render_line('cur', 1, 'Sourcecode not available'))
- tmp.append('</table>')
-
- return '\n'.join(tmp)
-
- def var_table(self, var):
- # simple data types
- if isinstance(var, basestring) or isinstance(var, float)\
- or isinstance(var, int) or isinstance(var, long):
- return ('<table class="vars"><tr><td class="value">%r'
- '</td></tr></table>' % var)
-
- # dicts
- if isinstance(var, dict) or hasattr(var, 'items'):
- items = var.items()
- items.sort()
-
- # empty dict
- if not items:
- return ('<table class="vars"><tr><th>no data given'
- '</th></tr></table>')
-
- result = ['<table class="vars"><tr><th>Name'
- '</th><th>Value</th></tr>']
- for key, value in items:
- try:
- val = escape(pprint.pformat(value))
- except:
- val = '?'
- result.append('<tr><td class="name">%s</td><td class="value">%s'
- '</td></tr>' % (escape(repr(key)), val))
- result.append('</table>')
- return '\n'.join(result)
-
- # lists
- if isinstance(var, list):
- # empty list
- if not var:
- return ('<table class="vars"><tr><th>no data given'
- '</th></tr></table>')
-
- result = ['<table class="vars">']
- for line in var:
- try:
- val = escape(pprint.pformat(line))
- except:
- val = '?'
- result.append('<tr><td class="value">%s</td></tr>' % (val))
- result.append('</table>')
- return '\n'.join(result)
-
- # unknown things
- try:
- value = escape(repr(var))
- except:
- value = '?'
- return '<table class="vars"><tr><th>%s</th></tr></table>' % value
-
- def exec_code_table(self, uid):
- return '''
- <table class="exec_code">
- <tr>
- <td class="output" colspan="2"><pre id="output-%(tb_uid)s-%(frame_uid)s"></pre></td>
- </tr>
- <tr>
- <td class="input">
- <textarea class="small" id="input-%(tb_uid)s-%(frame_uid)s" value=""></textarea>
- </td>
- <td class="extend">
- <input type="button" onclick="toggleExtend('%(tb_uid)s', '%(frame_uid)s')" value="extend">
- </td>
- </tr>
- </table>
- ''' % {
- 'target': '#',
- 'tb_uid': self.c.tb_uid,
- 'frame_uid': uid
- }
-
- def traceback(self):
- if not hasattr(self.c, 'frames'):
- return ''
-
- result = ['<h2 onclick="change_tb()" class="tb">Traceback (click to switch to raw view)</h2>']
- result.append('<div id="interactive"><p class="text">A problem occurred in your Python WSGI'
- ' application. Here is the sequence of function calls leading up to'
- ' the error, in the order they occurred. Click on a header to show'
- ' context lines.</p>')
-
- for num, frame in enumerate(self.c.frames):
- line = [
- '<div class="frame" id="frame-%i">' % num,
- '<h3 class="fn">%s in %s</h3>' % (frame['function'],
- frame['filename']),
- self.render_code(frame),
- ]
-
- if frame['vars']:
- line.append('\n'.join([
- '<h3 class="indent">⇒ local variables</h3>',
- self.var_table(frame['vars'])
- ]))
-
- if self.evalex and self.c.tb_uid:
- line.append('\n'.join([
- '<h3 class="indent">⇒ execute code</h3>',
- self.exec_code_table(frame['frame_uid'])
- ]))
-
- line.append('</div>')
- result.append(''.join(line))
- result.append('\n'.join([
- '</div>',
- self.plain()
- ]))
- return '\n'.join(result)
-
- def plain(self):
- if not hasattr(self.c, 'plaintb'):
- return ''
- return '''
- <div id="plain">
- <p class="text">Here is the plain Python traceback for copy and paste:</p>
- <pre class="plain">\n%s</pre>
- </div>
- ''' % self.c.plaintb
-
- def request_information(self):
- result = [
- '<h2>Request Data</h2>',
- '<p class="text">The following list contains all important',
- 'request variables. Click on a header to expand the list.</p>'
- ]
-
- if not hasattr(self.c, 'frames'):
- del result[0]
-
- for key, info in self.c.req_vars:
- result.append('<dl><dt>%s</dt><dd>%s</dd></dl>' % (
- escape(key), self.var_table(info)
- ))
-
- return '\n'.join(result)
-
- def footer(self):
- return '\n'.join([
- '<script type="text/javascript">initTB();</script>',
- '</div>',
- '<div class="footer">Brought to you by '
- '<span style="font-style: normal">DON\'T PANIC</span>, your friendly '
- 'Colubrid traceback interpreter system.</div>',
- hasattr(self.c, 'plaintb')
- and ('<!-- Plain traceback:\n\n%s-->' % self.c.plaintb)
- or '',
- ])
-
-
-
-class DebuggedApplication(object):
- """
- Enables debugging support for a given application::
-
- from colubrid.debug import DebuggedApplication
- from myapp import app
- app = DebuggedApplication(app)
-
- Or for a whole package::
-
- app = DebuggedApplication("myapp:app")
- """
-
- def __init__(self, application, evalex=True):
- self.evalex = bool(evalex)
- if not isinstance(application, basestring):
- self.application = application
- else:
- try:
- self.module, self.handler = application.split(':', 1)
- except ValueError:
- self.module = application
- self.handler = 'app'
- self.tracebacks = {}
-
- def __call__(self, environ, start_response):
- # exec code in open tracebacks
- if self.evalex and environ.get('PATH_INFO', '').strip('/').endswith('__traceback__'):
- parameters = cgi.parse_qs(environ['QUERY_STRING'])
- try:
- tb = self.tracebacks[parameters['tb'][0]]
- frame = parameters['frame'][0]
- context = tb[frame]
- code = parameters['code'][0].replace('\r','')
- except (IndexError, KeyError):
- pass
- else:
- result = context.exec_expr(code)
- start_response('200 OK', [('Content-Type', 'text/plain')])
- yield result
- return
- appiter = None
- try:
- if hasattr(self, 'application'):
- result = self.application(environ, start_response)
- else:
- module = __import__(self.module, '', '', [''])
- app = getattr(module, self.handler)
- result = app(environ, start_response)
- appiter = iter(result)
- for line in appiter:
- yield line
- except:
- ThreadedStream.install(environ)
- exc_info = sys.exc_info()
- try:
- headers = [('Content-Type', 'text/html')]
- start_response('500 INTERNAL SERVER ERROR', headers)
- except:
- pass
- debug_context = self.get_debug_context(exc_info)
- yield debug_info(environ.get('colubrid.request'), debug_context, self.evalex)
-
- if hasattr(appiter, 'close'):
- appiter.close()
-
- def get_debug_context(self, exc_info):
- exception_type, exception_value, tb = exc_info
- # skip first internal frame
- if not tb.tb_next is None:
- tb = tb.tb_next
- plaintb = ''.join(traceback.format_exception(*exc_info))
-
- # load frames
- frames = []
- frame_map = {}
- tb_uid = None
- if ThreadedStream.can_interact():
- tb_uid = get_uid()
- frame_map = self.tracebacks[tb_uid] = {}
-
- # walk through frames and collect informations
- while tb is not None:
- if tb_uid:
- frame_uid = get_uid()
- frame_map[frame_uid] = EvalContext(tb.tb_frame)
- else:
- frame_uid = None
- frame = get_frame_info(tb)
- frame['frame_uid'] = frame_uid
- frames.append(frame)
- tb = tb.tb_next
-
- if exception_type.__module__ == "exceptions":
- extypestr = exception_type.__name__
- else:
- extypestr = str(exception_type)
-
- return Namespace(
- exception_type = extypestr,
- exception_value = str(exception_value),
- frames = frames,
- last_frame = frames[-1],
- plaintb = plaintb,
- tb_uid = tb_uid,
- frame_map = frame_map
- )
-
diff --git a/lib/colubrid/exceptions.py b/lib/colubrid/exceptions.py
@@ -1,145 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
- Colubrid Exceptions
- ===================
-
- Since paste covers that too this is only a "redirection module".
- Because colubrid may change the error interface later it's
- better to use the mapped names instead of the paste.httpexceptions
- module.
-"""
-
-__all__ = ['PageNotFound', 'PageGone', 'AccessDenied', 'BadRequest',
- 'RequestTimeout', 'ServerError', 'HttpRedirect', 'HttpFound',
- 'HttpMoved']
-
-
-ERROR_PAGE_TEMPLATE = """\
-<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
-<html>
- <head>
- <title>%(code)s %(title)s</title>
- <style type="text/css">
- body {
- font-family: sans-serif;
- margin: 2em;
- padding: 0;
- }
- a, h1 {
- color: #cc0000;
- }
- div.content {
- margin: 1em 3em 2em 2em;
- }
- address {
- border-top: 1px solid #ccc;
- padding: 0.3em;
- }
- </style>
- </head>
- <body>
-<h1>%(title)s</h1>
-<div class="content">%(msg)s</div>
-<address>powered by colubrid %(version)s</address>
-</body></html>
-"""
-
-
-class HttpException(Exception):
- """Base for HTTP exceptions. Not to be used directly."""
- code = None
- title = None
- msg = ''
- headers = None
-
- def get_error_page(self):
- from colubrid.utils import get_version
- from cgi import escape
- return ERROR_PAGE_TEMPLATE % {
- 'code': self.code,
- 'title': escape(self.title),
- 'msg': escape(self.msg),
- 'version': get_version()
- }
-
- def get_headers(self):
- if not self.headers:
- return []
- return self.headers[:]
-
- def __repr__(self):
- return '<%s %d>' % (self.__class__.__name__, self.code)
-
-
-class HttpMove(HttpException):
- """Automatically add a "Location:" header to the result."""
- msg = 'The resource has been moved to %s.'
-
- def __init__(self, url):
- self.headers = [('Location', url)]
- if '%s' in self.msg:
- self.msg = self.msg % url
-
-
-class PageNotFound(HttpException):
- """HTTP 404."""
- code = 404
- title = 'Not Found'
- msg = 'The resource could not be found.'
-
-
-class PageGone(HttpException):
- """HTTP 410."""
- code = 410
- title = 'Gone'
- msg = ('This resource is no longer available. '
- 'No forwarding address is given.')
-
-
-class AccessDenied(HttpException):
- """HTTP 403."""
- code = 403
- title = 'Forbidden'
- msg = 'Access was denied to this resource.'
-
-
-class BadRequest(HttpException):
- """HTTP 400."""
- code = 400
- title = 'Bad Request'
- msg = 'The server could not comply with the request since it is either '\
- 'malformed or wtherwise incorrect.'
-
-
-class RequestTimeout(HttpException):
- """HTTP 408."""
- code = 408
- title = 'Request Timeout'
- msg = 'There was a conflict when trying to complete your request.'
-
-
-class ServerError(HttpException):
- """HTTP 500."""
- code = 500
- title = 'Internal Server Error'
- msg = 'The server has either erred or is inapable of performing '\
- 'the requested operation.'
-
-
-class HttpRedirect(HttpMove):
- """HTTP 307."""
- code = 307
- title = 'Temporary Redirect'
-
-
-class HttpFound(HttpMove):
- """HTTP 302."""
- code = 302
- title = 'Found'
- msg = 'The resource was found at %s.'
-
-
-class HttpMoved(HttpMove):
- """HTTP 301."""
- code = 301
- title = 'Moved Permanently'
diff --git a/lib/colubrid/reloader.py b/lib/colubrid/reloader.py
@@ -1,92 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
- Reloader Module
- ===============
-
- Taken from django, which took it from cherrypy / paste
-"""
-# Portions copyright (c) 2004, CherryPy Team (team@cherrypy.org)
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without modification,
-# are permitted provided that the following conditions are met:
-#
-# * Redistributions of source code must retain the above copyright notice,
-# this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above copyright notice,
-# this list of conditions and the following disclaimer in the documentation
-# and/or other materials provided with the distribution.
-# * Neither the name of the CherryPy Team nor the names of its contributors
-# may be used to endorse or promote products derived from this software
-# without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
-# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
-# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
-# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
-# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
-# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import os
-import sys
-import thread
-import time
-
-RUN_RELOADER = True
-
-def reloader_thread(watch):
- """This thread watches Python and "watch" files and reloads the
- application if something changes."""
- mtimes = {}
- win = sys.platform == 'win32'
-
- while RUN_RELOADER:
- for filename in [getattr(m, '__file__', '') for m
- in sys.modules.values()] + watch:
- if filename[-4:] in ('.pyo', '.pyc'):
- filename = filename[:-1]
- if not os.path.exists(filename):
- continue
- stat = os.stat(filename)
- mtime = stat.st_mtime
- if win:
- mtime -= stat.st_ctime
- if filename not in mtimes:
- mtimes[filename] = mtime
- continue
- if mtime != mtimes[filename]:
- sys.exit(3) # force reload
- time.sleep(1)
-
-
-def restart_with_reloader():
- """Spawn a new Python interpreter with the same arguments as this one,
- but running the reloader thread."""
- while True:
- args = [sys.executable] + sys.argv
- if sys.platform == 'win32':
- args = ['"%s"' % arg for arg in args]
- new_environ = os.environ.copy()
- new_environ['RUN_MAIN'] = 'true'
- exit_code = os.spawnve(os.P_WAIT, sys.executable, args, new_environ)
- if exit_code != 3:
- return exit_code
-
-
-def main(main_func, watch=[]):
- """Call this to initialize the reloader."""
- if os.environ.get('RUN_MAIN') == 'true':
- thread.start_new_thread(main_func, ())
- try:
- reloader_thread(watch)
- except KeyboardInterrupt:
- pass
- else:
- try:
- sys.exit(restart_with_reloader())
- except KeyboardInterrupt:
- pass
diff --git a/lib/colubrid/request.py b/lib/colubrid/request.py
@@ -1,191 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
- Colubrid Request Object
- =======================
-"""
-
-from __future__ import generators
-from colubrid.utils import MultiDict, MergedMultiDict, FieldStorage,\
- get_full_url
-from colubrid.response import HttpResponse
-
-import posixpath
-import cgi
-import email
-from urllib import quote
-from email.Message import Message as MessageType
-from cStringIO import StringIO
-from Cookie import SimpleCookie
-
-
-__all__ = ('Request', 'ResponseRequest')
-
-
-class Request(object):
- """
- The central Request object. It stores all data coming from
- the HTTP client.
- """
-
- def __init__(self, environ, start_response, charset='utf-8'):
- self.charset = charset
- self.start_response = start_response
- self.environ = environ
- self.environ['REQUEST_URI'] = get_full_url(self.environ)
-
- # copy a reference to the request object
- # into the environ so wsgi middlewares
- # can communicate with it.
- environ['colubrid.request'] = self
-
- # get absolute path to script
- root = self.environ.get('SCRIPT_NAME', '/')
- if not root or not root.startswith('/'):
- root = '/' + root
- self.environ['SCRIPT_ROOT'] = root
-
- # get the full application request
- url = ''.join([
- quote(self.environ['SCRIPT_NAME']),
- quote(self.environ.get('PATH_INFO', ''))
- ])
- if not url.startswith('/'):
- url = '/' + url
- self.environ['APPLICATION_REQUEST'] = url
-
- def read(self, *args):
- if not hasattr(self, '_buffered_stream'):
- self._buffered_stream = StringIO(self.data)
- return self._buffered_stream.read(*args)
-
- def readline(self, *args):
- if not hasattr(self, '_buffered_stream'):
- self._buffered_stream = StringIO(self.data)
- return self._buffered_stream.readline(*args)
-
- def readlines(self, *args):
- while True:
- line = self.readline(*args)
- if not line:
- raise StopIteration()
- yield line
-
- def _load_post_data(self):
- self._data = ''
- if self.environ['REQUEST_METHOD'] == 'POST':
- maxlen = int(self.environ['CONTENT_LENGTH'])
- self._data = self.environ['wsgi.input'].read(maxlen)
- if self.environ.get('CONTENT_TYPE', '').startswith('multipart'):
- lines = ['Content-Type: %s' %
- self.environ.get('CONTENT_TYPE', '')]
- for key, value in self.environ.items():
- if key.startswith('HTTP_'):
- lines.append('%s: %s' % (key, value))
- raw = '\r\n'.join(lines) + '\r\n\r\n' + self._data
- msg = email.message_from_string(raw)
- self._post = MultiDict()
- self._files = MultiDict()
- for sub in msg.get_payload():
- if not isinstance(sub, MessageType):
- continue
- name_dict = cgi.parse_header(sub['Content-Disposition'])[1]
- if 'filename' in name_dict:
- payload = sub.get_payload()
- filename = name_dict['filename']
- if isinstance(payload, list) or not filename.strip():
- continue
- filename = name_dict['filename']
- #XXX: fixes stupid ie bug but can cause problems
- filename = filename[filename.rfind('\\') + 1:]
- if 'Content-Type' in sub:
- content_type = sub['Content-Type']
- else:
- content_type = None
- s = FieldStorage(name_dict['name'], filename,
- content_type, payload)
- self._files.appendlist(name_dict['name'], s)
- else:
- value = sub.get_payload()
- if not self.charset is None:
- value = value.decode(self.charset, 'ignore')
- self._post.appendlist(name_dict['name'], value)
- else:
- d = cgi.parse_qs(self._data, True)
- if not self.charset is None:
- for key, value in d.iteritems():
- d[key] = [i.decode(self.charset, 'ignore')
- for i in value]
- self._post = MultiDict(d)
- self._files = MultiDict()
- else:
- self._post = MultiDict()
- self._files = MultiDict()
-
- def args(self):
- if not hasattr(self, '_get'):
- query = cgi.parse_qs(self.environ.get('QUERY_STRING', ''), True)
- if not self.charset is None:
- for key, value in query.iteritems():
- query[key] = [i.decode(self.charset, 'ignore')
- for i in value]
- self._get = MultiDict(query)
- return self._get
-
- def form(self):
- if not hasattr(self, '_post'):
- self._load_post_data()
- return self._post
-
- def values(self):
- if not hasattr(self, '_values'):
- self._values = MergedMultiDict(self.args, self.form)
- return self._values
-
- def files(self):
- if not hasattr(self, '_files'):
- self._load_post_data()
- return self._files
-
- def cookies(self):
- if not hasattr(self, '_cookie'):
- self._cookie = SimpleCookie()
- self._cookie.load(self.environ.get('HTTP_COOKIE', ''))
- return self._cookie
-
- def data(self):
- if not hasattr(self, '_data'):
- self._load_post_data()
- return self._data
-
- args = property(args, doc='url paramters')
- form = property(form, doc='form data')
- files = property(files, doc='submitted files')
- values = property(values, doc='url parameters and form data')
- cookies = property(cookies, doc='cookies')
- data = property(data, doc='raw value of input stream')
-
-
-class ResponseRequest(Request, HttpResponse):
- """
- A Request that's a Response too. This way users can call
- request.write() etc.
- """
-
- def __init__(self, environ, start_response, charset='utf-8'):
- Request.__init__(self, environ, start_response, charset)
- HttpResponse.__init__(self, [], [], 200)
-
-
-class RoutesRequest(Request):
-
- def __init__(self, app, environ, start_response, charset='utf-8'):
- super(RoutesRequest, self).__init__(environ, start_response, charset)
- self.app = app
-
- def link_to(self, __controller__, **kwargs):
- controller = self.app._controller_map.get(__controller__)
- root = self.environ['SCRIPT_ROOT']
- link = self.app._routes_map.generate(controller, **kwargs)
- if link is None:
- return root
- return posixpath.join(root, link[1:])
diff --git a/lib/colubrid/response.py b/lib/colubrid/response.py
@@ -1,142 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
- Colubrid Response
-"""
-from __future__ import generators
-from colubrid.utils import HttpHeaders
-from colubrid.const import HTTP_STATUS_CODES
-from Cookie import SimpleCookie
-from datetime import datetime
-from time import gmtime
-
-
-__all__ = ('HttpResponse',)
-
-
-class HttpResponse(object):
- """
- The Response object is used to collect the data to be written
- back to the HTTP client.
- """
-
- def __init__(self, response=None, headers=None, status=200):
- if response is None:
- self.response = []
- elif isinstance(response, basestring):
- self.response = [response]
- else:
- self.response = response
- if headers is None:
- self.headers = HttpHeaders([])
- elif isinstance(headers, list):
- self.headers = HttpHeaders(headers)
- elif isinstance(headers, HttpHeaders):
- self.headers = headers
- else:
- raise TypeError('invalid header format')
- self.status = status
- self._cookies = None
-
- def __setitem__(self, name, value):
- self.headers[name] = value
-
- def __getitem__(self, name):
- self.headers.get(name)
-
- def __delitem__(self, name):
- del self.headers[name]
-
- def __contains__(self, name):
- return name in self.headers
-
- def __len__(self):
- if isinstance(self.response, list):
- length = 0
- for item in self.response:
- length += len(item)
- return length
- try:
- return len(self.response)
- except:
- return 0
-
- def write(self, d):
- if not isinstance(self.response, list):
- raise TypeError('read only or dynamic response object')
- elif isinstance(d, basestring):
- self.response.append(d)
- else:
- raise TypeError('str or unicode required')
-
- def set_cookie(self, key, value='', max_age=None, expires=None,
- path='/', domain=None, secure=None):
- if self._cookies is None:
- self._cookies = SimpleCookie()
- self._cookies[key] = value
- if not max_age is None:
- self._cookies[key]['max-age'] = max_age
- if not expires is None:
- if isinstance(expires, basestring):
- self._cookies[key]['expires'] = expires
- expires = None
- elif isinstance(expires, datetime):
- expires = expires.utctimetuple()
- elif not isinstance(expires, (int, long)):
- expires = gmtime(expires)
- else:
- raise ValueError('datetime or integer required')
- if not expires is None:
- now = gmtime()
- month = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul',
- 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'][now.tm_mon - 1]
- day = ['Monday', 'Tuesday', 'Wednesday', 'Thursday',
- 'Friday', 'Saturday', 'Sunday'][expires.tm_wday]
- date = '%02d-%s-%s' % (
- now.tm_mday, month, str(now.tm_year)[-2:]
- )
- d = '%s, %s %02d:%02d:%02d GMT' % (day, date, now.tm_hour,
- now.tm_min, now.tm_sec)
- self._cookies[key]['expires'] = d
- if not path is None:
- self._cookies[key]['path'] = path
- if not domain is None:
- self._cookies[key]['domain'] = domain
- if not secure is None:
- self._cookies[key]['secure'] = secure
-
- def delete_cookie(self, key):
- if self._cookies is None:
- self._cookies = SimpleCookie()
- if not key in self._cookies:
- self._cookies[key] = ''
- self._cookies[key]['max-age'] = 0
-
- def __call__(self, request):
- if not 'Content-Type' in self.headers:
- self.headers['Content-Type'] = 'text/html; charset=%s' % \
- str(request.charset)
- headers = self.headers.get()
- if not self._cookies is None:
- for morsel in self._cookies.values():
- headers.append(('Set-Cookie', morsel.output(header='')))
- status = '%d %s' % (self.status,
- HTTP_STATUS_CODES.get(self.status, 'UNKNOWN'))
- request.start_response(status, headers)
- if self.response is None:
- yield ''
- elif isinstance(self.response, unicode):
- yield self.response.encode(request.charset)
- elif isinstance(self.response, str):
- yield self.response
- else:
- try:
- iterator = iter(self.response)
- except TypeError:
- raise TypeError('%r is not an valid response' % self.response)
- for line in iterator:
- if isinstance(line, unicode):
- yield line.encode(request.charset)
- elif isinstance(line, str):
- yield line
- else:
- raise TypeError('%r is not string or unicode' % line)
diff --git a/lib/colubrid/server.py b/lib/colubrid/server.py
@@ -1,80 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
- Colubrid Execute
- ================
-
- Since the 0.10 version of colubrid this is only a small helper module
- which uses paster to serve the application.
-"""
-from __future__ import generators
-import sys
-import os
-from threading import Thread
-
-__all__ = ('execute', 'run_test', 'stop_test')
-
-
-class StaticExports(object):
-
- def __init__(self, app, exports):
- self.application = app
- self.exports = exports
-
- def serve_file(self, filename, start_response):
- from mimetypes import guess_type
- guessed_type = guess_type(filename)
- if guessed_type[0] is None:
- mime_type = 'text/plain'
- else:
- mime_type = guessed_type[0]
- start_response('200 OK', [('Content-Type', mime_type)])
- fp = file(filename, 'rb')
- try:
- result = fp.read()
- finally:
- fp.close()
- return iter([result])
-
- def __call__(self, environ, start_response):
- path_info = environ.get('PATH_INFO', '')
- for search_path, file_path in self.exports.iteritems():
- if not search_path.endswith('/'):
- search_path += '/'
- if path_info.startswith(search_path):
- real_path = os.path.join(file_path, path_info[len(search_path):])
- if os.path.exists(real_path) and os.path.isfile(real_path):
- return self.serve_file(real_path, start_response)
- return self.application(environ, start_response)
-
-
-def execute(app=None, debug=True, hostname='localhost', port=8080,
- reload=False, evalex=False):
- if app is None:
- frm = sys._getframe().f_back
- if not 'app' in frm.f_globals:
- raise RuntimeError('no application found')
- app = frm.f_globals['app']
- if debug:
- from colubrid.debug import DebuggedApplication
- app = DebuggedApplication(app, evalex)
-
- try:
- from paste import httpserver
- run = lambda: httpserver.serve(app, host=hostname, port=str(port))
- except ImportError:
- try:
- from BaseWSGIServer import WSGIServer
- run = WSGIServer(app, hostname, port).serve_forever
- except ImportError:
- try:
- from wsgiref.simple_server import make_server
- run = make_server(hostname, port, app).serve_forever
- except ImportError:
- run = None
- if run is None:
- raise RuntimeError('no standalone wsgi server found')
- if reload:
- from colubrid import reloader
- reloader.main(run)
- else:
- run()
diff --git a/lib/colubrid/utils.py b/lib/colubrid/utils.py
@@ -1,377 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
- Colubrid Utilities
- ------------------
-"""
-
-from __future__ import generators
-from urllib import quote
-from cStringIO import StringIO
-import posixpath
-
-
-def get_version():
- """return the colubrid and python version."""
- from colubrid import __version__
- from sys import version
- return '%s - Python %s' % (__version__, version.split('\n')[0].strip())
-
-
-class MultiDict(dict):
- #adopted from django
-
- def __init__(self, key_to_list_mapping=()):
- dict.__init__(self, key_to_list_mapping)
-
- def __getitem__(self, key):
- """
- Return the last data value for this key, or [] if it's an empty list;
- raises KeyError if not found.
- """
- list_ = dict.__getitem__(self, key)
- try:
- return list_[-1]
- except IndexError:
- return []
-
- def _setitem_list(self, key, value):
- dict.__setitem__(self, key, [value])
- __setitem__ = _setitem_list
-
- def get(self, key, default=None):
- """Return the default value if the requested data doesn't exist"""
- try:
- val = self[key]
- except KeyError:
- return default
- if val == []:
- return default
- return val
-
- def getlist(self, key):
- """Return an empty list if the requested data doesn't exist"""
- try:
- return dict.__getitem__(self, key)
- except KeyError:
- return []
-
- def setlist(self, key, list_):
- dict.__setitem__(self, key, list_)
-
- def setdefault(self, key, default=None):
- if key not in self:
- self[key] = default
- return self[key]
-
- def setlistdefault(self, key, default_list=()):
- if key not in self:
- self.setlist(key, default_list)
- return self.getlist(key)
-
- def appendlist(self, key, value):
- """Append an item to the internal list associated with key."""
- self.setlistdefault(key, [])
- dict.__setitem__(self, key, self.getlist(key) + [value])
-
- def items(self):
- """
- Return a list of (key, value) pairs, where value is the last item in
- the list associated with the key.
- """
- return [(key, self[key]) for key in self.keys()]
-
- def lists(self):
- """Return a list of (key, list) pairs."""
- return dict.items(self)
-
- def values(self):
- """Returns a list of the last value on every key list."""
- return [self[key] for key in self.keys()]
-
- def copy(self):
- """Returns a copy of this object."""
- import copy
- MultiDict.__setitem__ = dict.__setitem__
- cp = copy.deepcopy(self)
- MultiDict.__setitem__ = MultiDict._setitem_list
- return cp
-
- def update(self, other_dict):
- """update() extends rather than replaces existing key lists."""
- if isinstance(other_dict, MultiDict):
- for key, value_list in other_dict.lists():
- self.setlistdefault(key, []).extend(value_list)
- else:
- for key, value in other_dict.items():
- self.setlistdefault(key, []).append(value)
-
- def as_dict(self):
- """
- A MultiDict has lists as its real values, which means if we construct
- a dict from it or use it as keyword args (really anything which touches
- the internal dict implementation directly, instead of going through
- __getitem__) then we will get lists everywhere.
-
- This method returns a real dict, with the same keys as this MultiDict,
- but with just the last value for each key (or None if it has no
- values).
- """
- return dict((k, self.get(k, None)) for k in self.iterkeys())
-
-
-
-class MergedMultiDict(object):
- """
- A simple class for creating new "virtual" dictionaries that actualy look
- up values in more than one MultiDict dictionary, passed in the constructor.
- """
- def __init__(self, *dicts):
- self._dicts = dicts
-
- def __getitem__(self, key):
- for d in self._dicts:
- try:
- return d[key]
- except KeyError:
- pass
- raise KeyError
-
- def __repr__(self):
- tmp = {}
- for dict_ in self._dicts:
- tmp.update(dict_)
- return repr(tmp)
-
- def __iter__(self):
- return self.iterkeys()
-
- def copy(self):
- return dict(self.iteritems())
-
- def iterkeys(self):
- for d in self._dicts:
- for key in d:
- yield key
-
- def itervalues(self):
- for d in self._dicts:
- for value in d.itervalues():
- yield value
-
- def iteritems(self):
- for d in self._dicts:
- for item in d.iteritems():
- yield item
-
- def get(self, key, default):
- try:
- return self[key]
- except KeyError:
- return default
-
- def getlist(self, key):
- for d in self._dicts:
- try:
- return d.getlist(key)
- except KeyError:
- pass
- raise KeyError
-
- def items(self):
- return list(self.iteritems())
-
- def __contains__(self, key):
- for d in self._dicts:
- if d.has_key(key):
- return True
- return False
-
- def has_key(self, key):
- return self.__contains__(key)
-
-
-class FieldStorage(object):
-
- def __init__(self, name, filename, ftype, data):
- self.name = name
- self.type = ftype
- self.filename = filename
- self.data = data
-
- def read(self, *args):
- if not hasattr(self, '_cached_buffer'):
- self._cached_buffer = StringIO(self.data)
- return self._cached_buffer.read(*args)
-
- def readline(self, *args):
- if not hasattr(self, '_cached_buffer'):
- self._cached_buffer = StringIO(self.data)
- return self._cached_buffer.readline(*args)
-
- def readlines(self):
- result = []
- while True:
- row = self.readline()
- if not row:
- break
- result.append(row)
- return result
-
- def __iter__(self):
- while True:
- row = self.readline()
- if not row:
- return
- yield row
-
- def __repr__(self):
- return '%s (%s)' % (self.filename, self.type)
-
- def __str__(self):
- return self.__repr__()
-
-
-class HttpHeaders(object):
-
- def __init__(self, defaults=None):
- if defaults is None:
- self._defaults = []
- elif isinstance(defaults, dict):
- self._defaults = dict.items()
- elif isinstance(defaults, list):
- self._defaults = defaults[:]
- else:
- raise TypeError('invalid default value')
- self.reset()
-
- def __setitem__(self, key, value):
- self.set(key, value)
-
- def __delitem__(self, key):
- self.remove(key)
-
- def __contains__(self, key):
- if not isinstance(key, basestring):
- raise TypeError('keys have to be string')
- key = key.lower()
- for k, v in self.data:
- if k.lower() == key:
- return True
- return False
-
- def add(self, key, value):
- """add a new header tuple to the list"""
- self.data.append((key, value))
-
- def remove(self, key, count=-1):
- """removes count header tuples from the list
- where key matches
- """
- removed = 0
- data = []
- for _key, _value in self.data:
- if _key.lower() != key.lower():
- if count > -1:
- if removed >= count:
- break
- else:
- removed += 1
- data.append((_key, _value))
- self.data = data
-
- def reset(self):
- """Reset to default headers."""
- self.data = self._defaults
-
- def clear(self):
- """clears all headers"""
- self.data = []
-
- def set(self, key, value):
- """remove all header tuples for key and add
- a new one
- """
- self.remove(key)
- self.add(key, value)
-
- def get(self, key=False, httpformat=False):
- """returns matching headers as list
-
- if httpformat is set the result is a HTTP
- header formatted list.
- """
- if not key:
- result = self.data
- elif not isinstance(key, basestring):
- raise TypeError('keys have to be string')
- else:
- result = []
- for k, v in self.data:
- if k.lower() == key.lower():
- result.append((str(k), str(v)))
- if httpformat:
- return '\n'.join(['%s: %s' % item for item in result])
- return result
-
-
-def get_full_url(environ, append=None):
- if 'REQUEST_URI' in environ and append is None:
- return environ['REQUEST_URI']
-
- url = environ['wsgi.url_scheme']+'://'
- if environ.get('HTTP_HOST'):
- url += environ['HTTP_HOST']
- else:
- url += environ['SERVER_NAME']
- if environ['wsgi.url_scheme'] == 'https':
- if environ['SERVER_PORT'] != '443':
- url += ':' + environ['SERVER_PORT']
- else:
- if environ['SERVER_PORT'] != '80':
- url += ':' + environ['SERVER_PORT']
-
- if append is None:
- url += quote(environ.get('SCRIPT_NAME', ''))
- url += quote(environ.get('PATH_INFO', ''))
- if environ.get('QUERY_STRING'):
- url += '?' + environ['QUERY_STRING']
- else:
- url += append
- return url
-
-
-def splitpath(p):
- return [s for s in (posixpath.normpath(posixpath.join('/', p)) +
- (p and p[-1] == '/' and '/' or '')).split('/') if s]
-
-
-def fix_slash(environ, wantslash):
- """
- Fixes the trailing slash in an url.
- If the user requests an container object without an slash it
- will appends one.
- Requested an non container object with an traling slash will
- result in an redirect to the same url without it.
- the QUERY_STRING won't get lost but post data would. So don't
- forget the slash problem in your form actions ;-)
- """
- from colubrid.exceptions import HttpMoved
- #FIXME
- # argh. never did something that supid
- # find a better solution for that problem.
- url = quote(environ.get('SCRIPT_NAME', ''))
- url += quote(environ.get('PATH_INFO', ''))
- query = environ.get('QUERY_STRING', '')
- oldurl = query and ('%s?%s' % (url, query)) or url
-
- if oldurl and oldurl != '/':
- if url.endswith('/'):
- if not wantslash:
- url = url[:-1]
- else:
- if wantslash:
- url += '/'
-
- newurl = query and ('%s?%s' % (url, query)) or url
- if oldurl != newurl:
- raise HttpMoved(newurl)