U
    
W[3  ã                   @   sp  d Z ddlmZmZ ddlZddlmZ ddlmZ zddl	Z	W n e
k
rX   dZ	Y nX zddlZW n e
k
r~   dZY nX e	ršeršddlmZmZ nd ZZddlmZ ddlmZmZ G d	d
„ d
eƒZG dd„ dejƒZG dd„ deƒZG dd„ deƒZedk	rG dd„ dejƒZG dd„ deƒZG dd„ deƒZG dd„ deƒZG dd„ deƒZG dd„ deƒZ dS )z'
Tests for L{twisted.conch.ssh.agent}.
é    )Úabsolute_importÚdivisionN)Úunittest)Úiosim)ÚkeysÚagent)Úkeydata)Ú
ConchErrorÚMissingKeyStoreErrorc                   @   s   e Zd ZdZdd„ ZdS )ÚStubFactoryzb
    Mock factory that provides the keys attribute required by the
    SSHAgentServerProtocol
    c                 C   s
   i | _ d S ©N)r   ©Úself© r   ú?/usr/lib/python3/dist-packages/twisted/conch/test/test_agent.pyÚ__init__&   s    zStubFactory.__init__N)Ú__name__Ú
__module__Ú__qualname__Ú__doc__r   r   r   r   r   r   !   s   r   c                   @   s:   e Zd ZdZedkrdZnedks*edkr.dZdd„ ZdS )ÚAgentTestBasez*
    Tests for SSHAgentServer/Client.
    Nz,iosim requires SSL, but SSL is not availablez)Cannot run without cryptography or PyASN1c                 C   sj   t  tjtj¡\| _| _| _tƒ | j_	t
j tj¡| _t
j tj¡| _t
j tj¡| _t
j tj¡| _d S r   )r   ÚconnectedServerAndClientr   ÚSSHAgentServerÚSSHAgentClientÚclientÚserverÚpumpr   Úfactoryr   ÚKeyÚ
fromStringr   ZprivateRSA_opensshÚ
rsaPrivateZprivateDSA_opensshÚ
dsaPrivateZpublicRSA_opensshÚ	rsaPublicZpublicDSA_opensshÚ	dsaPublicr   r   r   r   ÚsetUp4   s     ÿ
zAgentTestBase.setUp)	r   r   r   r   r   Úskipr   r   r$   r   r   r   r   r   +   s   r   c                   @   s   e Zd ZdZdd„ ZdS )Ú&ServerProtocolContractWithFactoryTestszÚ
    The server protocol is stateful and so uses its factory to track state
    across requests.  This test asserts that the protocol raises if its factory
    doesn't provide the necessary storage for that state.
    c                 C   s2   t  ddtj¡}| jjjd= |  t| jj	|¡ d S )Nz!LBé   r   )
ÚstructZpackr   ZAGENTC_REQUEST_IDENTITIESr   r   Ú__dict__ZassertRaisesr
   ZdataReceived)r   Úmsgr   r   r   Ú/test_factorySuppliesKeyStorageForServerProtocolL   s     ÿzVServerProtocolContractWithFactoryTests.test_factorySuppliesKeyStorageForServerProtocolN)r   r   r   r   r+   r   r   r   r   r&   F   s   r&   c                   @   s(   e Zd ZdZdd„ Zdd„ Zdd„ ZdS )	Ú"UnimplementedVersionOneServerTestsa!  
    Tests for methods with no-op implementations on the server. We need these
    for clients, such as openssh, that try v1 methods before going to v2.

    Because the client doesn't expose these operations with nice method names,
    we invoke sendRequest directly with an op code.
    c                    s0   ˆ j  tjd¡}ˆ j ¡  ‡ fdd„}| |¡S )zV
        assert that we get the correct op code for an RSA identities request
        ó    c                    s   ˆ   tjt| dd… ƒ¡ d S )Nr   r'   )ÚassertEqualr   ZAGENT_RSA_IDENTITIES_ANSWERÚord)Zpacketr   r   r   Ú_cbd   s     ÿzRUnimplementedVersionOneServerTests.test_agentc_REQUEST_RSA_IDENTITIES.<locals>._cb)r   ÚsendRequestr   ZAGENTC_REQUEST_RSA_IDENTITIESr   ÚflushÚaddCallback)r   Údr0   r   r   r   Ú"test_agentc_REQUEST_RSA_IDENTITIES^   s    
