U
    
W[\                     @   s  d Z ddlmZmZ ddlmZ ddlmZ ddlm	Z	m
Z
 ddlmZ ddlmZ ddlmZmZ dd	lmZmZ dd
lmZmZ ddlmZ ddlmZ ddlmZ ddlmZ edredrddl m!Z! ddl"m#Z# ddl$m%Z%m&Z&m'Z' ddl(m)Z) n dZ%G dd dZ'G dd dZ&G dd de&j*Z+G dd de&j*Z,G dd de&j*Z-G d d! d!e'j.Z/eeG d"d# d#e0Z1eeG d$d% d%e0Z2eeG d&d' d'e0Z3eeG d(d) d)e0Z4G d*d+ d+ej5Z6G d,d- d-ej5Z7G d.d/ d/ej5Z8G d0d1 d1ej5Z9dS )2zT
Tests for the implementation of the ssh-userauth service.

Maintainer: Paul Swartz
    )absolute_importdivision)implementer)ICredentialsChecker)IUsernamePasswordISSHPrivateKey)
IAnonymous)UnauthorizedLogin)IRealmPortal)
ConchErrorValidPublicKey)defertask)loopback)requireModule)unittest)	_bytesChrZcryptographyZpyasn1)NS)SSHProtocolChecker)keysuserauth	transport)keydataNc                   @   s   e Zd ZG dd dZdS )r   c                   @   s   e Zd ZdZdS )ztransport.SSHTransportBaseQ
            A stub class so that later class definitions won't die.
            N__name__
__module____qualname____doc__ r    r    B/usr/lib/python3/dist-packages/twisted/conch/test/test_userauth.pySSHTransportBase$   s   r"   N)r   r   r   r"   r    r    r    r!   r   #   s   r   c                   @   s   e Zd ZG dd dZdS )r   c                   @   s   e Zd ZdZdS )zuserauth.SSHUserAuthClientr   Nr   r    r    r    r!   SSHUserAuthClient*   s   r#   N)r   r   r   r#   r    r    r    r!   r   )   s   r   c                   @   s2   e Zd ZdZdd Zdd ZdddZd	d
 ZdS )ClientUserAuthz"
    A mock user auth client.
    c                 C   s,   | j rtjtjS ttjtjS dS )z
        If this is the first time we've been called, return a blob for
        the DSA key.  Otherwise, return a blob
        for the RSA key.
        N)	ZlastPublicKeyr   Key
fromStringr   publicRSA_opensshr   succeedpublicDSA_opensshselfr    r    r!   getPublicKey6   s
    zClientUserAuth.getPublicKeyc                 C   s   t tjtjS )z@
        Return the private key object for the RSA key.
        )r   r(   r   r%   r&   r   privateRSA_opensshr*   r    r    r!   getPrivateKeyC   s    zClientUserAuth.getPrivateKeyNc                 C   s
   t dS )z/
        Return 'foo' as the password.
           foor   r(   )r+   promptr    r    r!   getPasswordJ   s    zClientUserAuth.getPasswordc                 C   s
   t dS )z>
        Return 'foo' as the answer to two questions.
        )foor3   r0   )r+   nameZinformationZanswersr    r    r!   getGenericAnswersQ   s    z ClientUserAuth.getGenericAnswers)N)r   r   r   r   r,   r.   r2   r5   r    r    r    r!   r$   1   s
   
r$   c                   @   s    e Zd ZdZdd Zdd ZdS )OldClientAuthz~
    The old SSHUserAuthClient returned a cryptography key object from
    getPrivateKey() and a string from getPublicKey
    c                 C   s   t tjtjjS N)r   r(   r   r%   r&   r   r-   Z	keyObjectr*   r    r    r!   r.   _   s    
zOldClientAuth.getPrivateKeyc                 C   s   t jtj S r7   )r   r%   r&   r   r'   blobr*   r    r    r!   r,   d   s    zOldClientAuth.getPublicKeyNr   r   r   r   r.   r,   r    r    r    r!   r6   Y   s   r6   c                   @   s    e Zd ZdZdd Zdd ZdS )ClientAuthWithoutPrivateKeyzP
    This client doesn't have a private key, but it does have a public key.
    c                 C   s   d S r7   r    r*   r    r    r!   r.   n   s    z)ClientAuthWithoutPrivateKey.getPrivateKeyc                 C   s   t jtjS r7   )r   r%   r&   r   r'   r*   r    r    r!   r,   r   s    z(ClientAuthWithoutPrivateKey.getPublicKeyNr9   r    r    r    r!   r:   i   s   r:   c                   @   sP   e Zd ZdZG dd deZG dd deZdd Zdd	 Zd
