diff options
author | Michał Górny <mgorny@gentoo.org> | 2013-08-25 11:46:08 +0200 |
---|---|---|
committer | Michał Górny <mgorny@gentoo.org> | 2013-08-25 22:45:29 +0200 |
commit | 25a1e361e499590fac0c28cb6e366da6935871c7 (patch) | |
tree | 4e26d4dc3e76dd7ab5b72ba08f43358e5479298b | |
parent | Store SSH handler list in settings. (diff) | |
download | identity.gentoo.org-25a1e361e499590fac0c28cb6e366da6935871c7.tar.gz identity.gentoo.org-25a1e361e499590fac0c28cb6e366da6935871c7.tar.bz2 identity.gentoo.org-25a1e361e499590fac0c28cb6e366da6935871c7.zip |
Add tests for SSH handlers.
-rw-r--r-- | okupy/common/ssh.py | 3 | ||||
-rw-r--r-- | okupy/tests/unit/test_auth.py | 40 | ||||
-rw-r--r-- | okupy/tests/unit/test_ssh.py | 189 |
3 files changed, 218 insertions, 14 deletions
diff --git a/okupy/common/ssh.py b/okupy/common/ssh.py index 5969b19..1c4a49a 100644 --- a/okupy/common/ssh.py +++ b/okupy/common/ssh.py @@ -31,7 +31,8 @@ class SSHServer(paramiko.ServerInterface): return 'publickey' def check_auth_publickey(self, username, key): - # for some reason, this is called twice... + # for some reason, this is called twice... therefore, we need + # to cache the result since token will be revoked on first use if self._message: return paramiko.AUTH_SUCCESSFUL diff --git a/okupy/tests/unit/test_auth.py b/okupy/tests/unit/test_auth.py index 97c7ddb..7c445f6 100644 --- a/okupy/tests/unit/test_auth.py +++ b/okupy/tests/unit/test_auth.py @@ -4,21 +4,17 @@ from mockldap import MockLdap from django.conf import settings from django.contrib.auth import authenticate +from django.test import TestCase from .. import vars -from ...common.test_helpers import OkupyTestCase, set_request +from ...common.test_helpers import ldap_users, set_request import base64 import paramiko -def get_ssh_key(person, number=0): - keystr = person['sshPublicKey'][number] - return base64.b64decode(keystr.split()[1]) - - -class AuthUnitTests(OkupyTestCase): +class AuthSSLUnitTests(TestCase): @classmethod def setUpClass(cls): cls.mockldap = MockLdap(vars.DIRECTORY) @@ -70,21 +66,39 @@ class AuthUnitTests(OkupyTestCase): u = authenticate(request=request) self.assertIs(u, None) + +class AuthSSHUnitTests(TestCase): + @classmethod + def setUpClass(cls): + cls.mockldap = MockLdap(vars.DIRECTORY) + + def setUp(self): + self.mockldap.start() + self.ldapobject = self.mockldap[settings.AUTH_LDAP_SERVER_URI] + + def tearDown(self): + self.mockldap.stop() + + @staticmethod + def get_ssh_key(person, number=0): + keystr = person['sshPublicKey'][number] + return base64.b64decode(keystr.split()[1]) + def test_valid_rsa_ssh_key_authenticates_alice(self): - alice = vars.DIRECTORY['uid=alice,ou=people,o=test'] - key = paramiko.RSAKey(data=get_ssh_key(alice)) + dn, alice = ldap_users('alice') + key = paramiko.RSAKey(data=self.get_ssh_key(alice)) u = authenticate(ssh_key=key) self.assertEqual(u.username, alice['uid'][0]) def test_valid_dss_ssh_key_authenticates_bob(self): - bob = vars.DIRECTORY['uid=bob,ou=people,o=test'] - key = paramiko.DSSKey(data=get_ssh_key(bob, 1)) + dn, bob = ldap_users('bob') + key = paramiko.DSSKey(data=self.get_ssh_key(bob, 1)) u = authenticate(ssh_key=key) self.assertEqual(u.username, bob['uid'][0]) def test_valid_rsa_key_with_comment_authenticates_bob(self): - bob = vars.DIRECTORY['uid=bob,ou=people,o=test'] - key = paramiko.RSAKey(data=get_ssh_key(bob)) + dn, bob = ldap_users('bob') + key = paramiko.RSAKey(data=self.get_ssh_key(bob)) u = authenticate(ssh_key=key) self.assertEqual(u.username, bob['uid'][0]) diff --git a/okupy/tests/unit/test_ssh.py b/okupy/tests/unit/test_ssh.py new file mode 100644 index 0000000..915be6a --- /dev/null +++ b/okupy/tests/unit/test_ssh.py @@ -0,0 +1,189 @@ +# vim:fileencoding=utf8:et:ts=4:sts=4:sw=4:ft=python + +from django.conf import settings +from django.test import TestCase +from django.test.utils import override_settings + +import base64 +import socket + +import paramiko + +from ..vars import TEST_SSH_KEY_FOR_NO_USER +from ...common.ssh import ssh_handler, SSHServer +from ...common.exceptions import OkupyError + + +@override_settings(SSH_HANDLERS={}) +class SSHUnitTests(TestCase): + def setUp(self): + self._key = paramiko.RSAKey( + data=base64.b64decode(TEST_SSH_KEY_FOR_NO_USER)) + self._server = SSHServer() + + def test_ssh_handler_decorator_works(self): + @ssh_handler + def test(key): + pass + + self.assertEqual(settings.SSH_HANDLERS.get('test'), test) + + def test_noarg_handler_works(self): + @ssh_handler + def noarg(key): + return 'yay' + + self.assertEqual( + self._server.check_auth_publickey('noarg', self._key), + paramiko.AUTH_SUCCESSFUL) + + def test_failure_is_propagated_properly(self): + @ssh_handler + def failing(key): + return None + + self.assertEqual( + self._server.check_auth_publickey('failing', self._key), + paramiko.AUTH_FAILED) + + def test_argument_splitting_works(self): + @ssh_handler + def twoarg(a, b, key): + if a == '1' and b == '2': + return 'yay' + else: + return None + + self.assertEqual( + self._server.check_auth_publickey('twoarg+1+2', self._key), + paramiko.AUTH_SUCCESSFUL) + + def test_default_arguments_work(self): + @ssh_handler + def oneortwoarg(a, b='3', key=None): + if not key: + raise ValueError('key must not be None') + if a == '1' and b == '3': + return 'yay' + else: + return None + + self.assertEqual( + self._server.check_auth_publickey('oneortwoarg+1', self._key), + paramiko.AUTH_SUCCESSFUL) + + def test_wrong_command_returns_failure(self): + @ssh_handler + def somehandler(key): + return 'er?' + + self.assertEqual( + self._server.check_auth_publickey('otherhandler', self._key), + paramiko.AUTH_FAILED) + + def test_missing_arguments_return_failure(self): + @ssh_handler + def onearg(arg, key): + return 'er?' + + self.assertEqual( + self._server.check_auth_publickey('onearg', self._key), + paramiko.AUTH_FAILED) + + def test_too_many_arguments_return_failure(self): + @ssh_handler + def onearg(arg, key): + return 'er?' + + self.assertEqual( + self._server.check_auth_publickey('onearg+1+2', self._key), + paramiko.AUTH_FAILED) + + def test_typeerror_is_propagated_properly(self): + @ssh_handler + def onearg(key): + raise TypeError + + self.assertRaises(TypeError, + self._server.check_auth_publickey, 'onearg', self._key) + + def test_result_caching_works(self): + class Cache(object): + def __init__(self): + self.first = True + + def __call__(self, key): + if self.first: + self.first = False + return 'yay' + else: + return None + + cache = Cache() + @ssh_handler + def cached(key): + return cache(key) + + if (self._server.check_auth_publickey('cached', self._key) + != paramiko.AUTH_SUCCESSFUL): + raise OkupyError('Test prerequisite failed') + self.assertEqual( + self._server.check_auth_publickey('cached', self._key), + paramiko.AUTH_SUCCESSFUL) + + def test_message_is_printed_to_exec_request(self): + @ssh_handler + def noarg(key): + return 'test-message' + + if (self._server.check_auth_publickey('noarg', self._key) + != paramiko.AUTH_SUCCESSFUL): + raise OkupyError('Test prerequisite failed') + + s1, s2 = socket.socketpair() + self.assertTrue(self._server.check_channel_exec_request(s1, ':')) + self.assertEqual(s2.makefile().read().rstrip(), 'test-message') + + def test_message_is_printed_to_shell_request(self): + @ssh_handler + def noarg(key): + return 'test-message' + + if (self._server.check_auth_publickey('noarg', self._key) + != paramiko.AUTH_SUCCESSFUL): + raise OkupyError('Test prerequisite failed') + + s1, s2 = socket.socketpair() + self.assertTrue(self._server.check_channel_shell_request(s1)) + self.assertEqual(s2.makefile().read().rstrip(), 'test-message') + + def test_cache_is_invalidated_after_channel_request(self): + class Cache(object): + def __init__(self): + self.first = True + + def __call__(self, key): + if self.first: + self.first = False + return 'test-message' + else: + return None + + cache = Cache() + @ssh_handler + def cached(key): + return cache(key) + + if (self._server.check_auth_publickey('cached', self._key) + != paramiko.AUTH_SUCCESSFUL): + raise OkupyError('Test prerequisite failed') + + s1, s2 = socket.socketpair() + if not self._server.check_channel_shell_request(s1): + raise OkupyError('Test prerequisite failed') + if s2.makefile().read().rstrip() != 'test-message': + raise OkupyError('Test prerequisite failed') + + self.assertEqual( + self._server.check_auth_publickey('cached', self._key), + paramiko.AUTH_FAILED) |