zEUnimplementedVersionOneServerTests.test_agentc_REQUEST_RSA_IDENTITIESc                 C   s(   | j  tjd¡}| j ¡  | | jd¡S )z[
        assert that we get the correct op code for an RSA remove identity request
        r-   )r   r1   r   ZAGENTC_REMOVE_RSA_IDENTITYr   r2   r3   r.   ©r   r4   r   r   r   Útest_agentc_REMOVE_RSA_IDENTITYj   s    
zBUnimplementedVersionOneServerTests.test_agentc_REMOVE_RSA_IDENTITYc                 C   s(   | j  tjd¡}| j ¡  | | jd¡S )zj
        assert that we get the correct op code for an RSA remove all identities
        request.
        r-   )r   r1   r   Z AGENTC_REMOVE_ALL_RSA_IDENTITIESr   r2   r3   r.   r6   r   r   r   Ú%test_agentc_REMOVE_ALL_RSA_IDENTITIESs   s    
zHUnimplementedVersionOneServerTests.test_agentc_REMOVE_ALL_RSA_IDENTITIESN)r   r   r   r   r5   r7   r8   r   r   r   r   r,   U   s   	r,   c                   @   s    e Zd ZdZdd„ Zdd„ ZdS )ÚCorruptServerzº
        A misbehaving server that returns bogus response op codes so that we can
        verify that our callbacks that deal with these op codes handle such
        miscreants.
        c                 C   s   |   dd¡ d S ©Néþ   r-   ©ZsendResponse©r   Údatar   r   r   Úagentc_REQUEST_IDENTITIES…   s    z'CorruptServer.agentc_REQUEST_IDENTITIESc                 C   s   |   dd¡ d S r:   r<   r=   r   r   r   Úagentc_SIGN_REQUEST‰   s    z!CorruptServer.agentc_SIGN_REQUESTN)r   r   r   r   r?   r@   r   r   r   r   r9      s   r9   c                   @   s(   e Zd ZdZdd„ Zdd„ Zdd„ ZdS )	ÚClientWithBrokenServerTestszM
    verify error handling code in the client using a misbehaving server
    c                 C   s2   t  | ¡ t ttj¡\| _| _| _	t
ƒ | j_d S r   )r   r$   r   r   r9   r   r   r   r   r   r   r   r   r   r   r   r$   “   s    
 ÿz!ClientWithBrokenServerTests.setUpc                 C   s*   | j  | j ¡ d¡}| j ¡  |  |t¡S )zÄ
        Assert that L{SSHAgentClient.signData} raises a ConchError
        if we get a response from the server whose opcode doesn't match
        the protocol for data signing requests.
        ó   John Hancock)r   ÚsignDatar"   Úblobr   r2   ÚassertFailurer	   r6   r   r   r   Ú"test_signDataCallbackErrorHandlingœ   s    
z>ClientWithBrokenServerTests.test_signDataCallbackErrorHandlingc                 C   s    | j  ¡ }| j ¡  |  |t¡S )zÉ
        Assert that L{SSHAgentClient.requestIdentities} raises a ConchError
        if we get a response from the server whose opcode doesn't match
        the protocol for identity requests.
        )r   ÚrequestIdentitiesr   r2   rE   r	   r6   r   r   r   Ú+test_requestIdentitiesCallbackErrorHandling§   s    

zGClientWithBrokenServerTests.test_requestIdentitiesCallbackErrorHandlingN)r   r   r   r   r$   rF   rH   r   r   r   r   rA   Ž   s   	rA   c                   @   s0   e Zd ZdZdd„ Zdd„ Zdd„ Zdd	„ Zd
S )ÚAgentKeyAdditionTestsz<
    Test adding different flavors of keys to an agent.
    c                    s2   ˆ j  ˆ j ¡ ¡}ˆ j ¡  ‡ fdd„}| |¡S )á@  
        L{SSHAgentClient.addIdentity} adds the private key it is called
        with to the SSH agent server to which it is connected, associating
        it with the comment it is called with.

        This test asserts that omitting the comment produces an
        empty string for the comment on the server.
        c                    s:   ˆ j jjˆ j ¡  }ˆ  ˆ j|d ¡ ˆ  d|d ¡ d S ©Nr   r-   r'   ©r   r   r   r    rD   r.   ©ÚignoredZ	serverKeyr   r   r   Ú_checkÃ   s    zBAgentKeyAdditionTests.test_addRSAIdentityNoComment.<locals>._check©r   ÚaddIdentityr    ÚprivateBlobr   r2   r3   ©r   r4   rO   r   r   r   Útest_addRSAIdentityNoComment¸   s    	