d Z	dd Z
dS )FakeTransporta_  
    L{userauth.SSHUserAuthServer} expects an SSH transport which has a factory
    attribute which has a portal attribute. Because the portal is important for
    testing authentication, we need to be able to provide an interesting portal
    object to the L{SSHUserAuthServer}.

    In addition, we want to be able to capture any packets sent over the
    transport.

    @ivar packets: a list of 2-tuples: (messageType, data).  Each 2-tuple is
        a sent packet.
    @type packets: C{list}
    @param lostConnecion: True if loseConnection has been called on us.
    @type lostConnection: L{bool}
    c                   @   s   e Zd ZdZdZdd ZdS )zFakeTransport.ServicezW
        A mock service, representing the other service offered by the server.
           nancyc                 C   s   d S r7   r    r*   r    r    r!   serviceStarted   s    z$FakeTransport.Service.serviceStartedN)r   r   r   r   r4   r=   r    r    r    r!   Service   s   r>   c                   @   s   e Zd ZdZdd ZdS )zFakeTransport.Factoryzg
        A mock factory, representing the factory that spawned this user auth
        service.
        c                 C   s   |dkrt jS dS )z2
            Return our fake service.
               noneN)r;   r>   )r+   r   servicer    r    r!   
getService   s    z FakeTransport.Factory.getServiceN)r   r   r   r   rA   r    r    r    r!   Factory   s   rB   c                 C   s(   |   | _|| j_d| _| | _g | _d S NF)rB   factoryportallostConnectionr   packets)r+   rE   r    r    r!   __init__   s
    
zFakeTransport.__init__c                 C   s   | j ||f dS )z8
        Record the packet sent by the service.
        N)rG   append)r+   ZmessageTypemessager    r    r!   
sendPacket   s    zFakeTransport.sendPacketc                 C   s   dS )z
        Pretend that this transport encrypts traffic in both directions. The
        SSHUserAuthServer disables password authentication if the transport
        isn't encrypted.
        Tr    )r+   	directionr    r    r!   isEncrypted   s    zFakeTransport.isEncryptedc                 C   s
   d| _ d S NT)rF   r*   r    r    r!   loseConnection   s    zFakeTransport.loseConnectionN)r   r   r   r   objectr>   rB   rH   rK   rM   rO   r    r    r    r!   r;   w   s   	r;   c                   @   s   e Zd ZdZdd ZdS )Realmz
    A mock realm for testing L{userauth.SSHUserAuthServer}.

    This realm is not actually used in the course of testing, so it returns the
    simplest thing that could possibly work.
    c                 G   s   t |d d dd fS )Nr   c                   S   s   d S r7   r    r    r    r    r!   <lambda>       z%Realm.requestAvatar.<locals>.<lambda>r0   )r+   ZavatarIdZmindZ
interfacesr    r    r!   requestAvatar   s    zRealm.requestAvatarN)r   r   r   r   rT   r    r    r    r!   rQ      s   rQ   c                   @   s   e Zd ZdZefZdd ZdS )PasswordCheckerz
    A very simple username/password checker which authenticates anyone whose
    password matches their username and rejects all others.
    c                 C   s&   |j |jkrt|j S ttdS )NzInvalid username/password pair)usernameZpasswordr   r(   failr	   )r+   credsr    r    r!   requestAvatarId   s    zPasswordChecker.requestAvatarIdN)r   r   r   r   r   credentialInterfacesrY   r    r    r    r!   rU      s   rU   c                   @   s   e Zd ZdZefZdd ZdS )PrivateKeyCheckerz
    A very simple public key checker which authenticates anyone whose
    public/private keypair is the same keydata.public/privateRSA_openssh.
    c                 C   sX   |j tjtj  krN|jd k	rHtj|j }||j|jrN|j	S nt
 t d S r7   )r8   r   r%   r&   r   r'   	signatureZverifysigDatarV   r   r	   )r+   rX   objr    r    r!   rY      s    
