CHANGES.txt | ●●●●● patch | view | raw | blame | history | |
docs/api/csrf.rst | ●●●●● patch | view | raw | blame | history | |
docs/narr/security.rst | ●●●●● patch | view | raw | blame | history | |
docs/narr/templates.rst | ●●●●● patch | view | raw | blame | history | |
pyramid/config/security.py | ●●●●● patch | view | raw | blame | history | |
pyramid/csrf.py | ●●●●● patch | view | raw | blame | history | |
pyramid/interfaces.py | ●●●●● patch | view | raw | blame | history | |
pyramid/session.py | ●●●●● patch | view | raw | blame | history | |
pyramid/tests/test_csrf.py | ●●●●● patch | view | raw | blame | history | |
pyramid/tests/test_util.py | ●●●●● patch | view | raw | blame | history |
CHANGES.txt
@@ -24,12 +24,13 @@ can be alleviated by invoking ``config.begin()`` and ``config.end()`` appropriately. See https://github.com/Pylons/pyramid/pull/2989 - A new CSRF implementation, :class:`pyramid.csrf.SessionCSRF` has been added, which delegates all CSRF generation to the current session, following the old API for this. A ``get_csrf_token()`` method is now available in template global scope, to make it easy for template developers to get the current CSRF token without adding it to Python code. See https://github.com/Pylons/pyramid/pull/2854 - A new CSRF implementation, ``pyramid.csrf.SessionCSRFStoragePolicy``, has been added which delegates all CSRF generation to the current session, following the old API for this. A ``pyramid.csrf.get_csrf_token()`` api is now available in template global scope, to make it easy for template developers to get the current CSRF token without adding it to Python code. See https://github.com/Pylons/pyramid/pull/2854 and https://github.com/Pylons/pyramid/pull/3019 Bug Fixes docs/api/csrf.rst
@@ -5,6 +5,9 @@ .. automodule:: pyramid.csrf .. autoclass:: LegacySessionCSRFStoragePolicy :members: .. autoclass:: SessionCSRFStoragePolicy :members: docs/narr/security.rst
@@ -824,6 +824,7 @@ into the session and returned. The newly created token will be opaque and randomized. .. _get_csrf_token_in_templates: Using the ``get_csrf_token`` global in templates ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ docs/narr/templates.rst
@@ -228,6 +228,10 @@ provided if the template is rendered as the result of a ``renderer=`` argument to the view configuration being used. ``get_csrf_token()`` A convenience function to access the current CSRF token. See :ref:`get_csrf_token_in_templates` for more information. ``renderer_name`` The renderer name used to perform the rendering, e.g., ``mypackage:templates/foo.pt``. pyramid/config/security.py
@@ -10,7 +10,7 @@ PHASE2_CONFIG, ) from pyramid.csrf import SessionCSRFStoragePolicy from pyramid.csrf import LegacySessionCSRFStoragePolicy from pyramid.exceptions import ConfigurationError from pyramid.util import action_method from pyramid.util import as_sorted_tuple @@ -19,7 +19,7 @@ class SecurityConfiguratorMixin(object): def add_default_security(self): self.set_csrf_storage_policy(SessionCSRFStoragePolicy()) self.set_csrf_storage_policy(LegacySessionCSRFStoragePolicy()) @action_method def set_authentication_policy(self, policy): @@ -232,16 +232,22 @@ @action_method def set_csrf_storage_policy(self, policy): """ Set the :term:`CSRF storage policy` used by subsequent view registrations. Set the :term:`CSRF storage policy` used by subsequent view registrations. ``policy`` is a class that implements the :meth:`pyramid.interfaces.ICSRFStoragePolicy` interface that will be used for all CSRF functionality. :meth:`pyramid.interfaces.ICSRFStoragePolicy` interface and defines how to generate and persist CSRF tokens. """ def register(): self.registry.registerUtility(policy, ICSRFStoragePolicy) self.action(ICSRFStoragePolicy, register) intr = self.introspectable('csrf storage policy', None, policy, 'csrf storage policy') intr['policy'] = policy self.action(ICSRFStoragePolicy, register, introspectables=(intr,)) @implementer(IDefaultCSRFOptions) pyramid/csrf.py
@@ -7,8 +7,9 @@ from pyramid.authentication import _SimpleSerializer from pyramid.compat import ( bytes_, urlparse, bytes_ text_, ) from pyramid.exceptions import ( BadCSRFOrigin, @@ -23,44 +24,79 @@ @implementer(ICSRFStoragePolicy) class SessionCSRFStoragePolicy(object): """ The default CSRF implementation, which mimics the behavior from older versions of Pyramid. The ``new_csrf_token`` and ``get_csrf_token`` methods are indirected to the underlying session implementation. class LegacySessionCSRFStoragePolicy(object): """ A CSRF storage policy that defers control of CSRF storage to the session. This policy maintains compatibility with legacy ISession implementations that know how to manage CSRF tokens themselves via ``ISession.new_csrf_token`` and ``ISession.get_csrf_token``. Note that using this CSRF implementation requires that a :term:`session factory` is configured. .. versionadded :: 1.9 .. versionadded:: 1.9 """ def new_csrf_token(self, request): """ Sets a new CSRF token into the session and returns it. """ return request.session.new_csrf_token() def get_csrf_token(self, request): """ Returns the currently active CSRF token from the session, generating a new one if needed.""" """ Returns the currently active CSRF token from the session, generating a new one if needed.""" return request.session.get_csrf_token() def check_csrf_token(self, request, supplied_token): """ Returns ``True`` if ``supplied_token is`` the same value as ``get_csrf_token(request)``.""" expected = self.get_csrf_token(request) return not strings_differ( bytes_(expected, 'ascii'), bytes_(supplied_token, 'ascii'), ) @implementer(ICSRFStoragePolicy) class SessionCSRFStoragePolicy(object): """ A CSRF storage policy that persists the CSRF token in the session. Note that using this CSRF implementation requires that a :term:`session factory` is configured. ``key`` The session key where the CSRF token will be stored. Default: `_csrft_`. .. versionadded:: 1.9 """ _token_factory = staticmethod(lambda: text_(uuid.uuid4().hex)) def __init__(self, key='_csrft_'): self.key = key def new_csrf_token(self, request): """ Sets a new CSRF token into the session and returns it. """ token = self._token_factory() request.session[self.key] = token return token def get_csrf_token(self, request): """ Returns the currently active CSRF token from the session, generating a new one if needed.""" token = request.session.get(self.key, None) if not token: token = self.new_csrf_token(request) return token @implementer(ICSRFStoragePolicy) class CookieCSRFStoragePolicy(object): """ An alternative CSRF implementation that stores its information in unauthenticated cookies, known as the 'Double Submit Cookie' method in the `OWASP CSRF guidelines <https://www.owasp.org/index.php/Cross-Site_Request_Forgery_(CSRF)_Prevention_Cheat_Sheet#Double_Submit_Cookie>`_. This gives some additional flexibility with regards to scaling as the tokens can be generated and verified by a front-end server. `OWASP CSRF guidelines <https://www.owasp.org/index.php/ Cross-Site_Request_Forgery_(CSRF)_Prevention_Cheat_Sheet# Double_Submit_Cookie>`_. This gives some additional flexibility with regards to scaling as the tokens can be generated and verified by a front-end server. .. versionadded :: 1.9 .. versionadded:: 1.9 """ _token_factory = staticmethod(lambda: text_(uuid.uuid4().hex)) def __init__(self, cookie_name='csrf_token', secure=False, httponly=False, domain=None, max_age=None, path='/'): @@ -71,13 +107,15 @@ max_age=max_age, httponly=httponly, path=path, domains=[domain], serializer=serializer ) self.domain = domain self.cookie_name = cookie_name def new_csrf_token(self, request): """ Sets a new CSRF token into the request and returns it. """ token = uuid.uuid4().hex token = self._token_factory() request.cookies[self.cookie_name] = token def set_cookie(request, response): self.cookie_profile.set_cookies( response, @@ -95,14 +133,6 @@ token = self.new_csrf_token(request) return token def check_csrf_token(self, request, supplied_token): """ Returns ``True`` if ``supplied_token is`` the same value as ``get_csrf_token(request)``.""" expected = self.get_csrf_token(request) return not strings_differ( bytes_(expected, 'ascii'), bytes_(supplied_token, 'ascii'), ) def get_csrf_token(request): """ Get the currently active CSRF token for the request passed, generating @@ -133,8 +163,8 @@ header='X-CSRF-Token', raises=True): """ Check the CSRF token returned by the :class:`pyramid.interfaces.ICSRFStoragePolicy` implementation against the value in ``request.POST.get(token)`` (if a POST request) or :class:`pyramid.interfaces.ICSRFStoragePolicy` implementation against the value in ``request.POST.get(token)`` (if a POST request) or ``request.headers.get(header)``. If a ``token`` keyword is not supplied to this function, the string ``csrf_token`` will be used to look up the token in ``request.POST``. If a ``header`` keyword is not supplied to this @@ -143,11 +173,12 @@ If the value supplied by post or by header doesn't match the value supplied by ``policy.get_csrf_token()`` (where ``policy`` is an implementation of :class:`pyramid.interfaces.ICSRFStoragePolicy`), and ``raises`` is ``True``, this function will raise an :exc:`pyramid.exceptions.BadCSRFToken` exception. If the values differ and ``raises`` is ``False``, this function will return ``False``. If the CSRF check is successful, this function will return ``True`` unconditionally. :class:`pyramid.interfaces.ICSRFStoragePolicy`), and ``raises`` is ``True``, this function will raise an :exc:`pyramid.exceptions.BadCSRFToken` exception. If the values differ and ``raises`` is ``False``, this function will return ``False``. If the CSRF check is successful, this function will return ``True`` unconditionally. See :ref:`auto_csrf_checking` for information about how to secure your application automatically against CSRF attacks. @@ -176,8 +207,8 @@ if supplied_token == "" and token is not None: supplied_token = request.POST.get(token, "") policy = request.registry.queryUtility(ICSRFStoragePolicy) if not policy.check_csrf_token(request, supplied_token): expected_token = get_csrf_token(request) if strings_differ(bytes_(expected_token), bytes_(supplied_token)): if raises: raise BadCSRFToken('check_csrf_token(): Invalid token') return False pyramid/interfaces.py
@@ -927,6 +927,13 @@ usually accessed via ``request.session``. Keys and values of a session must be pickleable. .. versionchanged:: 1.9 Sessions are no longer required to implement ``get_csrf_token`` and ``new_csrf_token``. CSRF token support was moved to the pluggable :class:`pyramid.interfaces.ICSRFStoragePolicy` configuration hook. """ # attributes @@ -984,24 +991,23 @@ class ICSRFStoragePolicy(Interface): """ An object that offers the ability to verify CSRF tokens and generate new ones""" new ones.""" def new_csrf_token(request): """ Create and return a new, random cross-site request forgery protection token. Return the token. It will be a string.""" """ Create and return a new, random cross-site request forgery protection token. The token will be an ascii-compatible unicode string. """ def get_csrf_token(request): """ Return a cross-site request forgery protection token. It will be a string. If a token was previously set for this user via ``new_csrf_token``, that token will be returned. If no CSRF token was previously set, ``new_csrf_token`` will be called, which will create and set a token, and this token will be returned. """ will be an ascii-compatible unicode string. If a token was previously set for this user via ``new_csrf_token``, that token will be returned. If no CSRF token was previously set, ``new_csrf_token`` will be called, which will create and set a token, and this token will be returned. def check_csrf_token(request, supplied_token): """ Returns a boolean that represents if ``supplied_token`` is a valid CSRF token for this request. Comparing strings for equality must be done using :func:`pyramid.utils.strings_differ` to avoid timing attacks. """ pyramid/session.py
@@ -17,6 +17,10 @@ bytes_, native_, ) from pyramid.csrf import ( check_csrf_origin, check_csrf_token, ) from pyramid.interfaces import ISession from pyramid.util import strings_differ @@ -608,3 +612,13 @@ reissue_time=reissue_time, set_on_exception=set_on_exception, ) check_csrf_origin = check_csrf_origin # api deprecated('check_csrf_origin', 'pyramid.session.check_csrf_origin is deprecated as of Pyramid ' '1.9. Use pyramid.csrf.check_csrf_origin instead.') check_csrf_token = check_csrf_token # api deprecated('check_csrf_token', 'pyramid.session.check_csrf_token is deprecated as of Pyramid ' '1.9. Use pyramid.csrf.check_csrf_token instead.') pyramid/tests/test_csrf.py
@@ -49,7 +49,7 @@ self.assertEquals(csrf_token, 'e5e9e30a08b34ff9842ff7d2b958c14b') class TestSessionCSRFStoragePolicy(unittest.TestCase): class TestLegacySessionCSRFStoragePolicy(unittest.TestCase): class MockSession(object): def new_csrf_token(self): return 'e5e9e30a08b34ff9842ff7d2b958c14b' @@ -58,11 +58,11 @@ return '02821185e4c94269bdc38e6eeae0a2f8' def _makeOne(self): from pyramid.csrf import SessionCSRFStoragePolicy return SessionCSRFStoragePolicy() from pyramid.csrf import LegacySessionCSRFStoragePolicy return LegacySessionCSRFStoragePolicy() def test_register_session_csrf_policy(self): from pyramid.csrf import SessionCSRFStoragePolicy from pyramid.csrf import LegacySessionCSRFStoragePolicy from pyramid.interfaces import ICSRFStoragePolicy config = Configurator() @@ -71,7 +71,7 @@ policy = config.registry.queryUtility(ICSRFStoragePolicy) self.assertTrue(isinstance(policy, SessionCSRFStoragePolicy)) self.assertTrue(isinstance(policy, LegacySessionCSRFStoragePolicy)) def test_session_csrf_implementation_delegates_to_session(self): policy = self._makeOne() @@ -86,26 +86,46 @@ 'e5e9e30a08b34ff9842ff7d2b958c14b' ) def test_verifying_token_invalid(self): class TestSessionCSRFStoragePolicy(unittest.TestCase): def _makeOne(self, **kw): from pyramid.csrf import SessionCSRFStoragePolicy return SessionCSRFStoragePolicy(**kw) def test_register_session_csrf_policy(self): from pyramid.csrf import SessionCSRFStoragePolicy from pyramid.interfaces import ICSRFStoragePolicy config = Configurator() config.set_csrf_storage_policy(self._makeOne()) config.commit() policy = config.registry.queryUtility(ICSRFStoragePolicy) self.assertTrue(isinstance(policy, SessionCSRFStoragePolicy)) def test_it_creates_a_new_token(self): request = DummyRequest(session={}) policy = self._makeOne() request = DummyRequest(session=self.MockSession()) policy._token_factory = lambda: 'foo' self.assertEqual(policy.get_csrf_token(request), 'foo') result = policy.check_csrf_token(request, 'invalid-token') self.assertFalse(result) def test_get_csrf_token_returns_the_new_token(self): request = DummyRequest(session={'_csrft_': 'foo'}) def test_verifying_token_valid(self): policy = self._makeOne() request = DummyRequest(session=self.MockSession()) self.assertEqual(policy.get_csrf_token(request), 'foo') result = policy.check_csrf_token( request, '02821185e4c94269bdc38e6eeae0a2f8') self.assertTrue(result) token = policy.new_csrf_token(request) self.assertNotEqual(token, 'foo') self.assertEqual(token, policy.get_csrf_token(request)) class TestCookieCSRFStoragePolicy(unittest.TestCase): def _makeOne(self): def _makeOne(self, **kw): from pyramid.csrf import CookieCSRFStoragePolicy return CookieCSRFStoragePolicy() return CookieCSRFStoragePolicy(**kw) def test_register_cookie_csrf_policy(self): from pyramid.csrf import CookieCSRFStoragePolicy @@ -121,18 +141,18 @@ def test_get_cookie_csrf_with_no_existing_cookie_sets_cookies(self): response = MockResponse() request = DummyRequest(response=response) request = DummyRequest() policy = self._makeOne() token = policy.get_csrf_token(request) request.response_callback(request, response) self.assertEqual( response.headerlist, [('Set-Cookie', 'csrf_token={}; Path=/'.format(token))] ) def test_existing_cookie_csrf_does_not_set_cookie(self): response = MockResponse() request = DummyRequest(response=response) request = DummyRequest() request.cookies = {'csrf_token': 'e6f325fee5974f3da4315a8ccf4513d2'} policy = self._makeOne() @@ -142,42 +162,32 @@ token, 'e6f325fee5974f3da4315a8ccf4513d2' ) self.assertEqual( response.headerlist, [], ) self.assertIsNone(request.response_callback) def test_new_cookie_csrf_with_existing_cookie_sets_cookies(self): response = MockResponse() request = DummyRequest(response=response) request = DummyRequest() request.cookies = {'csrf_token': 'e6f325fee5974f3da4315a8ccf4513d2'} policy = self._makeOne() token = policy.new_csrf_token(request) response = MockResponse() request.response_callback(request, response) self.assertEqual( response.headerlist, [('Set-Cookie', 'csrf_token={}; Path=/'.format(token))] ) def test_verifying_token_invalid_token(self): response = MockResponse() request = DummyRequest(response=response) request.cookies = {'csrf_token': 'e6f325fee5974f3da4315a8ccf4513d2'} def test_get_csrf_token_returns_the_new_token(self): request = DummyRequest() request.cookies = {'csrf_token': 'foo'} policy = self._makeOne() self.assertFalse( policy.check_csrf_token(request, 'invalid-token') ) self.assertEqual(policy.get_csrf_token(request), 'foo') def test_verifying_token_against_existing_cookie(self): response = MockResponse() request = DummyRequest(response=response) request.cookies = {'csrf_token': 'e6f325fee5974f3da4315a8ccf4513d2'} policy = self._makeOne() self.assertTrue( policy.check_csrf_token(request, 'e6f325fee5974f3da4315a8ccf4513d2') ) token = policy.new_csrf_token(request) self.assertNotEqual(token, 'foo') self.assertEqual(token, policy.get_csrf_token(request)) class Test_check_csrf_token(unittest.TestCase): @@ -223,14 +233,6 @@ request = testing.DummyRequest() result = self._callFUT(request, 'csrf_token', raises=False) self.assertEqual(result, False) def test_token_differing_types(self): from pyramid.compat import text_ request = testing.DummyRequest() request.method = "POST" request.session['_csrft_'] = text_('foo') request.POST = {'csrf_token': b'foo'} self.assertEqual(self._callFUT(request, token='csrf_token'), True) class Test_check_csrf_token_without_defaults_configured(unittest.TestCase): @@ -353,15 +355,15 @@ class DummyRequest(object): registry = None session = None cookies = {} response_callback = None def __init__(self, registry=None, session=None, response=None): def __init__(self, registry=None, session=None): self.registry = registry self.session = session self.response = response self.cookies = {} def add_response_callback(self, callback): callback(self, self.response) self.response_callback = callback class MockResponse(object): pyramid/tests/test_util.py
@@ -369,12 +369,16 @@ from pyramid.util import strings_differ return strings_differ(*args, **kw) def test_it(self): def test_it_bytes(self): self.assertFalse(self._callFUT(b'foo', b'foo')) self.assertTrue(self._callFUT(b'123', b'345')) self.assertTrue(self._callFUT(b'1234', b'123')) self.assertTrue(self._callFUT(b'123', b'1234')) def test_it_native_str(self): self.assertFalse(self._callFUT('123', '123')) self.assertTrue(self._callFUT('123', '1234')) def test_it_with_internal_comparator(self): result = self._callFUT(b'foo', b'foo', compare_digest=None) self.assertFalse(result)