z2AgentKeyAdditionTests.test_addRSAIdentityNoCommentc                    s2   ˆ j  ˆ j ¡ ¡}ˆ j ¡  ‡ fdd„}| |¡S )rJ   c                    s:   ˆ j jjˆ j ¡  }ˆ  ˆ j|d ¡ ˆ  d|d ¡ d S rK   ©r   r   r   r!   rD   r.   rM   r   r   r   rO   Õ   s    zBAgentKeyAdditionTests.test_addDSAIdentityNoComment.<locals>._check©r   rQ   r!   rR   r   r2   r3   rS   r   r   r   Útest_addDSAIdentityNoCommentÊ   s    	
z2AgentKeyAdditionTests.test_addDSAIdentityNoCommentc                    s6   ˆ j jˆ j ¡ dd}ˆ j ¡  ‡ fdd„}| |¡S )á1  
        L{SSHAgentClient.addIdentity} adds the private key it is called
        with to the SSH agent server to which it is connected, associating
        it with the comment it is called with.

        This test asserts that the server receives/stores the comment
        as sent by the client.
        ó   My special key©Zcommentc                    s:   ˆ j jjˆ j ¡  }ˆ  ˆ j|d ¡ ˆ  d|d ¡ d S ©Nr   rY   r'   rL   rM   r   r   r   rO   è   s    zDAgentKeyAdditionTests.test_addRSAIdentityWithComment.<locals>._checkrP   rS   r   r   r   Útest_addRSAIdentityWithCommentÜ   s    	 ÿ
z4AgentKeyAdditionTests.test_addRSAIdentityWithCommentc                    s6   ˆ j jˆ j ¡ dd}ˆ j ¡  ‡ fdd„}| |¡S )rX   rY   rZ   c                    s:   ˆ j jjˆ j ¡  }ˆ  ˆ j|d ¡ ˆ  d|d ¡ d S r[   rU   rM   r   r   r   rO   û   s    zDAgentKeyAdditionTests.test_addDSAIdentityWithComment.<locals>._checkrV   rS   r   r   r   Útest_addDSAIdentityWithCommentï   s    	 ÿ
z4AgentKeyAdditionTests.test_addDSAIdentityWithCommentN)r   r   r   r   rT   rW   r\   r]   r   r   r   r   rI   ³   s
   rI   c                   @   s   e Zd Zdd„ ZdS )ÚAgentClientFailureTestsc                 C   s$   | j  dd¡}| j ¡  |  |t¡S )zK
        verify that the client raises ConchError on AGENT_FAILURE
        r;   r-   )r   r1   r   r2   rE   r	   r6   r   r   r   Útest_agentFailure  s    
z)AgentClientFailureTests.test_agentFailureN)r   r   r   r_   r   r   r   r   r^     s   r^   c                   @   s8   e Zd ZdZdd„ Zdd„ Zdd„ Zdd	„ Zd
d„ ZdS )ÚAgentIdentityRequestsTestszJ
    Test operations against a server with identities already loaded.
    c                 C   sB   t  | ¡ | jdf| jjj| j ¡ < | jdf| jjj| j ¡ < d S ©Nó	   a commentó   another comment©r   r$   r!   r   r   r   rD   r    r   r   r   r   r$     s    
 ÿ ÿz AgentIdentityRequestsTests.setUpc                 C   sX   | j  | j ¡ d¡}| j ¡  |  |¡}| j d¡}|  	||¡ |  
| j |d¡¡ dS )zc
        Sign data with an RSA private key and then verify it with the public
        key.
        rB   N)r   rC   r"   rD   r   r2   ZsuccessResultOfr    Zsignr.   Ú
assertTrueÚverify)r   r4   Z	signatureÚexpectedr   r   r   Útest_signDataRSA  s    

z+AgentIdentityRequestsTests.test_signDataRSAc                    s4   ˆ j  ˆ j ¡ d¡}ˆ j ¡  ‡ fdd„}| |¡S )zb
        Sign data with a DSA private key and then verify it with the public
        key.
        rB   c                    s   ˆ   ˆ j | d¡¡ d S )NrB   )re   r#   rf   )Zsigr   r   r   rO   0  s    z;AgentIdentityRequestsTests.test_signDataDSA.<locals>._check)r   rC   r#   rD   r   r2   r3   rS   r   r   r   Útest_signDataDSA)  s    