z!PrivateKeyChecker.requestAvatarIdN)r   r   r   r   r   rZ   rY   r    r    r    r!   r[      s   r[   c                   @   s   e Zd ZdZefZdS )AnonymousCheckerzI
    A simple checker which isn't supported by L{SSHUserAuthServer}.
    N)r   r   r   r   r   rZ   r    r    r    r!   r_      s   r_   c                   @   s   e Zd ZdZedkrdZdd Zdd Zdd	 Zd
d Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zd d! Zd"d# Zd$d% Zd&d' Zd(d) Zd*d+ ZdS ),SSHUserAuthServerTestsz&
    Tests for SSHUserAuthServer.
    Ncannot run without cryptographyc                 C   sb   t  | _t| j| _| jt  | jt  t | _	t
| j| j	_| j	  | j	j  d S r7   )rQ   realmr   rE   registerCheckerrU   r[   r   SSHUserAuthServer
authServerr;   r   r=   supportedAuthenticationssortr*   r    r    r!   setUp  s    

zSSHUserAuthServerTests.setUpc                 C   s   | j   d | _ d S r7   )re   serviceStoppedr*   r    r    r!   tearDown  s    
zSSHUserAuthServerTests.tearDownc                 C   s(   |  | jjjd tjtdd f dS )z;
        Check that the authentication has failed.
        s   password,publickey    N)assertEqualre   r   rG   r   ZMSG_USERAUTH_FAILUREr   r+   ignoredr    r    r!   _checkFailed  s
    
z#SSHUserAuthServerTests._checkFailedc                 C   s,   | j tdtd td }|| jS )z
        A client may request a list of authentication 'method name' values
        that may continue by using the "none" authentication 'method name'.

        See RFC 4252 Section 5.2.
        r/   s   servicer?   )re   ssh_USERAUTH_REQUESTr   addCallbackrp   )r+   dr    r    r!   test_noneAuthentication  s    z.SSHUserAuthServerTests.test_noneAuthenticationc                    sJ   d tdtdtdtdtdg} j|} fdd}||S )z
        When provided with correct password authentication information, the
        server should respond by sending a MSG_USERAUTH_SUCCESS message with
        no other data.

        See RFC 4252, Section 5.1.
        rS   r/   r?      passwordr   c                    s      jjjtjdfg d S NrS   rm   re   r   rG   r   ZMSG_USERAUTH_SUCCESSro   r*   r    r!   check2  s    
zKSSHUserAuthServerTests.test_successfulPasswordAuthentication.<locals>.check)joinr   chrre   rq   rr   r+   packetrs   ry   r    r*   r!   %test_successfulPasswordAuthentication'  s    z<SSHUserAuthServerTests.test_successfulPasswordAuthenticationc                 C   sl   d tdtdtdtdtdg}t | j_| j|}| | jj	j
g  | jjd || jS )a;  
        When provided with invalid authentication details, the server should
        respond by sending a MSG_USERAUTH_FAILURE message which states whether
        the authentication was partially successful, and provides other, open
        options for authentication.

        See RFC 4252, Section 5.1.
        rS   r/   r?   ru   r      bar   )rz   r   r{   r   Clockre   clockrq   rm   r   rG   advancerr   rp   r+   r}   rs   r    r    r!   !test_failedPasswordAuthentication9  s    
z8SSHUserAuthServerTests.test_failedPasswordAuthenticationc                    s   t jtj }t jtj}tdtd td d t|  t| }d j	j
_|tdttj | }|t|7 } j	|} fdd}||S )zN
        Test that private key authentication completes successfully,
        r/   r?   	   publickey      testc                    s      jjjtjdfg d S rv   rw   rx   r*   r    r!   ry   Y  s    
zMSSHUserAuthServerTests.test_successfulPrivateKeyAuthentication.<locals>.check)r   r%   r&   r   r'   r8   r-   r   ZsshTypere   r   	sessionIDsignr{   r   MSG_USERAUTH_REQUESTrq   rr   )r+   r8   r^   r}   r\   rs   ry   r    r*   r!   'test_successfulPrivateKeyAuthenticationL  s    

