Just An Application

June 17, 2013

Programming With Rust — Part Ten: More Fun With Sockets

Currently our HTTP server is a little lacking in functionality although it is reasonably secure.

The next thing to do is to accept the incoming connection.

1.0 The accept Function

Given that there is a listen function you would expect that there would also be an accept function, and indeed there is.

It is documented here and it looks like this

    fn accept(new_conn: TcpNewConnection) -> result::Result<TcpSocket, TcpErrData>

It takes a TcpNewConnection as an argument, which is handy because we get passed one of those, and it returns a

    result::Result<TcpSocket, TcpErrData>

which as we have already seen is a parameterized enum type.

In this case the value returned from a call to the accept function is going to be either

  • an instance of the Ok variant with an associated value of the type TcpSocket, or

  • an instance of the Err variant with an associated value of type TcpErrData

2.0 Working With Enums

To determine the outcome of our call to accept we will need to use a match expression.

A Rust match expression is akin to a C++ or Java switch statement but considerably more flexible than either of them.

A match expression can be used to match a value of an enum type against its possible variants.

In the case of the value of type result::Result<TcpSocket, TcpErrData>
returned by the accept function we can do this

    match tcp::accept(newConn)
    {
        Ok(socket) => 
        {
            // connection accepted 
            // ...
        },
			   
        Err(error) => 
        {
            // whoops ! something's gone wrong
            // ...
        }
    }

If the return value matches the Ok variant pattern then the TcpSocket value it contains is bound to the local variable socket.

Alternatively if it matches the Err variant pattern then the TcpErrData value it contains is bound to the local variable error.
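
As a minimal sketch of the same binding behaviour, here is a match on a Result written in present-day Rust syntax rather than the 0.6 dialect used in this post. The functions fake_accept and describe are hypothetical stand-ins invented for illustration; they are not part of any library.

```rust
// Hypothetical stand-in for tcp::accept: succeeds with a connection id
// for even inputs and fails with a message for odd ones.
fn fake_accept(request: u32) -> Result<u32, String> {
    if request % 2 == 0 {
        Ok(request / 2)
    } else {
        Err(format!("request {} refused", request))
    }
}

// Match the Result against its two variants, binding the value carried
// by whichever variant is present to a local variable.
fn describe(result: Result<u32, String>) -> String {
    match result {
        Ok(id) => format!("accepted connection {}", id),
        Err(error) => format!("accept failed: {}", error),
    }
}
```

Calling describe(fake_accept(4)) takes the Ok arm with id bound to 2, while describe(fake_accept(3)) takes the Err arm with error bound to the message.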

3.0 Accepting The Connection

3.1 Stage One

To make it easier to show what is going on in the code the two closures passed to the listen function are replaced by functions.

In this case there was nothing for them to ‘close over’ anyway so it does not really make much difference.

The three functions look like this,

    ...

    fn on_establish_callback(chan: SharedChan<Option<TcpErrData>>)
    {
        io::println(fmt!("on_establish_callback(%?)", chan));
    }


    fn new_connection_callback(newConn :TcpNewConnection, chan: SharedChan<Option<TcpErrData>>)
    {
        io::println(fmt!("new_connection_callback(%?, %?)", newConn, chan)); 
        fail!(~"Now what ?");
    }

    fn main()
    {	
        tcp::listen(
            ip::v4::parse_addr(IPV4_LOOPBACK),
            PORT,
            BACKLOG,
            &uv_iotask::spawn_iotask(task::task()),
            on_establish_callback,
            new_connection_callback);
    }

3.2 Stage Two

At this point it all gets a bit complicated.

Whilst calling the accept function is straightforward enough, as is getting at the result, the documentation states

It is safe to call net::tcp::accept only within the context of the new_connect_cb callback provided as the final argument to the net::tcp::listen function.

The new_conn opaque value is provided only as the first argument to the new_connect_cb provided as a part of net::tcp::listen. It can be safely sent to another task but it must be used (via net::tcp::accept) before the new_connect_cb call it was provided to returns.

and then for good measure in the Returns section it goes on to say

On success, this function will return a net::tcp::TcpSocket as the Ok variant of a Result.
The net::tcp::TcpSocket is anchored within the task that accept was called within for its lifetime. On failure, this function will return a net::tcp::TcpErrData record as the Err variant of a Result.

So

  1. The accept function must be called before the new_connect_cb callback invoked by the listen function returns

  2. The TcpNewConnection value passed to the new_connect_cb callback can be passed between Tasks

  3. The TcpSocket value returned from a successful call to the accept function cannot be passed between Tasks

There are two solutions to this little exercise in constraint programming, either

  • accept the connection in the new_connect_cb callback, which ensures the accept function completes before the callback returns but means the connection has to be handled in the callback as well

  • spawn a new task and accept the connection there but ensure that the new_connect_cb callback waits until the accept function has returned which will require some form of synchronization.

Blocking the callback while we handle the connection presumably prevents any further connections being established, which would make for a very serial server, so we will go with the second option.
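
For comparison only, here is a rough sketch of the "accept in a spawned task" shape using the present-day std::net and std::thread APIs, not the 0.6 net::tcp functions described in this post. The current TcpListener has no callback, so the constraints listed above do not apply to it. The function names accept_in_thread and demo_accept are made up for the example, and the ephemeral loopback port is an assumption to keep it self-contained.

```rust
use std::io;
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::thread;

// Move the listener into a new thread and accept one connection there.
// The join handle yields the peer address of the accepted connection.
fn accept_in_thread(listener: TcpListener) -> thread::JoinHandle<io::Result<SocketAddr>> {
    thread::spawn(move || -> io::Result<SocketAddr> {
        let (_stream, peer) = listener.accept()?;
        // The connection would be handled here, inside the spawned thread.
        Ok(peer)
    })
}

// Bind to an ephemeral loopback port, connect to it once, and report
// whether the accepting thread saw that connection.
fn demo_accept() -> io::Result<bool> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;
    let handle = accept_in_thread(listener);
    let client = TcpStream::connect(addr)?;
    let peer = handle.join().expect("accepting thread panicked")?;
    Ok(peer == client.local_addr()?)
}
```

Joining on the handle plays the part of the synchronization mentioned in the second option: the caller does not proceed until the accept in the other thread has returned.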

4.0 Spawning A Task

We can spawn a task using the task::spawn function which is documented here and defined like this

    fn spawn(f: ~fn())

It takes an owned closure which does not take any arguments.

So to accept a connection in a new task we would need to do something like this

    ...

    task::spawn(
        ||
        {
            match tcp::accept(newConn)
            {
                Ok(socket) => 
                {
                    // ...
                },
			   
                Err(error) => 
                {
                    // ...
                }
            }
        });

    ...
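
For readers on a current toolchain, the nearest present-day counterpart of task::spawn is std::thread::spawn, which likewise takes a closure with no arguments that owns whatever it captures. The following is only a sketch under that assumption; spawn_and_join is a hypothetical helper, and the string it builds stands in for the real work of accepting a connection.

```rust
use std::thread;

// Spawn a thread whose closure takes ownership of `label` (via `move`)
// and hand its result back to the caller through the join handle.
fn spawn_and_join(label: String) -> String {
    let handle = thread::spawn(move || format!("handled {}", label));
    handle.join().expect("spawned thread panicked")
}
```

The move keyword is what makes the closure take ownership of label, which is the role the owned closure type ~fn() plays in the 0.6 signature above.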

5.0 Things We Need To Know Part N: The do Expression

A function which takes a closure as its last argument can also be invoked using a do expression with the closure argument appearing outside the function call.

The example of spawning a task above becomes

    ...

    do task::spawn()
        ||
        {
            match tcp::accept(newConn)
            {
                Ok(socket) => 
                {
                    // ...
                },
			   
                Err(error) => 
                {
                    // ...
                }
            }
        }

    ...

If the argument list to the function is empty it can be omitted. The same is true of the closure's argument list.

Taking advantage of this the example then becomes

    ...

    do task::spawn
        {
            match tcp::accept(newConn)
            {
                Ok(socket) => 
                {
                    // ...
                },
			   
                Err(error) => 
                {
                    // ...
                }
            }
        }

    ...

This pattern is used quite heavily so it is important to be able to recognize it when you see it.

6.0 Things We Need to Know Part N + 1: Method Invocation Very Very Briefly

Methods can be defined on most Rust types.

Methods are invoked using the dot notation, that is

    value.method()
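
A very small illustration in present-day Rust syntax, using a hypothetical Counter type that has nothing to do with the server code:

```rust
// Hypothetical type used only for illustration.
struct Counter {
    count: u32,
}

impl Counter {
    // A method takes `self` (here by mutable reference) as its first parameter.
    fn increment(&mut self) -> u32 {
        self.count += 1;
        self.count
    }
}

// Invoke the method twice using the dot notation.
fn demo_method() -> u32 {
    let mut counter = Counter { count: 0 };
    counter.increment();
    counter.increment()
}
```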

7.0 Task Synchronization

We need to be able to wait in the new_connection_callback function until the accept function has completed in the newly spawned task.

More rummaging around in the documentation turns up the std::sync module.

This module defines the useful looking Mutex struct.

The function Mutex returns a Mutex with an associated CondVar and CondVar supports signal and wait methods.

Mutex supports the lock_cond method which locks itself and then invokes a function with its associated CondVar as its argument.

To use a Mutex/CondVar for Task synchronization, in the function new_connection_callback we need to do something like this

    ...

    let mx = Mutex();
    
    do mx.lock_cond
        |cv|
        {
            // spawn task
            // ...
            cv.wait();
        }
        
    ...

and in the spawned task we need to do something like this

    ...

    match tcp::accept(newConn)
    {
        Ok(socket) => 
        {
            // call lock_cond and signal cv
                        
        },
			   
        Err(error) => 
        {
            // call lock_cond and signal cv
        }
    }

    ...

The question is how do we get hold of the Mutex in the spawned task so that we can lock it ?

The Mutex cannot be moved irrespective of where it is allocated because it is referenced by the call to lock_cond which is not going to return until the associated CondVar is signalled.

We need something which can be moved into the spawned task and enables us to act on the original Mutex/CondVar.

Some more rummaging in the documentation turns up the Mutex clone method which looks as though it is what we want.

If in new_connection_callback we create a clone of the original Mutex in the owned heap then the owned closure passed to task::spawn can take ownership of it.

So in new_connection_callback we need to do this.

    ...

    let mx = Mutex();
    
    do mx.lock_cond
        |cv|
        {
            let mxc = ~mx.clone();
            
            // spawn task
            // ...
            cv.wait();
        }
        
    ...

and in the spawned task we can do this

    ...

    match tcp::accept(newConn)
    {
        Ok(socket) => 
        {
            do mxc.lock_cond
                |cv|
                {
                    cv.signal();
                }
        },
			   
        Err(error) => 
        {
            do mxc.lock_cond
                |cv|
                {
                    cv.signal();
                }
        }
    }

    ...
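
The same wait-and-signal arrangement can be sketched with the present-day standard library, where Mutex and Condvar are separate types and sharing is done with Arc rather than a Mutex clone method. This is an approximation of the idea, not a translation of the 0.6 code: wait_for_worker is a hypothetical function, and the boolean flag is an addition that the modern Condvar needs in order to cope with spurious wakeups.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Wait in the calling thread until a spawned thread signals that it is done.
fn wait_for_worker() -> bool {
    // The bool records whether the worker has finished; the Condvar lets
    // the caller sleep until it changes.
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    // The clone is what gets moved into the spawned thread.
    let worker_pair = Arc::clone(&pair);

    let worker = thread::spawn(move || {
        let (lock, cvar) = &*worker_pair;
        // Stand-in for the accept call: mark the work as done, then signal.
        *lock.lock().unwrap() = true;
        cvar.notify_one();
    });

    let (lock, cvar) = &*pair;
    let mut done = lock.lock().unwrap();
    // Re-check the flag after every wakeup.
    while !*done {
        done = cvar.wait(done).unwrap();
    }
    let finished = *done;
    drop(done);
    worker.join().unwrap();
    finished
}
```

Arc::clone here does the job that the Mutex clone does above: it gives the spawned thread its own handle on the one shared lock and condition variable.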

8.0 Accepting The Connection Revisited

So combining all the above we end up with this

    ...
 
    fn new_connection_callback(newConn :TcpNewConnection, chan: SharedChan<Option<TcpErrData>>)
    {
        io::println(fmt!("new_connection_callback(%?, %?)", newConn, chan));
	
        let mx = Mutex();
	
        do mx.lock_cond
            |cv|
            {
                let mxc = ~mx.clone();

                do task::spawn 
                {
                    match tcp::accept(newConn)
                    {
                        Ok(socket) => 
                        {
                            io::println("accept succeeded");
                            do mxc.lock_cond
                                |cv|
                                {
                                    cv.signal();
                                }
                        },
                        Err(error) => 
                        {
                            io::println("accept failed");
                            do mxc.lock_cond
                                |cv|
                                {
                                    cv.signal();
                                }
                        }
                    }
                }
                cv.wait();
            }
        fail!(~"Now what ?");
    }

9.0 Running The Code

If we run the new version of httpd and then use a web browser to connect to 127.0.0.1:3534 we get this (output slightly re-formatted for clarity)

 
    ./httpd
    on_establish_callback({x: {data: (0x10070a570 as *())}})
    new_connection_callback(NewTcpConn((0x100832c00 as *())), {x: {data: (0x10070a570 as *())}})
    accept succeeded
    rust: task failed at 'Now what ?', httpd.rc:71
    Assertion failed: (false && "Rust task failed after reentering the Rust stack"), \
        function upcall_call_shim_on_rust_stack, \
        file /Users/simon/Src/lang/rust-0.6/src/rt/rust_upcall.cpp, line 92.
    Abort trap

10.0 The Source Code For httpd v0.2

    // httpd.rc

    // v0.2

    extern mod std;

    use core::comm::SharedChan;

    use core::option::Option;

    use core::task;

    use std::net::ip;
    use std::net::tcp;
    use std::net::tcp::TcpErrData;
    use std::net::tcp::TcpNewConnection;

    use std::sync::Mutex;

    use std::uv_iotask;

    static BACKLOG: uint = 5;
    static PORT:    uint = 3534;

    static IPV4_LOOPBACK: &'static str = "127.0.0.1";


    fn on_establish_callback(chan: SharedChan<Option<TcpErrData>>)
    {
        io::println(fmt!("on_establish_callback(%?)", chan));
    }

    fn new_connection_callback(newConn :TcpNewConnection, chan: SharedChan<Option<TcpErrData>>)
    {
        io::println(fmt!("new_connection_callback(%?, %?)", newConn, chan));
	
        let mx = Mutex();
	
        do mx.lock_cond
            |cv|
            {
                let mxc = ~mx.clone();

                do task::spawn 
                {
                    match tcp::accept(newConn)
                    {
                        Ok(socket) => 
                        {
                            io::println("accept succeeded");
                            do mxc.lock_cond
                                |cv|
                                {
                                    cv.signal();
                                }
                        },
                        Err(error) => 
                        {
                            io::println("accept failed");
                            do mxc.lock_cond
                                |cv|
                                {
                                    cv.signal();
                                }
                        }
                    }
                }
                cv.wait();
            }
        fail!(~"Now what ?");
    }

    fn main()
    {	
        tcp::listen(
            ip::v4::parse_addr(IPV4_LOOPBACK),
            PORT,
            BACKLOG,
            &uv_iotask::spawn_iotask(task::task()),
            on_establish_callback,
            new_connection_callback);
    }

Update: 27.07.2014

The socket functions as described above no longer exist in Rust.

See here for some discussion of the equivalent functionality in the current version of Rust


Copyright (c) 2013 By Simon Lewis. All Rights Reserved.

Unauthorized use and/or duplication of this material without express and written permission from this blog’s author and owner Simon Lewis is strictly prohibited.

Excerpts and links may be used, provided that full and clear credit is given to Simon Lewis and justanapplication.wordpress.com with appropriate and specific direction to the original content.