z+AgentIdentityRequestsTests.test_signDataDSAc                 C   s<   | j jj| j ¡ = | j | j ¡ d¡}| j ¡  |  	|t
¡S )zm
        Assert that we get an errback if we try to sign data using a key that
        wasn't added.
        rB   )r   r   r   r"   rD   r   rC   r   r2   rE   r	   r6   r   r   r   Ú$test_signDataRSAErrbackOnUnknownBlob8  s    
z?AgentIdentityRequestsTests.test_signDataRSAErrbackOnUnknownBlobc                    s*   ˆ j  ¡ }ˆ j ¡  ‡ fdd„}| |¡S )z}
        Assert that we get all of the keys/comments that we add when we issue a
        request for all identities.
        c                    s^   i }d|ˆ j  ¡ < d|ˆ j ¡ < i }| D ]$}|d |tjj|d dd ¡ < q(ˆ  ||¡ d S )Nrb   rc   r'   r   rD   )Útype)r#   rD   r"   r   r   r   r.   )Zkeytrg   ZreceivedÚkr   r   r   rO   J  s    "zAAgentIdentityRequestsTests.test_requestIdentities.<locals>._check)r   rG   r   r2   r3   rS   r   r   r   Útest_requestIdentitiesC  s    

	z1AgentIdentityRequestsTests.test_requestIdentitiesN)	r   r   r   r   r$   rh   ri   rj   rm   r   r   r   r   r`     s   r`   c                   @   s0   e Zd ZdZdd„ Zdd„ Zdd„ Zdd	„ Zd
S )ÚAgentKeyRemovalTestsz<
    Test support for removing keys in a remote server.
    c                 C   sB   t  | ¡ | jdf| jjj| j ¡ < | jdf| jjj| j ¡ < d S ra   rd   r   r   r   r   r$   \  s    
 ÿ ÿzAgentKeyRemovalTests.setUpc                    s2   ˆ j  ˆ j ¡ ¡}ˆ j ¡  ‡ fdd„}| |¡S )z<
        Assert that we can remove an RSA identity.
        c                    sJ   ˆ   dtˆ jjjƒ¡ ˆ  ˆ j ¡ ˆ jjj¡ ˆ  ˆ j	 ¡ ˆ jjj¡ d S ©Nr'   )
r.   Úlenr   r   r   ÚassertInr!   rD   ZassertNotInr    ©rN   r   r   r   rO   l  s    z;AgentKeyRemovalTests.test_removeRSAIdentity.<locals>._check)r   ÚremoveIdentityr    rD   r   r2   r3   rS   r   r   r   Útest_removeRSAIdentityd  s    
z+AgentKeyRemovalTests.test_removeRSAIdentityc                    s2   ˆ j  ˆ j ¡ ¡}ˆ j ¡  ‡ fdd„}| |¡S )z;
        Assert that we can remove a DSA identity.
        c                    s2   ˆ   dtˆ jjjƒ¡ ˆ  ˆ j ¡ ˆ jjj¡ d S ro   )r.   rp   r   r   r   rq   r    rD   rr   r   r   r   rO   {  s    z;AgentKeyRemovalTests.test_removeDSAIdentity.<locals>._check)r   rs   r!   rD   r   r2   r3   rS   r   r   r   Útest_removeDSAIdentitys  s    
z+AgentKeyRemovalTests.test_removeDSAIdentityc                    s*   ˆ j  ¡ }ˆ j ¡  ‡ fdd„}| |¡S )z;
        Assert that we can remove all identities.
        c                    s   ˆ   dtˆ jjjƒ¡ d S )Nr   )r.   rp   r   r   r   rr   r   r   r   rO   ˆ  s    z=AgentKeyRemovalTests.test_removeAllIdentities.<locals>._check)r   ZremoveAllIdentitiesr   r2   r3   rS   r   r   r   Útest_removeAllIdentities  s    

z-AgentKeyRemovalTests.test_removeAllIdentitiesN)r   r   r   r   r$   rt   ru   rv   r   r   r   r   rn   W  s
   rn   )!r   Z
__future__r   r   r(   Ztwisted.trialr   Ztwisted.testr   ZcryptographyÚImportErrorZpyasn1Ztwisted.conch.sshr   r   Ztwisted.conch.testr   Ztwisted.conch.errorr	   r
   Úobjectr   ZTestCaser   r&   r,   r   r9   rA   rI   r^   r`   rn   r   r   r   r   Ú<module>   s8   


)
%PI