z>SSHUserAuthServerTests.test_successfulPrivateKeyAuthenticationc                    s   t   dd }dd } fdd}| | jd| | | jd| | | jd	| td
td td td }| j| |  tS )z
        ssh_USERAUTH_REQUEST should raise a ConchError if tryAuth returns
        None. Added to catch a bug noticed by pyflakes.
        c                 S   s   |  d d S )Nz&request should have raised ConochError)rW   rn   r    r    r!   mockCbFinishedAuthf  s    zOSSHUserAuthServerTests.test_requestRaisesConchError.<locals>.mockCbFinishedAuthc                 S   s   d S r7   r    )Zkinduserdatar    r    r!   mockTryAuthi  s    zHSSHUserAuthServerTests.test_requestRaisesConchError.<locals>.mockTryAuthc                    s     | j d S r7   )Zerrbackvalue)reasonrs   r    r!   mockEbBadAuthl  s    zJSSHUserAuthServerTests.test_requestRaisesConchError.<locals>.mockEbBadAuthtryAuthZ_cbFinishedAuthZ
_ebBadAuths   userr?   s
   public-keys   data)r   ZDeferredpatchre   r   rq   assertFailurer   )r+   r   r   r   r}   r    r   r!   test_requestRaisesConchError_  s     z3SSHUserAuthServerTests.test_requestRaisesConchErrorc                    sb   t jtj  tdtd td d td t  }j|} fdd}|	|S )z@
        Test that verifying a valid private key works.
        r/   r?   r   rl      ssh-rsac                    s*    jjjtjtdt  fg d S )Nr   )rm   re   r   rG   r   MSG_USERAUTH_PK_OKr   rx   r8   r+   r    r!   ry     s    z@SSHUserAuthServerTests.test_verifyValidPrivateKey.<locals>.check)
r   r%   r&   r   r'   r8   r   re   rq   rr   r|   r    r   r!   test_verifyValidPrivateKeyz  s    z1SSHUserAuthServerTests.test_verifyValidPrivateKeyc                 C   sV   t jtj }tdtd td d td t| }| j|}|	| j
S )d
        Test that private key authentication fails when the public key
        is invalid.
        r/   r?   r   rl   s   ssh-dsar   r%   r&   r   r)   r8   r   re   rq   rr   rp   r+   r8   r}   rs   r    r    r!   3test_failedPrivateKeyAuthenticationWithoutSignature  s    zJSSHUserAuthServerTests.test_failedPrivateKeyAuthenticationWithoutSignaturec                 C   s|   t jtj }t jtj}tdtd td d td t| t|| }d| j	j
_| j	|}|| jS )r   r/   r?   r   r   r   r   )r   r%   r&   r   r'   r8   r-   r   r   re   r   r   rq   rr   rp   )r+   r8   r^   r}   rs   r    r    r!   0test_failedPrivateKeyAuthenticationWithSignature  s    
zGSSHUserAuthServerTests.test_failedPrivateKeyAuthenticationWithSignaturec                 C   sj   t jtj }td|dd  }tdtd td d td t| }| j|}|	| j
S )	z
        Private key authentication fails when the public key type is
        unsupported or the public key is corrupt.
        s   ssh-bad-type   Nr/   r?   r   rl   r   r   r   r    r    r!   test_unsupported_publickey  s    z1SSHUserAuthServerTests.test_unsupported_publickeyc                 C   sR   t  }t| j|_| jt  |  |  |j	
  | |j	ddg dS )ah  
        L{SSHUserAuthServer} sets up
        C{SSHUserAuthServer.supportedAuthentications} by checking the portal's
        credentials interfaces and mapping them to SSH authentication method
        strings.  If the Portal advertises an interface that
        L{SSHUserAuthServer} can't map, it should be ignored.  This is a white
        box test.
        ru   r   N)r   rd   r;   rE   r   rc   r_   r=   ri   rf   rg   rm   r+   serverr    r    r!    test_ignoreUnknownCredInterfaces  s    	
