This article shows one way to implement complex authentication and authorization in Pylons without using third-party auth libraries. I'm not opposed to AuthKit or repoze.who, but I already had working code from another project, and decided it would be easier to keep the code rather than trying to shoehorn it into a foreign API and run the risk that it might not work.
Features of this system:
- Authentication is via LDAP or a MySQL database depending on the form of the username.
- Regardless of authentication method, the database can contain additional permission information.
- A permission has a string name and boolean status.
- A Role is a set of permissions and the logic to evaluate them (which can be arbitrarily complex).
- Every user is assigned to exactly one Role, either on a per-user basis or by applying a general formula.
- A Rule provides finer-grained authorization, such as access to a specific record.
- Two optional backdoors are provided. A generic login is automatically created for every role, and a "use_auth" setting in the config file can bypass authentication for development.
A more sophisticated system would allow multiple roles per user, or roles of roles. This could be built on top of this system with some database changes. Zope has a very sophisticated system which could be used as a model.
The following description is based on an existing system but the description has not been tested as-is, so you may find a missing piece or inconsistency here or there.
Authorization code
The base controller provides two methods .has_perm and ._REQUIRE_PERM. Both take a positional arg (the permission name) and optional keyword args (current-record data if needed to apply a rule). _has_perm returns false if the user is not logged in or does not have that permission. This can be used to selectively display links to restricted pages. _REQUIRE_PERM raises HTTP 403 Forbidden if _has_perm returns false.
The base controller's .__before__ method, called before every request, handles authentication and also applies controller-wide authorization.
- is_auth = paste.deploy.converters.asbool(config.get("use_auth", True))
- user = session.get("user", None)
- If "is_auth" is false, set the user to the admin bypass user.
- If "is_auth" is true and the current user is admin-bypass, log them out by setting user = None and session["user"] = None.
- If there is a user, set request.environ["REMOTE_USER"] = user.username.
- If there is not a user and self.controller_require_perm is non-empty, save the requested URL in the session and redirect to the login page.
- If self.controller_require_perm is set to a permission name, call ._REQUIRE_PERM. This allows controllers to impose a controller-wide permission.
.controller_require_perm should be set to the most permissive permission common to all actions in the controller. For instance, an admin-only controller may be set to "admin". At the opposite extreme, the pseudo permission "authenticated" merely requires the user to be authenticated. The attribute can also be set to None to allow anonymous access. Any action that's more restrictive than its controller's permission should call ._REQUIRE_PERM to enforce that restriction. The base controller should set it to the most restrictive permission of all (e.g., "admin") in case a controller doesn't override it. The login controller must set it to None because anonymous users must be able to use the login page.
The following code implements the above scheme. ._has_perm delegates to a method in the User object which we haven't seen yet:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 | class BaseController(WSGIController): controller_require_perm = "admin" # Default to the strictest permission. def __call__(self, environ, start_response): ... def __before__(self): """Do authentication and authorization according to controller class attributes. The login controller MUST set controller_require_perm = None, or the login redirect will cause an infinite redirect! """ is_auth = config["use_auth"] user = session.get("user") is_admin_bypass = isinstance(user, AdminBypassUser) if not config["use_auth"]: if not is_admin_bypass: # Switch to bypass user. session["user"] = user = AdminBypassUser() return if is_admin_bypass: # Log bypass user out. session["user"] = user = None if user: # Set the HTTP variable for the current user. request.environ["REMOTE_USER"] = user.username if self.controller_require_perm: if not user: # Require login. referer = current_url() session["after_login"] = referer log.debug("setting session['after_login'] to %r", referer) session.save() redirect_to("login") self._REQUIRE_PERM(self.controller_require_perm) def _has_perm(self, perm, **perm_kw): """This method recognizes two additional ``perm`` values besides those handled by the User object: ``None``: no permission required. ``"authenticated"``: user must be logged in, but no additional permission required. """ if perm is None: return True user = session.get("user") if not user: return False if perm == "authenticated": return True return session["user"].has_perm(perm, **perm_kw) def _REQUIRE_PERM(self, perm, **perm_kw): """Raise a Forbidden error if the user doesn't have the permission. The method name is capitalized to make it easier to find and audit in code. """ if not self._has_perm(perm, **perm_kw): reason = "'%s' permission required for the requested operation." abort(403, reason % perm.upper()) |
You'll need the following function:
1 2 3 4 5 6 7 | def current_url(): # Temporary kludge until pylons.url.current() stabilizes and returns # the query string. if request.query_string: return "%s?%s" % (request.path_info, request.query_string) else: return request.path_info |
If any controller overrides ._before_, it must call the superclass version.
User object
User objects have a .has_perm method which the controller's ._has_perm calls. It takes a permission name and optional keyword arguments, and returns true if the user is authorized. There are three ways to implement this:
- ._has_perm can be a huge method that does all the calculations itself.
- You can have different User subclasses, each implementing a certain role.
- You can store the role and rule permissions in the database and extract them on login. This allows admins to modify the permissions or create roles through the web, without restarting the application afterward.
I first went with the second scheme, but later switched to the third so that non-programmer admins could modify and browse permissions.
Database structure
The Role table contains a role name (primary key), its sort order (for the admin screens), and boolean fields for the various permissions. A permission can be general (an entire section of the application) or specific (view, add, modify, or delete a resource type).
The User table contains the following fields:
- username (primary key)
- password (null if LDAP authentication)
- role name (foreign key)
- notes (a string explaining who the user is, or who authorized this account)
- expire (null if account has no expiration date)
- last_login (last login date, or null if never logged in)
- create_date (when this account was created)
The optional Rule table allows for record-specific permissions:
- rule_id (integer primary key)
- username (foreign key)
- role (foreign key)
- scope (an uppercase letter denoting which type of record this rule applies to)
- ids (a space-separated list of numeric record IDs within the scope)
- notes (comments)
The idea is that a user has his primary role which is restrictive, but a Rule can promote him to a more permissive role in the context of a certain record.
Authentication
The login URL displays a form with username and password fields. The submit action verifies both fields are filled in and that the username is restricted to a certain range of characters. If that succeeds, authenticate in the following manner:
- If generic logins are supported and the username consists of a role name surrounded by certain special characters and the password matches a predetermined value, this is a generic user, used by admins to test the application's permissions.
- If the username ends in our email domain ("@example.com"), drop the suffix and authenticate via LDAP.
- Otherwise look for a user record in the database and see if the password matches.
You'll have to modify this to match your needs. For instance, if you don't want your LDAP users to type their complete email address, there may be no way to distinguish them syntactically from non-LDAP users, so you'll have to try one authenticator first (LDAP or database) and then try another authenticator if it fails. Of course, you could have another form field to specify the type of user, which would then imply which authenticator to use.
Failed authentication should raise one of the following exceptions:
1 2 3 4 5 6 7 8 9 10 11 12 | class AuthenticationError(DeclarativeException): pass class UsernameError(AuthenticationError): message = "no such user" class GenericUsernameError(AuthenticationError): message = "no such generic user" class PasswordError(AuthenticationError): message = "incorrect password" class AccountExpiredError(UsernameError): message = ( "This account has expired; " "please contact <CUSTOMER SERVICE ADDRESS>.") |
These all subclass DeclarativeException, which is just a convenience for setting the message:
1 2 3 4 5 6 7 | class DeclarativeException(Exception): """A simpler way to define an exception with a fixed message. """ message="" def __init__(self, message=None): Exception.__init__(self, message or self.message) |
If an authentication error is caught, treat it as a form validation error and redisplay the login form. Note how the exceptions subclass UsernameError or PasswordError so you know where to put the message.
After successful authentication, look for a user record in the database and any corresponding rules, and instantiate a User object based on those. If there is no user record, instantiate a default user object based on the user's LDAP properties. Set session["user"] to that. Then if session["after_login"] is set, clear it and redirect to that URL, otherwise redirect to a fixed URL such as the home page.
The following front-end authentication function chooses an authenticator based on the username syntax, and returns a User instance if successful, or raises an authentication error if not. db_user is the database record corresponding to that username if any. role is the role name. perms is a dict of permission name to boolean. division is another piece of info we need in the User object for an application-specific purpose. incident_rules is a set of application-specific Rules. All this is used to instantiate the User object. Of course, you may choose to structure your user object differently.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 | def authenticate(username, password): """Authenticate a user via LDAP or database, and return a ``User`` object. Raises ``AuthenticationError`` or subclass if the user is rejected. Raises ``EnvironmentError`` if the LDAP server could not be reached. If authentication is disabled in the config file, return the "Admin Bypass" user, who can do anything. """ # If authentication is disabled, use Admin Bypass. if not config["use_auth"]: return AdminBypassUser() # Here we assume a SQLAlchemy Session object, and an ORM class ``User``. # The ``.get`` method returns a User instance, or None if no such user. db_user = meta.Session.query(User).get(username) role = "" division = "" if username.startswith("%$"): generic_login_password = config.get("generic_login_password") if generic_login_password: if password != generic_login_password: raise PasswordError() else: raise UsernameError() role = username[2:] if role not in get_roles(): raise GenericUsernameError() elif username.endswith("@example.com"): log.debug("authenticating user '%s' via LDAP", username) properties = authenticate_ldap(username, password) role, division = process_ldap_properties(properties) if db_user: role = db_user.role elif not db_user: log.debug("database has no user '%s'", username) raise UsernameError() elif not db_user.password: log.error("user record '%s' in db has invalid password; pretending user doesn't exist") raise UsernameError() elif password != db_user.password: log.debug("database password for user '%s' does not match", username) raise PasswordError() else: role = db_user.role today = datetime.date.today() if db_user and db_user.expire and db_user.expire < today: raise AccountExpiredError() perms = get_permissions()[role] # Raises KeyError. incident_rules = get_incident_rules(username) user = User(username, perms, incident_rules, division) log.info("authenticated user %s", user) return user |
Here's the LDAP authentication code. We create a Distinguished Name string containing the username (without the domain suffix), and then try to log in as that user. We catch as many LDAP exceptions as we know about, and convert them to exceptions the rest of the code understands. If authentication succeeds, we return the user's LDAP properties.
LDAP_SERVER is a URL such as "ldaps://example.com:636". The certificate option is the absolute path of the server's SSL certificate, which Python-LDAP requires.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | def authenticate_ldap(username, password): """Authenticate a user via LDAP and return his/her LDAP properties. Raises AuthenticationError if the credentials are rejected, or EnvironmentError if the LDAP server can't be reached. """ import ldap uid = webhelpers.text.chop_at(username, "@example.com") dn = "uid=%s,ou=People,o=example.com" % uid log.debug("Authenticating %r at %s", dn, LDAP_SERVER) if "," in username: raise UsernameError("invalid character in username: ,") try: ldap.set_option(ldap.OPT_X_TLS_CACERTFILE, config["ldap.server_certificates"]) server = ldap.initialize(LDAP_SERVER) server.protocol = ldap.VERSION3 server.simple_bind_s(dn, password) properties = server.search_s(dn, ldap.SCOPE_SUBTREE) if not properties: raise ldap.NO_SUCH_OBJECT() except ldap.NO_SUCH_OBJECT, e: log.debug("LDAP says no such user '%s' (%s)", uid, username) raise UsernameError() except ldap.INVALID_CREDENTIALS, e: log.debug("LDAP rejected password for user '%s' (%s)", uid, username) raise PasswordError() except ldap.SERVER_DOWN, e: raise EnvironmentError("can't access authentication server") return properties |
Here's one way to calculate the user's role and division based on their LDAP properties:
1 2 3 4 5 6 7 8 9 10 11 12 13 | def process_ldap_properties(properties): ou = properties[0][1].get('ou', []) ou1 = properties[0][1].get('ou1', []) if "N/FOO" in ou1: return FOO_ROLE, FOO_DIVISION elif "N/BAR" in ou: try: division = DIVISIONS_MAP.get(ou1[0], DEFAULT_DIVISION) except IndexError: division = DEFAULT_DIVISION return SENIOR_PEON, division else: return PEON, "" |
Login controller
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 | class AuthController(BaseController): controller_require_perm = None # This controller allows public access. # Leave this as None to prevent an infinite redirect to the login page! def login(self): return self._login_form(None, None) @restrict("POST") @validate(schema=AuthenticateSchema(), form="login") def enter(self): username = self.form_result["username"] password = self.form_result["password"] errors = {} try: user = auth.authenticate(username, password) except auth.UsernameError, e: errors["username"] = str(e) except auth.PasswordError, e: errors["password"] = str(e) if errors: return self._login_form(self.form_result, errors) session["user"] = user session.save() request.environ["REMOTE_USER"] = user.username model_User.register_login(user.username) referer = session.pop("after_login", None) session.save() if not referer: log.debug("no referer found for login") #log.debug("redirecting to %s", referer) redirect_to(referer or "home") def logout(self): session["user"] = None session.delete() request.environ["REMOTE_USER"] = "" h.flash("You have been logged out.") redirect_to("login") #### Private methods def _login_form(self, defaults, errors): c.ht_title = "MyApplication Login" c.title = None c.crumbs = None c.action = url_for("enter") c.method = "post" html = render("/login.html") return htmlfill.render(html, defaults, errors) |
The register_login call is used to update the last_login date in the database after a successful login:
1 2 3 4 5 6 7 8 9 | class User(meta.ORMClass): _id_attribute = "username" @classmethod def register_login(class_, username): user = meta.Session(User).get(username) if user: user.last_login = datetime.date.today() meta.Session.commit() |
A class method is used because the user may not have a record in the database if they're a LDAP user without special permissions.
The FormEncode validator is pretty straightforward:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | import formencode from formencode import validators as v class Username(v.Regex): """Usernames may contain only alphanumeric characters, "_", "-", and ".". An optional "@" may appear anywhere except first and last position to form an email address. An undocumented "%$" is allowed in first position only. """ regex = R"^(%\$)?[\w.-]+(@[\w.-]+)?$" strip = True accept_python = True messages = { "invalid": """\ Only alphanumeric characters, "_", "-", "." and "@" are allowed.""" } class AuthenticateSchema(formencode.Schema): allow_extra_fields = True #filter_extra_fields = True strip = True username = Username(not_empty=True) password = v.String(not_empty=True) |
User info
Here's a nice way to display a bit of user information on every page. This prints "You are user X with Y permissions." if the account has Rules associated with it, put a "+" after the role name to indicate that the user sometimes has a higher effective role.
1 2 3 4 5 6 7 8 9 10 11 | <div id="userinfo"><span id="userinfo-inner"> <% user = session.get("user") %> % if user: You are user <strong>${user.username}</strong> with <strong>${user.perms["name"]}${"+" if user.incident_rules else ""}</strong> permissions. [<a href="${h.url_for('logout')}">Logout</a>] % else: You are not logged in. Your permissions are <strong>None</strong>. % endif user </span></div> |
The associated styles are:
#userinfo {
font-size: smaller;
text-align: right;
margin-top: 1em;
}
#userinfo-inner { /* Span inside userInfo div */
background-color: #c1ffc1; /* DarkSeaGreen1 */
}