Just An Application

June 17, 2013

Programming With Rust — Part Ten: More Fun With Sockets

Currently our HTTP server is a little lacking in functionality although it is reasonably secure.

The next thing to do is to accept the incoming connection.

1.0 The accept Function

Given that there is a listen function you would expect that there would also be an accept function, and indeed there is.

It is documented here and it looks like this

    fn accept(new_conn: TcpNewConnection) -> result::Result<TcpSocket, TcpErrData>

It takes a TcpNewConnection as an argument, which is handy because we get passed one of those, and it returns a

    result::Result<TcpSocket, TcpErrData>

which as we have already seen is a parameterized enum type.

In this case the value returned from a call to the accept function is going to be either

  • an instance of the Ok variant with an associated value of the type TcpSocket, or

  • an instance of the Err variant with an associated value of type TcpErrData

2.0 Working With Enums

To determine the outcome of our call to accept we will need to use a match expression.

A Rust match expression is akin to a C++ or Java switch statement but considerably more flexible than either of them.

A match expression can be used to match a value of an enum type against its possible variants.

In the case of the value of type result::Result<TcpSocket, TcpErrData>
returned by the accept function we can do this

    match tcp::accept(newConn)
    {
        Ok(socket) => 
        {
            // connection accepted 
            // ...
        },
			   
        Err(error) => 
        {
            // whoops! something has gone wrong
            // ...
        }
    }

If the return value matches the Ok variant pattern then the TcpSocket value it contains is bound to the local variable socket.

Alternatively, if it matches the Err variant pattern then the TcpErrData value it contains is bound to the local variable error.
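
For comparison, here is a minimal sketch of the same pattern in current Rust rather than the 0.6 dialect used in this post. The Socket and ErrData types and the fake_accept function are hypothetical stand-ins invented for illustration; they are not the net::tcp types.

```rust
// Hypothetical stand-in for TcpSocket.
#[derive(Debug)]
struct Socket {
    id: u32,
}

// Hypothetical stand-in for TcpErrData.
#[derive(Debug)]
struct ErrData {
    err_name: String,
    err_msg: String,
}

// Hypothetical accept-like function: succeeds for even ids, fails otherwise.
fn fake_accept(id: u32) -> Result<Socket, ErrData> {
    if id % 2 == 0 {
        Ok(Socket { id })
    } else {
        Err(ErrData {
            err_name: "ECONNABORTED".to_string(),
            err_msg: "connection aborted".to_string(),
        })
    }
}

// Match the Result against its two variants, binding the payload each time.
fn describe(result: Result<Socket, ErrData>) -> String {
    match result {
        Ok(socket) => format!("accepted socket {}", socket.id),
        Err(error) => format!("failed: {} ({})", error.err_name, error.err_msg),
    }
}

fn main() {
    println!("{}", describe(fake_accept(2)));
    println!("{}", describe(fake_accept(3)));
}
```

The compiler checks that the match covers every variant, so leaving out the Err arm would be a compile-time error rather than a silent fall-through.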

3.0 Accepting The Connection

3.1 Stage One

To make it easier to show what is going on in the code the two closures passed to the listen function are replaced by functions.

In this case there was nothing for them to ‘close over’ anyway so it does not really make much difference.

The three functions look like this,

    ...

    fn on_establish_callback(chan: SharedChan<Option<TcpErrData>>)
    {
        io::println(fmt!("on_establish_callback(%?)", chan));
    }


    fn new_connection_callback(newConn :TcpNewConnection, chan: SharedChan<Option<TcpErrData>>)
    {
        io::println(fmt!("new_connection_callback(%?, %?)", newConn, chan)); 
        fail!(~"Now what ?");
    }

    fn main()
    {	
        tcp::listen(
            ip::v4::parse_addr(IPV4_LOOPBACK),
            PORT,
            BACKLOG,
            &uv_iotask::spawn_iotask(task::task()),
            on_establish_callback,
            new_connection_callback);
    }

3.2 Stage Two

At this point it all gets a bit complicated.

Whilst calling the accept function is straightforward enough, as is getting at the result, the documentation states

It is safe to call net::tcp::accept only within the context of the new_connect_cb callback provided as the final argument to the net::tcp::listen function.

The new_conn opaque value is provided only as the first argument to the new_connect_cb provided as a part of net::tcp::listen. It can be safely sent to another task but it must be used (via net::tcp::accept) before the new_connect_cb call it was provided to returns.

and then for good measure in the Returns section it goes on to say

On success, this function will return a net::tcp::TcpSocket as the Ok variant of a Result.
The net::tcp::TcpSocket is anchored within the task that accept was called within for its lifetime. On failure, this function will return a net::tcp::TcpErrData record as the Err variant of a Result.

So

  1. The accept function must be called before the new_connect_cb callback invoked by the listen function returns

  2. The TcpNewConnection value passed to the new_connect_cb callback can be passed between Tasks

  3. The TcpSocket value returned from a successful call to the accept function cannot be passed between Tasks

There are two solutions to this little exercise in constraint programming, either

  • accept the connection in the new_connect_cb callback, which ensures the accept function completes before the callback returns but means the connection has to be handled in the callback as well, or

  • spawn a new task and accept the connection there but ensure that the new_connect_cb callback waits until the accept function has returned which will require some form of synchronization.

Blocking the callback while we handle the connection presumably prevents any further connections from being established, which would make for a very serial server, so we will go with the second option.

4.0 Spawning A Task

We can spawn a task using the task::spawn function which is documented here and defined like this

    fn spawn(f: ~fn())

It takes an owned closure which does not take any arguments.

So to accept a connection in a new task we would need to do something like this

    ...

    task::spawn(
        ||
        {
            match tcp::accept(newConn)
            {
                Ok(socket) => 
                {
                    // ...
                },
			   
                Err(error) => 
                {
                    // ...
                }
            }
        });

    ...
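
As a rough point of comparison, the nearest thing in current Rust is std::thread::spawn, which also takes a closure with no arguments. The sketch below is an assumption-laden stand-in, not a translation of the code above: handle is a hypothetical function and conn_id a hypothetical value playing the role of newConn.

```rust
use std::thread;

// Hypothetical stand-in for the work done in the spawned task.
fn handle(conn_id: u32) -> String {
    format!("handled connection {}", conn_id)
}

// Spawn a thread whose closure takes ownership of conn_id (the `move`
// keyword), then wait for it to finish and collect its result.
fn spawn_and_join(conn_id: u32) -> String {
    let worker = thread::spawn(move || handle(conn_id));
    worker.join().expect("worker thread panicked")
}

fn main() {
    println!("{}", spawn_and_join(7));
}
```

The move keyword plays the part of the owned closure: anything the closure uses is moved into the new thread rather than shared with the caller.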

5.0 Things We Need To Know Part N: The do Expression

A function which takes a closure as its last argument can also be invoked using a do expression with the closure argument appearing outside the function call.

The example of spawning a task above becomes

    ...

    do task::spawn()
        ||
        {
            match tcp::accept(newConn)
            {
                Ok(socket) => 
                {
                    // ...
                },
			   
                Err(error) => 
                {
                    // ...
                }
            }
        }

    ...

If the argument list to the function is empty it can be omitted. This is also true for the closure.

Taking advantage of this the example then becomes

    ...

    do task::spawn
        {
            match tcp::accept(newConn)
            {
                Ok(socket) => 
                {
                    // ...
                },
			   
                Err(error) => 
                {
                    // ...
                }
            }
        }

    ...

This pattern is used quite heavily so it is important to be able to recognize it when you see it.

6.0 Things We Need to Know Part N + 1: Method Invocation Very Very Briefly

Methods can be defined on most Rust types.

Methods are invoked using the dot notation, that is

    value.method()
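
A small sketch in current Rust of defining methods and invoking them with the dot notation. The Counter type and its methods are hypothetical and exist only for illustration.

```rust
// Hypothetical type used only to show method definition and invocation.
struct Counter {
    count: u32,
}

impl Counter {
    // A method that mutates the value it is called on.
    fn increment(&mut self) {
        self.count += 1;
    }

    // A method that only reads the value it is called on.
    fn value(&self) -> u32 {
        self.count
    }
}

fn main() {
    let mut counter = Counter { count: 0 };
    counter.increment();
    counter.increment();
    println!("count = {}", counter.value());
}
```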

7.0 Task Synchronization

We need to be able to wait in the new_connection_callback function until the accept function has completed in the newly spawned task.

More rummaging around in the documentation turns up the std::sync module.

This module defines the useful looking Mutex struct.

The function Mutex returns a Mutex with an associated CondVar and CondVar supports signal and wait methods.

Mutex supports the lock_cond method which locks itself and then invokes a function with its associated CondVar as its argument.

To use a Mutex/CondVar for Task synchronization, in the function new_connection_callback we need to do something like this

    ...

    let mx = Mutex();
    
    do mx.lock_cond
        |cv|
        {
            // spawn task
            // ...
            cv.wait();
        }
        
    ...

and in the spawned task we need to do something like this

    ...

    match tcp::accept(newConn)
    {
        Ok(socket) => 
        {
            // call lock_cond and signal cv
                        
        },
			   
        Err(error) => 
        {
            // call lock_cond and signal cv
        }
    }

    ...

The question is how do we get hold of the Mutex in the spawned task so that we can lock it ?

The Mutex cannot be moved irrespective of where it is allocated because it is referenced by the call to lock_cond which is not going to return until the associated CondVar is signalled.

We need something which can be moved into the spawned task and enables us to act on the original Mutex/CondVar.

Some more rummaging in the documentation turns up the Mutex clone method which looks as though it is what we want.

If in new_connection_callback we create a clone of the original Mutex in the owned heap then the owned closure passed to task::spawn can take ownership of it.

So in new_connection_callback we need to do this.

    ...

    let mx = Mutex();
    
    do mx.lock_cond
        |cv|
        {
            let mxc = ~mx.clone();
            
            // spawn task
            // ...
            cv.wait();
        }
        
    ...

and in the spawned task we can do this

    ...

    match tcp::accept(newConn)
    {
        Ok(socket) => 
        {
            do mxc.lock_cond
                |cv|
                {
                    cv.signal();
                }
        },
			   
        Err(error) => 
        {
            do mxc.lock_cond
                |cv|
                {
                    cv.signal();
                }
        }
    }

    ...
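
For readers on current Rust, here is a hedged sketch of the same wait-and-signal idea using std::sync::Mutex and std::sync::Condvar. It is not a translation of the 0.6 code: the two are separate types shared through an Arc, and the boolean flag is an addition of this sketch so that the waiter copes with a signal that arrives before it starts waiting. The function name wait_for_worker is hypothetical.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Spawn a worker, then block until it signals that its "accept" step is done.
// Returns the value of the flag the waiter observed once it woke up.
fn wait_for_worker() -> bool {
    // The bool records whether the worker has finished. Checking it in a loop
    // guards against spurious wakeups and against a signal sent before the wait.
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair_for_worker = Arc::clone(&pair);

    let worker = thread::spawn(move || {
        // ... the accept call would go here ...
        let (lock, cv) = &*pair_for_worker;
        let mut done = lock.lock().unwrap();
        *done = true;
        cv.notify_one();
    });

    let (lock, cv) = &*pair;
    let mut done = lock.lock().unwrap();
    while !*done {
        // wait releases the lock while blocked and reacquires it on wakeup.
        done = cv.wait(done).unwrap();
    }
    let observed = *done;
    drop(done);

    worker.join().unwrap();
    observed
}

fn main() {
    println!("worker signalled: {}", wait_for_worker());
}
```

Cloning the Arc here serves the same purpose as cloning the Mutex above: it gives the spawned code its own handle on the one underlying lock.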

8.0 Accepting The Connection Revisited

So combining all the above we end up with this

    ...
 
    fn new_connection_callback(newConn :TcpNewConnection, chan: SharedChan<Option<TcpErrData>>)
    {
        io::println(fmt!("new_connection_callback(%?, %?)", newConn, chan));
	
        let mx = Mutex();
	
        do mx.lock_cond
            |cv|
            {
                let mxc = ~mx.clone();

                do task::spawn 
                {
                    match tcp::accept(newConn)
                    {
                        Ok(socket) => 
                        {
                            io::println("accept succeeded");
                            do mxc.lock_cond
                                |cv|
                                {
                                    cv.signal();
                                }
                        },
                        Err(error) => 
                        {
                            io::println("accept failed");
                            do mxc.lock_cond
                                |cv|
                                {
                                    cv.signal();
                                }
                        }
                    }
                }
                cv.wait();
            }
        fail!(~"Now what ?");
    }

9.0 Running The Code

If we run the new version of httpd and then use a web browser to connect to 127.0.0.1:3534 we get this (output slightly re-formatted for clarity)

 
    ./httpd
    on_establish_callback({x: {data: (0x10070a570 as *())}})
    new_connection_callback(NewTcpConn((0x100832c00 as *())), {x: {data: (0x10070a570 as *())}})
    accept succeeded
    rust: task failed at 'Now what ?', httpd.rc:71
    Assertion failed: (false && "Rust task failed after reentering the Rust stack"), \
        function upcall_call_shim_on_rust_stack, \
        file /Users/simon/Src/lang/rust-0.6/src/rt/rust_upcall.cpp, line 92.
    Abort trap

10.0 The Source Code For httpd v0.2

    // httpd.rc

    // v0.2

    extern mod std;

    use core::comm::SharedChan;

    use core::option::Option;

    use core::task;

    use std::net::ip;
    use std::net::tcp;
    use std::net::tcp::TcpErrData;
    use std::net::tcp::TcpNewConnection;

    use std::sync::Mutex;

    use std::uv_iotask;

    static BACKLOG: uint = 5;
    static PORT:    uint = 3534;

    static IPV4_LOOPBACK: &'static str = "127.0.0.1";


    fn on_establish_callback(chan: SharedChan<Option<TcpErrData>>)
    {
        io::println(fmt!("on_establish_callback(%?)", chan));
    }

    fn new_connection_callback(newConn :TcpNewConnection, chan: SharedChan<Option<TcpErrData>>)
    {
        io::println(fmt!("new_connection_callback(%?, %?)", newConn, chan));
	
        let mx = Mutex();
	
        do mx.lock_cond
            |cv|
            {
                let mxc = ~mx.clone();

                do task::spawn 
                {
                    match tcp::accept(newConn)
                    {
                        Ok(socket) => 
                        {
                            io::println("accept succeeded");
                            do mxc.lock_cond
                                |cv|
                                {
                                    cv.signal();
                                }
                        },
                        Err(error) => 
                        {
                            io::println("accept failed");
                            do mxc.lock_cond
                                |cv|
                                {
                                    cv.signal();
                                }
                        }
                    }
                }
                cv.wait();
            }
        fail!(~"Now what ?");
    }

    fn main()
    {	
        tcp::listen(
            ip::v4::parse_addr(IPV4_LOOPBACK),
            PORT,
            BACKLOG,
            &uv_iotask::spawn_iotask(task::task()),
            on_establish_callback,
            new_connection_callback);
    }

Update: 27.07.2014

The socket functions as described above no longer exist in Rust.

See here for some discussion of the equivalent functionality in the current version of Rust
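
As a rough illustration only, here is a minimal sketch of accepting one connection with std::net::TcpListener from the current standard library. The function accept_one is hypothetical, the client thread stands in for the web browser, and port 0 (let the operating system choose a free port) replaces the fixed port used above.

```rust
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;

// Accept a single connection on the loopback interface and return the bytes
// the client sent. Port 0 asks the OS for any free port.
fn accept_one() -> std::io::Result<String> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;

    // A client thread stands in for the web browser.
    let client = thread::spawn(move || -> std::io::Result<()> {
        let mut stream = TcpStream::connect(addr)?;
        stream.write_all(b"GET / HTTP/1.0\r\n\r\n")?;
        Ok(())
        // The stream is closed when it goes out of scope here.
    });

    // accept blocks until a connection arrives and returns a Result,
    // so the Ok/Err handling shown earlier still applies.
    let (mut socket, _peer) = listener.accept()?;
    let mut request = String::new();
    socket.read_to_string(&mut request)?;

    client.join().expect("client thread panicked")?;
    Ok(request)
}

fn main() {
    match accept_one() {
        Ok(request) => println!("accept succeeded: {:?}", request),
        Err(error) => println!("accept failed: {}", error),
    }
}
```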


Copyright (c) 2013 By Simon Lewis. All Rights Reserved.

Unauthorized use and/or duplication of this material without express and written permission from this blog’s author and owner Simon Lewis is strictly prohibited.

Excerpts and links may be used, provided that full and clear credit is given to Simon Lewis and justanapplication.wordpress.com with appropriate and specific direction to the original content.

June 13, 2013

Programming With Rust — Part Five: Things We Need To Know – Tasks And Memory

1.0 Tasks

The unit of concurrency in Rust is the Task.

Code executing in different Tasks runs concurrently.

To this extent a Task is analogous to a Thread in a language such as Java but with one very important difference.

Rust Tasks are memory independent as well as execution independent.

Code executing in separate Tasks cannot interact via shared memory.

This turns out to have a number of interesting ramifications.
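
One ramification is that concurrent code has to exchange values by passing messages. The sketch below shows that style in current Rust, using std::thread and std::sync::mpsc as stand-ins for Tasks and their channels; the function sum_in_worker is hypothetical. Current Rust threads can also share memory through types such as Arc, so this illustrates the style rather than the Task model itself.

```rust
use std::sync::mpsc;
use std::thread;

// Hand a vector to a worker thread and receive the total back over a channel.
fn sum_in_worker(values: Vec<u32>) -> u32 {
    let (tx, rx) = mpsc::channel();

    // The closure takes ownership of both the vector and the sending end.
    let worker = thread::spawn(move || {
        let total: u32 = values.iter().sum();
        tx.send(total).expect("receiver dropped");
    });

    // recv blocks until the worker has sent its message.
    let total = rx.recv().expect("sender dropped");
    worker.join().expect("worker thread panicked");
    total
}

fn main() {
    println!("total = {}", sum_in_worker(vec![1, 2, 3]));
}
```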

2.0 Memory Access

Rust code executing in a Task has read/write access to three distinct areas of memory,

  • the current stack frame

  • a Managed heap belonging to the current Task

  • a global heap called the Owned heap

as well as read-only access to an area of static memory which is global and immutable.

2.1 The Current Stack Frame: Local Variables

Local variables can be allocated in the current stack frame like this

    let foo: uint = 5;

This creates an immutable local variable foo with the value 5.

A mutable local variable can be allocated like this

    let mut bar: uint = 5;

Local variables can hold simple discrete values such as integers as well as more complicated aggregates of values such as enum variants

    let white: Colour = RGB(255, 255, 255);

In this respect Rust is like C++ where fixed-size arrays or instances of structs or classes as well as integers, floats, etc. can be stored directly in a stack frame, and unlike Java where arrays and object instances can only ever be stored in the heap.
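
The declarations above can be tried out in current Rust with small changes: uint is now usize and enum variants are referred to through the enum name. The definition of Colour below is an assumption, since the original is not shown here, and the function locals is hypothetical.

```rust
// Assumed definition of Colour; the post does not show the original.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Colour {
    RGB(u8, u8, u8),
}

// Everything declared here lives in this function's stack frame.
fn locals() -> (usize, usize, Colour) {
    let foo: usize = 5; // immutable
    let mut bar: usize = 5; // mutable
    bar += 1;
    let white: Colour = Colour::RGB(255, 255, 255); // an aggregate value
    (foo, bar, white)
}

fn main() {
    println!("{:?}", locals());
}
```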

2.2 The Heaps

A Note On Terminology

I am not at all convinced that I fully understand the terminology used to describe the Rust memory model when it comes to heaps, so what follows is my own attempt at a consistent and hopefully accurate terminology.

A box is a piece of memory which has been allocated in a heap.

A pointer is a reference to a box which can be used to access its contents.

More specifically

  • a managed box is a box in a Managed heap

  • a managed pointer is a pointer to a managed box

  • an owned box is a box in the Owned heap

  • an owned pointer is a pointer to an owned box

2.2.1 The Managed Heap

Each Task has its own Managed heap which can only be accessed by code executing in that Task.

Each managed box may be referenced by multiple pointers.

If at some point there are no longer any pointers to a managed box it becomes eligible to be garbage collected.

The garbage collection of a managed box will occur at some time between the point at which it becomes eligible to be garbage collected and the point at which the Task which owns the Managed heap ends.

To allocate a uint in the managed heap of the current Task you can do this

    let cod: @uint = @5;

In this example both the local variable cod and the managed box are immutable.

You can allocate a mutable managed box like this

    let dab: @mut uint = @mut 5;

The local variable dab is immutable but the managed box is mutable.

You can obviously allocate things other than unsigned integers in a Managed heap. For example,

    let eel: @Colour = @RGB(0, 0, 0);
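
The @ syntax does not exist in current Rust. The closest library type is probably std::rc::Rc, which also allows several pointers to one box and cannot be sent to another thread, although it is reference counted rather than garbage collected. The sketch below is offered on that assumption and the function shared_count is hypothetical.

```rust
use std::rc::Rc;

// Create one box with two pointers to it and report the value and the
// number of pointers currently referring to the box.
fn shared_count() -> (u32, usize) {
    let cod: Rc<u32> = Rc::new(5);
    let also_cod = Rc::clone(&cod); // a second pointer to the same box
    (*also_cod, Rc::strong_count(&cod))
}

fn main() {
    let (value, owners) = shared_count();
    println!("value = {}, owners = {}", value, owners);
}
```

The box is freed when the last Rc pointing at it goes away, which here happens when shared_count returns.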

2.2.2 The Owned Heap

There is a single Owned heap which can be accessed by code executing in any Task.

An owned box can only be referenced by a single pointer.

An owned box is freed at the point, if any, that it is no longer referenced by a pointer.

To allocate a uint in the Owned heap you can do this

    let bream: ~uint = ~5;

In this example both the local variable bream and the owned box are immutable.

You can allocate a mutable owned box like this

    let mut chub: ~uint = ~5;

And an example of allocating something other than an unsigned integer in the Owned heap.

    let dace: ~Colour = ~RGB(0, 0, 0);
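
In current Rust the ~ syntax has been replaced by the Box type. A minimal sketch of the immutable and mutable cases above, with uint written as u32 and a hypothetical function name:

```rust
// Allocate two owned boxes and return their contents.
fn boxed() -> (u32, u32) {
    let bream: Box<u32> = Box::new(5); // variable and contents immutable
    let mut chub: Box<u32> = Box::new(5); // contents mutable through chub
    *chub += 1;
    (*bream, *chub)
}

fn main() {
    println!("{:?}", boxed());
}
```

Both boxes are freed when their owning variables go out of scope at the end of the function.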

2.3 Static Memory

A program’s static memory holds the values of items processed at compile time.

2.4 Enforcing The Semantics Of Managed And Owned Boxes And Pointers

The semantics of managed and owned boxes and pointers are enforced at compilation time.

2.4.1 Type Safety

Managed pointers and owned pointers are distinct types and they are not interchangeable, i.e. their types include where they are pointing as well as what they are pointing at.

For example you cannot do this

    let rudd: @uint = ~5;

nor this

    let scad: ~uint = @5;

2.4.2 Assignment Of Owned Pointers

Because there can only ever be one owned pointer to an owned box, if you do this

    let hake = ~17;

and at some point you then do this

    let goby = hake;

then from that point on the local variable hake is no longer usable.
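
The same rule can be seen in current Rust using Box. In this sketch, with a hypothetical function name, the commented-out line is the one the compiler would reject.

```rust
// Move an owned box from one variable to another and read it back.
fn move_box() -> i32 {
    let hake = Box::new(17);
    let goby = hake; // ownership moves from hake to goby

    // println!("{}", hake); // would not compile: hake has been moved

    *goby
}

fn main() {
    println!("goby = {}", move_box());
}
```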

2.4.3 Passing Owned Pointers As Arguments To Functions

If you pass an owned pointer to a function then ownership is transferred to that function.

For example, given this enum type

    enum FreshwaterFish
    {
        Loach,
        Perch,
        Roach,
        Tench,
    }

and this function definition

    fn catch(fish: ~FreshwaterFish)
    {
        ...
    }

if you do this

    let perch : ~FreshwaterFish = ~Perch;
    
    catch(perch);

then after the call to catch the local variable perch is no longer usable.

2.4.4 Returning Owned Pointers From Functions

If you return an owned pointer from a function then ownership is transferred to the caller of the function.

For example, given this enum type again

    enum FreshwaterFish
    {
        Loach,
        Perch,
        Roach,
        Tench,
    }

and this function definition

    fn catch_and_return(fish: ~FreshwaterFish) -> ~FreshwaterFish
    {
        fish
    }

then the net effect of this

    let tench: ~FreshwaterFish = ~Tench;

    ...
    
    let fish:  ~FreshwaterFish = catch_and_return(tench);

is to transfer ownership of the owned box from the local variable tench to the local variable fish.
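
Both transfers, into a function and back out of one, can be sketched in current Rust with Box in place of ~. The return value and body of catch are assumptions of this sketch, since the original body is elided.

```rust
#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum FreshwaterFish {
    Loach,
    Perch,
    Roach,
    Tench,
}

// Takes ownership of the box; the caller can no longer use its variable.
// Returning a description is an assumption made for this sketch.
fn catch(fish: Box<FreshwaterFish>) -> String {
    format!("caught {:?}", fish)
}

// Takes ownership and hands it straight back to the caller.
fn catch_and_return(fish: Box<FreshwaterFish>) -> Box<FreshwaterFish> {
    fish
}

fn main() {
    let perch: Box<FreshwaterFish> = Box::new(FreshwaterFish::Perch);
    println!("{}", catch(perch));
    // perch is no longer usable here.

    let tench: Box<FreshwaterFish> = Box::new(FreshwaterFish::Tench);
    let fish: Box<FreshwaterFish> = catch_and_return(tench);
    // tench is no longer usable here, but fish is.
    println!("{:?}", fish);
}
```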

3.0 Borrowed Pointers

You can borrow a pointer to a piece of memory irrespective of its location so long as the lifetime of the memory pointed to is guaranteed to be longer than the lifetime of the borrowed pointer.

This constraint is enforced by the compiler. If it cannot prove that the constraint is true then the code will not compile.

A borrowed pointer is declared using an ampersand (&).

For example,

    fn set_background(colour: &Colour)
    {
        ...
    }

The function set_background takes a borrowed pointer to a value of type Colour.

Because you can obtain a borrowed pointer to a value irrespective of its location the set_background function can be passed a Colour value that is stored in the current stack frame, or in the Managed heap of the current Task, or in the Owned heap, like so

    ...

   let s_white:  Colour = RGB(255, 255, 255);
   let m_white: @Colour = @RGB(255, 255, 255);
   let o_white: ~Colour = ~RGB(255, 255, 255);
   
   set_background(&s_white);
   set_background(m_white);
   set_background(o_white);
   
   ...

Note that to obtain a pointer to a local variable you use the & operator. The compiler will automatically create a borrowed pointer for a managed or owned box.
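
A sketch of the same idea in current Rust, where borrowed pointers are called references. Rc and Box stand in for managed and owned boxes, the Colour definition is assumed, and brightness is a hypothetical function taking a borrowed Colour. Unlike the code above, current Rust expects the & to be written in all three cases.

```rust
use std::rc::Rc;

// Assumed definition of Colour; the post does not show the original.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Colour {
    RGB(u8, u8, u8),
}

// Borrows a Colour wherever it happens to be stored.
fn brightness(colour: &Colour) -> u32 {
    match colour {
        Colour::RGB(r, g, b) => u32::from(*r) + u32::from(*g) + u32::from(*b),
    }
}

// Borrow the same colour from the stack, from an Rc and from a Box.
fn borrow_all() -> [u32; 3] {
    let s_white: Colour = Colour::RGB(255, 255, 255);
    let m_white: Rc<Colour> = Rc::new(Colour::RGB(255, 255, 255));
    let o_white: Box<Colour> = Box::new(Colour::RGB(255, 255, 255));

    [
        brightness(&s_white),
        brightness(&m_white), // &Rc<Colour> coerces to &Colour
        brightness(&o_white), // &Box<Colour> coerces to &Colour
    ]
}

fn main() {
    println!("{:?}", borrow_all());
}
```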