z7SSHUserAuthServerTests.test_ignoreUnknownCredInterfacesc                 C   s   |  d| jj t }t| j|_dd |j_|	  |
  | d|j t }t| j|_dd |j_|	  |
  |  d|j dS )z
        Test that the userauth service does not advertise password
        authentication if the password would be send in cleartext.
        ru   c                 S   s   dS rC   r    xr    r    r!   rR     rS   zISSHUserAuthServerTests.test_removePasswordIfUnencrypted.<locals>.<lambda>c                 S   s   | dkS Ninr    r   r    r    r!   rR     rS   N)ZassertInre   rf   r   rd   r;   rE   r   rM   r=   ri   ZassertNotIn)r+   clearAuthServerhalfAuthServerr    r    r!    test_removePasswordIfUnencrypted  s    z7SSHUserAuthServerTests.test_removePasswordIfUnencryptedc                 C   s   t | j}|t  t }t||_dd |j_|	  |
  | |jdg t }t||_dd |j_|	  |
  | |jdg dS )z
        If the L{SSHUserAuthServer} is not advertising passwords, then an
        unencrypted connection should not cause any warnings or exceptions.
        This is a white box test.
        c                 S   s   dS rC   r    r   r    r    r!   rR     rS   zSSSHUserAuthServerTests.test_unencryptedConnectionWithoutPasswords.<locals>.<lambda>r   c                 S   s   | dkS r   r    r   r    r    r!   rR     rS   N)r   rb   rc   r[   r   rd   r;   r   rM   r=   ri   rm   rf   )r+   rE   r   r   r    r    r!   *test_unencryptedConnectionWithoutPasswords  s$    


zASSHUserAuthServerTests.test_unencryptedConnectionWithoutPasswordsc                 C   s~   t  }t |_t| j|_|  |j	d |
  | |jjtjdttj td td fg | |jj dS )z0
        Test that the login times out.
        鰚        s   you took too longrS   N)r   rd   r   r   r   r;   rE   r   r=   r   ri   rm   rG   MSG_DISCONNECTr{   )DISCONNECT_NO_MORE_AUTH_METHODS_AVAILABLEr   Z
assertTruerF   r+   ZtimeoutAuthServerr    r    r!   test_loginTimeout  s$    

z(SSHUserAuthServerTests.test_loginTimeoutc                 C   s\   t  }t |_t| j|_|  |	  |j
d | |jjg  | |jj dS )zN
        Test that stopping the service also stops the login timeout.
        r   N)r   rd   r   r   r   r;   rE   r   r=   ri   r   rm   rG   assertFalserF   r   r    r    r!   test_cancelLoginTimeout  s    
z.SSHUserAuthServerTests.test_cancelLoginTimeoutc                    sr   d tdtdtdtdtdg}t  j_tdD ]} j|} jj	d q< fd	d
}|
|S )zm
        Test that the server disconnects if the client fails authentication
        too many times.
        rS   r/   r?   ru   r   r      r   c                    s:      jjjd tjdttj td td f d S )Nrk   r   s   too many bad authsrS   )rm   re   r   rG   r   r{   r   r   rx   r*   r    r!   ry   $  s    z:SSHUserAuthServerTests.test_tooManyAttempts.<locals>.check)rz   r   r{   r   r   re   r   rangerq   r   rr   )r+   r}   irs   ry   r    r*   r!   test_tooManyAttempts  s    z+SSHUserAuthServerTests.test_tooManyAttemptsc                 C   sL   t dt d t d td t d }t | j_| j|}|| jS )zo
        If the user requests a service that we don't support, the
        authentication should fail.
        r/   rS   ru   r   )	r   r{   r   r   re   r   rq   rr   rp   r   r    r    r!   test_failIfUnknownService-  s    (z0SSHUserAuthServerTests.test_failIfUnknownServicec                    sV   dd }   jd|    jdd  fdd} jddd} |t|S )	aZ  
        tryAuth() has two edge cases that are difficult to reach.

        1) an authentication method auth_* returns None instead of a Deferred.
        2) an authentication type that is defined does not have a matching
           auth_* method.

        Both these cases should return a Deferred which fails with a
        ConchError.
        c                 S   s   d S r7   r    )r}   r    r    r!   mockAuthC  s    z>SSHUserAuthServerTests.test_tryAuthEdgeCases.<locals>.mockAuthZauth_publickeyZauth_passwordNc                    s    j dd d } |tS )Nru   )re   r   r   r   )ro   Zd2r*   r    r!   
secondTestI  s    z@SSHUserAuthServerTests.test_tryAuthEdgeCases.<locals>.secondTestr   )r   re   r   r   r   rr   )r+   r   r   Zd1r    r*   r!   test_tryAuthEdgeCases8  s    z,SSHUserAuthServerTests.test_tryAuthEdgeCases)r   r   r   r   r   skiprh   rj   rp   rt   r~   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r    r    r    r!   r`      s.   	r`   c                   @   s   e Zd ZdZedkrdZdd Zdd Zdd	 Zd
d Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zd d! ZdS )"SSHUserAuthClientTestsz&
    Tests for SSHUserAuthClient.
    Nra   c                 C   s4   t dt | _td | j_d| jj_| j  d S )Nr/   r   )r$   r;   r>   
authClientr   r   r=   r*   r    r    r!   rh   [  s    
zSSHUserAuthClientTests.setUpc                 C   s   | j   d | _ d S r7   )r   ri   r*   r    r    r!   rj   b  s    
zSSHUserAuthClientTests.tearDownc                 C   sT   |  | jjd |  | jjjd |  | jjjtjt	dt	d t	d fg dS )z;
        Test that client is initialized properly.
        r/   r<   r?   N)
rm   r   r   instancer4   r   rG   r   r   r   r*   r    r    r!   	test_initg  s    z SSHUserAuthClientTests.test_initc                    s@   dg  fdd}|| j j_| j d |  d | j j dS )z9
        Test that the client succeeds properly.
        Nc                    s   |  d< d S )Nr   r    )r@   r   r    r!   stubSetServicew  s    zDSSHUserAuthClientTests.test_USERAUTH_SUCCESS.<locals>.stubSetServicerS   r   )r   r   Z
setServiceZssh_USERAUTH_SUCCESSrm   r   )r+   r   r    r   r!   test_USERAUTH_SUCCESSr  s
    
z,SSHUserAuthClientTests.test_USERAUTH_SUCCESSc              	   C   s  | j tdd  | | j jjd tjtdtd td d td ttj	
tj  f | j tdd  ttj	
tj }| | j jjd tjtdtd td d td | f | j tdttj	
tj   t| j jjttj td td td d td | }tj	
tj}| | j jjd tjtdtd td d td | t|| f d	S )
zJ
        Test that the client can authenticate with a public key.
        r   rl   rk   r/   r<   s   ssh-dssr      N)r   ssh_USERAUTH_FAILUREr   rm   r   rG   r   r   r   r%   r&   r   r)   r8   r'   ssh_USERAUTH_PK_OKr   r{   r-   r   )r+   r8   r]   r^   r    r    r!   test_publickey~  sv    z%SSHUserAuthClientTests.test_publickeyc                 C   sz   t dt }td|_d|j_|  |d g |j_| |	d | 
|jjtjtdtd td fg dS )z
        If the SSHUserAuthClient doesn't return anything from signData,
        the client should start the authentication over again by requesting
        'none' authentication.
        r/   Nr   r   rS   r<   r?   )r:   r;   r>   r   r   r=   r   rG   assertIsNoner   rm   r   r   r   )r+   r   r    r    r!   !test_publickey_without_privatekey  s    


z8SSHUserAuthClientTests.test_publickey_without_privatekeyc                    s.   dd  j _ j d} fdd}||S )z{
        If there's no public key, auth_publickey should return a Deferred
        called back with a False value.
        c                 S   s   d S r7   r    r   r    r    r!   rR     rS   z:SSHUserAuthClientTests.test_no_publickey.<locals>.<lambda>r   c                    s     |  d S r7   )r   resultr*   r    r!   ry     s    z7SSHUserAuthClientTests.test_no_publickey.<locals>.check)r   r,   r   rr   )r+   rs   ry   r    r*   r!   test_no_publickey  s    z(SSHUserAuthClientTests.test_no_publickeyc                 C   s   | j tdd  | | j jjd tjtdtd td d td f | j tdtd  | | j jjd tjtdtd td d tdd  f d	S )
zx
        Test that the client can authentication with a password.  This
        includes changing the password.
        ru   rl   rk   r/   r<   rS   r   r   N)	r   r   r   rm   r   rG   r   r   r   r*   r    r    r!   test_password  s(    
z$SSHUserAuthClientTests.test_passwordc                 C   s"   dd | j _| | j d dS )zK
        If getPassword returns None, tryAuth should return False.
        c                   S   s   d S r7   r    r    r    r    r!   rR     rS   z9SSHUserAuthClientTests.test_no_password.<locals>.<lambda>ru   N)r   r2   r   r   r*   r    r    r!   test_no_password  s    z'SSHUserAuthClientTests.test_no_passwordc                 C   s`   | j tdtd td d td d  | | j jjd tjdtd td f dS )	zj
        Make sure that the client can authenticate with the keyboard
        interactive method.
        rS   s      s
   Password: rl   rk   s      r/   N)r   Z'ssh_USERAUTH_PK_OK_keyboard_interactiver   rm   r   rG   r   ZMSG_USERAUTH_INFO_RESPONSEr*   r    r    r!   test_keyboardInteractive  s    z/SSHUserAuthClientTests.test_keyboardInteractivec                 C   sP   d| j _g | j j_| j d | | j jjtjtdtd td fg dS )z
        If C{SSHUserAuthClient} gets a MSG_USERAUTH_PK_OK packet when it's not
        expecting it, it should fail the current authentication and move on to
        the next type.
        s   unknownrS   r/   r<   r?   N)	r   ZlastAuthr   rG   r   rm   r   r   r   r*   r    r    r!   "test_USERAUTH_PK_OK_unknown_method  s    

z9SSHUserAuthClientTests.test_USERAUTH_PK_OK_unknown_methodc                    s    fdd} fdd}| j _| j _ j tdd    j jjd tj	tdtd	 td
 d td f  j tdd    j jjdd ddg dS )z
        ssh_USERAUTH_FAILURE should sort the methods by their position
        in SSHUserAuthClient.preferredOrder.  Methods that are not in
        preferredOrder should be sorted at the end of that list.
        c                      s    j jdd d S )N      here is datar   r   rK   r    r*   r    r!   auth_firstmethod  s    zNSSHUserAuthClientTests.test_USERAUTH_FAILURE_sorting.<locals>.auth_firstmethodc                      s    j jdd dS )N   
   other dataTr   r    r*   r    r!   auth_anothermethod  s    zPSSHUserAuthClientTests.test_USERAUTH_FAILURE_sorting.<locals>.auth_anothermethods   anothermethod,passwordrl   rk   r/   r<   ru   s"   firstmethod,anothermethod,passwordr   N)r   r   )r   r   )
r   r   r   r   r   rm   r   rG   r   r   )r+   r   r   r    r*   r!   test_USERAUTH_FAILURE_sorting  s,    
z4SSHUserAuthClientTests.test_USERAUTH_FAILURE_sortingc                 C   sT   | j tdd  | j tdd  | | j jjd tjdtd d f dS )	z
        If there are no more available user authentication messages,
        the SSHUserAuthClient should disconnect with code
        DISCONNECT_NO_MORE_AUTH_METHODS_AVAILABLE.
        ru   rl   r   rk   s      s(   no more authentication methods availables       N)r   r   r   rm   r   rG   r   r*   r    r    r!   %test_disconnectIfNoMoreAuthentication
  s    z<SSHUserAuthClientTests.test_disconnectIfNoMoreAuthenticationc                 C   sH   g | j j_| j d | | j jjtjtdtd td fg dS )z
        _ebAuth (the generic authentication error handler) should send
        a request for the 'none' authentication method.
        Nr/   r<   r?   )r   r   rG   Z_ebAuthrm   r   r   r   r*   r    r    r!   test_ebAuth  s    
z"SSHUserAuthClientTests.test_ebAuthc                    s`   t dt      fdd} fdddd   }|j	|S )z
        getPublicKey() should return None.  getPrivateKey() should return a
        failed Deferred.  getPassword() should return a failed Deferred.
        getGenericAnswers() should return a failed Deferred.
        r/   c                    s$   |  t   }|jS r7   )trapNotImplementedErrorr2   rr   rW   
addErrbackr   rs   )r   check2r+   r    r!   ry   -  s    
z3SSHUserAuthClientTests.test_defaults.<locals>.checkc                    s*   |  t  d d d }|jS r7   )r   r   r5   rr   rW   r   r   )r   check3r+   r    r!   r   1  s    
z4SSHUserAuthClientTests.test_defaults.<locals>.check2c                 S   s   |  t d S r7   )r   r   r   r    r    r!   r   5  s    z4SSHUserAuthClientTests.test_defaults.<locals>.check3)
r   r#   r;   r>   r   r,   r.   rr   rW   r   )r+   ry   rs   r    )r   r   r   r+   r!   test_defaults$  s    z$SSHUserAuthClientTests.test_defaults)r   r   r   r   r   r   rh   rj   r   r   r   r   r   r   r   r   r   r   r   r   r   r    r    r    r!   r   R  s$   r   c                   @   s.   e Zd ZedkrdZG dd dZdd ZdS )LoopbackTestsN)cannot run without cryptography or PyASN1c                   @   s"   e Zd ZG dd dZdd ZdS )zLoopbackTests.Factoryc                   @   s    e Zd ZdZdd Zdd ZdS )zLoopbackTests.Factory.Service   TestServicec                 C   s   | j   d S r7   )r   rO   r*   r    r    r!   r=   G  s    z,LoopbackTests.Factory.Service.serviceStartedc                 C   s   d S r7   r    r*   r    r    r!   ri   K  s    z,LoopbackTests.Factory.Service.serviceStoppedN)r   r   r   r4   r=   ri   r    r    r    r!   r>   C  s   r>   c                 C   s   | j S r7   )r>   )r+   Zavatarr4   r    r    r!   rA   O  s    z LoopbackTests.Factory.getServiceN)r   r   r   r>   rA   r    r    r    r!   rB   B  s   rB   c                    s   t  tdj }t _j_dd j_t |_||j_d j_	|j_	dd  j_
|j_
 j_d_t }t|}t   t   t   fdd _|  |jj_tj|j}dd jj_d	d |jj_  |  fd
d}||S )zW
        Test that the userauth server and client play nicely with each other.
        r/   c                 S   s   dS rN   r    r   r    r    r!   rR   ]  rS   z-LoopbackTests.test_loopback.<locals>.<lambda>rS   c                   S   s   d S r7   r    r    r    r    r!   rR   c  rS   r   c                    s   t  j|  dkS )Nr   )lenZsuccessfulCredentials)ZaId)checkerr    r!   rR   m  s    c                   S   s   dS )NZ_ServerLoopbackr    r    r    r    r!   rR   s  rS   c                   S   s   dS )NZ_ClientLoopbackr    r    r    r    r!   rR   t  rS   c                    s     jjjd d S )Nr   )rm   r   r@   r4   rx   r   r    r!   ry   y  s    z*LoopbackTests.test_loopback.<locals>.check)r   rd   r$   rB   r>   r   r"   r@   rM   r   ZsendKexInitrD   ZpasswordDelayrQ   r   r   rc   rU   r[   ZareDonerE   r   ZloopbackAsyncZ	logPrefixr=   rr   )r+   Zclientrb   rE   rs   ry   r    )r   r+   r   r!   test_loopbackS  s6    



zLoopbackTests.test_loopback)r   r   r   r   r   rB   r   r    r    r    r!   r   <  s   r   c                   @   s    e Zd ZedkrdZdd ZdS )ModuleInitializationTestsNr   c                 C   s,   |  tjjd d |  tjjd d d S )N<   r   )rm   r   rd   ZprotocolMessagesr#   r*   r    r    r!   test_messages  s    z'ModuleInitializationTests.test_messages)r   r   r   r   r   r   r    r    r    r!   r     s   r   ):r   Z
__future__r   r   Zzope.interfacer   Ztwisted.cred.checkersr   Ztwisted.cred.credentialsr   r   r   Ztwisted.cred.errorr	   Ztwisted.cred.portalr
   r   Ztwisted.conch.errorr   r   Ztwisted.internetr   r   Ztwisted.protocolsr   Ztwisted.python.reflectr   Ztwisted.trialr   Ztwisted.python.compatr   r{   Ztwisted.conch.ssh.commonr   Ztwisted.conch.checkersr   Ztwisted.conch.sshr   r   r   Ztwisted.conch.testr   r#   r$   r6   r:   r"   r;   rP   rQ   rU   r[   r_   ZTestCaser`   r   r   r   r    r    r    r!   <module>   sP   (G  \ kC