By Balraj Singh - 2013-08-15
Lwt is a lightweight cooperative threading library for OCaml. A good way to understand Lwt and its use in MirageOS is to write some simple code. This document introduces the basic concepts and suggests programs to write. Code for all examples is in the
The full Lwt manual is available elsewhere, but this document covers the minimum needed to get started.
The core type in Lwt is a "thread" (also known as a "promise" in some other systems).
'a Lwt.t is a thread that should produce a value of type 'a (for example, an int Lwt.t should produce a single int).
Initially a thread is sleeping (the result is not yet known). At some point, it changes to be either returned (with a value of type 'a) or failed (with an exception). Once returned or failed, a thread never changes state again.
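The three states can be observed directly with Lwt.state. The following is a minimal sketch, assuming only the core lwt library; it uses Lwt.wait (covered later in this document) to obtain a sleeping thread, and the names sleeping, waker, returned and failed are illustrative only.

```ocaml
(* A sleeping thread and the "waker" used to make it return. *)
let sleeping, waker = Lwt.wait ()

(* Before the waker is used, the result is not yet known. *)
let was_sleeping = (Lwt.state sleeping = Lwt.Sleep)

(* Make the thread return 42; its state never changes after this. *)
let () = Lwt.wakeup waker 42

(* Threads can also be created already returned or already failed. *)
let returned : int Lwt.t = Lwt.return 1
let failed : int Lwt.t = Lwt.fail (Failure "oops")
```

After this code runs, Lwt.state sleeping is Lwt.Return 42, Lwt.state returned is Lwt.Return 1 and Lwt.state failed is Lwt.Fail carrying the Failure exception.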
Lwt provides a number of functions for working with threads.
The first useful function is return, which constructs a trivial, already-returned thread:
val return: 'a -> 'a Lwt.t
This is useful if an API requires a thread, but you already happen to know the value.
Once the value is wrapped in its Lwt thread, it cannot be used directly (in general, a thread may not have terminated yet). This is where the >>= operator (pronounced "bind") comes in:
val ( >>= ): 'a Lwt.t -> ('a -> 'b Lwt.t) -> 'b Lwt.t
t >>= f creates a thread which first waits for thread t to return some value x, then behaves as the new thread f x. If t is a sleeping thread, then t >>= f will initially be a sleeping thread too. If t fails, then the resulting thread will fail with the same exception.
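These rules can be checked without an event loop. The sketch below assumes only the core lwt library and again uses Lwt.wait to get a sleeping thread; the names doubled and failing are illustrative.

```ocaml
open Lwt.Infix

(* t starts out sleeping, so the thread built with >>= sleeps too. *)
let t, waker = Lwt.wait ()
let doubled = t >>= fun x -> Lwt.return (x * 2)
let sleeping_before = (Lwt.state doubled = Lwt.Sleep)

(* Once t returns 21, the callback runs and doubled returns 42. *)
let () = Lwt.wakeup waker 21

(* If the left-hand thread has failed, the callback is never run and the
   resulting thread fails with the same exception. *)
let failing : int Lwt.t = Lwt.fail Exit >>= fun () -> Lwt.return 1
```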
If you ignore the Lwt.t bits in the types above, you can see that return looks like the identity function and >>= looks like |> ("pipe" or "apply").
You can convert any synchronous program into an equivalent Lwt-threaded one using just return and >>=.
For example consider this code to input two values and add them:
let x =
  let a = get_input "Enter a" in
  let b = get_input "Enter b" in
  a + b
Without the let ... in ... syntax, we could also write:
let x =
  get_input "Enter a" |> fun a ->
  get_input "Enter b" |> fun b ->
  a + b
If the get_input function's type is changed from string -> int to the threaded equivalent, string -> int Lwt.t, then our example could be changed to:
let x =
  get_input "Enter a" >>= fun a ->
  get_input "Enter b" >>= fun b ->
  Lwt.return (a + b)
Note that the final result, x, is itself a thread now.
Since we didn't change + to return a thread, we must wrap the result with return to give it the correct type.
Of course, the reason for using Lwt is to write programs that do more than just behave like synchronous programs: we want to be doing multiple things at once, by composing threads in more ways than just "a then b".
Two important functions to compose threads are join and choose.
val join : unit Lwt.t list -> unit Lwt.t
join takes a list of threads and waits for all of them to terminate. If at least one thread fails then join l will fail with the same exception as the first to fail, after all threads terminate.
val choose : 'a t list -> 'a t
choose l behaves as the first thread in l to terminate. If several threads are already terminated, one is chosen at random.
The Lwt_list module provides many other functions for handling lists of threads.
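To make the difference between join and choose concrete, here is a small sketch that assumes only the core lwt library. The two threads a and b are woken by hand with Lwt.wait wakers rather than by timers, so the order of events is explicit.

```ocaml
open Lwt.Infix

let a, wake_a = Lwt.wait ()
let b, wake_b = Lwt.wait ()

(* both returns only when a and b have both returned. *)
let both = Lwt.join [a; b]

(* first returns as soon as either labelled thread returns. *)
let first = Lwt.choose [ (a >|= fun () -> "a"); (b >|= fun () -> "b") ]

(* Wake b first: choose is satisfied, join is still waiting for a. *)
let () = Lwt.wakeup wake_b ()
let both_still_sleeping = (Lwt.state both = Lwt.Sleep)

(* Wake a: now join returns as well. *)
let () = Lwt.wakeup wake_a ()
```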
Now write a program that spins off two threads, each of which sleeps for some amount of time, say 1 and 2 seconds and then one prints "Heads", the other "Tails". After both have finished, it prints "Finished" and exits.
To sleep for some number of nanoseconds use OS.Time.sleep_ns, and to print to the console use C.log. Note that OS is a Mirage-specific module; if you are using Lwt in another context, use Lwt_io.write to print. (You will also need to manually start the main event loop with Lwt_main.run.)
For convenience, you'll likely want to also use the Duration library, which provides handy functions for converting between seconds, milliseconds, nanoseconds, and other units of time.
OS.Time.sleep_ns (Duration.of_sec 3) (* sleep for 3 seconds *)
You will need to have MirageOS installed. Create a file config.ml with the following content:
open Mirage
let packages = [package "duration"]
let () =
  let main = foreign ~packages "Unikernel.Heads1" (console @-> job) in
  register "heads1" [ main $ default_console ]
Add a file unikernel.ml with the following content and edit it:
open OS
open Lwt.Infix
module Heads1 (C: Mirage_console.S) = struct
  let start c =
    (* Add your implementation here... *)
    C.log c "Finished"
end
Assuming you want to build as a normal Unix process, compile the application with:
mirage configure -t unix
make depend
make
./main.native
If you prefer to build for another target (like hvt), change the -t argument to mirage configure. To see the available backends, have a look at the documentation available with mirage configure --help.
open OS
open Lwt.Infix
module Heads1 (C: Mirage_console.S) = struct
  let start c =
    Lwt.join [
      (Time.sleep_ns (Duration.of_sec 1) >>= fun () -> C.log c "Heads");
      (Time.sleep_ns (Duration.of_sec 2) >>= fun () -> C.log c "Tails")
    ] >>= fun () ->
    C.log c "Finished"
end
Write an echo server that reads from a dummy input generator and, for each line it reads, writes it to the console. The server should stop after reading 10 lines.
Hint: it's easier to convert a program to use Lwt if you write loops in a functional style (using tail recursion) rather than using special syntax (e.g. for loops).
For convenience, here is a config.ml which you might use for this exercise:
open Mirage
let packages = [package "duration"; package "randomconv"]
let () =
  let main = foreign ~packages "Unikernel.Echo_server" (console @-> random @-> job) in
  register "echo_server" [ main $ default_console $ default_random ]
You might notice that it's very similar to the previous example config.ml, but it requires an extra package, randomconv, which has convenience functions for dealing with random data, as this challenge requires. Here is a basic dummy input generator you can use for testing:
let read_line () =
  OS.Time.sleep_ns (Duration.of_ms (Randomconv.int ~bound:2500 R.generate)) >|= fun () ->
  String.make (Randomconv.int ~bound:20 R.generate) 'a'
By the way, the >|= operator ("map") used here is similar to >>= but automatically wraps the result of the function you provide with return. It's used here because String.make is synchronous (it doesn't return a thread). We could also have used >>= and return together to get the same effect.
open OS
open Lwt.Infix
module Echo_server (C: Mirage_console.S) (R: Mirage_random.S) = struct
  let read_line () =
    OS.Time.sleep_ns (Duration.of_ms (Randomconv.int ~bound:2500 R.generate)) >|= fun () ->
    String.make (Randomconv.int ~bound:20 R.generate) 'a'
  let start c _r =
    let rec echo_server = function
      | 0 -> Lwt.return ()
      | n ->
        read_line () >>= fun s ->
        C.log c s >>= fun () ->
        echo_server (n - 1)
    in
    echo_server 10
end
Note that the >>= operator does the threaded equivalent of a tail-call optimisation, so this won't consume more and more memory as it runs.
Understanding the basic principles behind Lwt can be helpful.
The core of Lwt is based on an event loop. In "standard" (non-MirageOS) settings, this loop is started using the Lwt_main.run function. However, when using MirageOS, the loop is automatically started by the main.ml file autogenerated by the mirage command-line tool.
Because it's based on an event loop, threads are very cheap in Lwt when compared to preemptive system threads. Sleeping registers an event that will wake up the associated thread when possible.
With Lwt, it is often possible to avoid mutexes altogether! The web server from the Ocsigen project uses only two, for example. In usual concurrent systems, mutexes are used to prevent two (or more) threads executing concurrently on a given piece of data. This can happen when a thread is preemptively interrupted and another one starts running. In Lwt, a thread executes serially until it explicitly yields (most commonly via >>=); for this reason, Lwt threads are said to be cooperative.
For example, consider this code to generate unique IDs:
let next =
  let i = ref 0 in
  fun () ->
    incr i;
    !i
It is entirely safe to call this from multiple Lwt threads, since we know that incr, the only function we call, isn't going to somehow recursively call next while it's running.
However, x >>= f (and similar) will run other threads while waiting for x to terminate, and these may well invoke the function again, so you can't assume things won't be modified across a bind.
For example, this version is not safe:
let next =
  let i = ref 0 in
  fun () ->
    incr i;
    foo () >|= fun () ->
    (* Another thread might call [next] here *)
    !i
Of course, this is true of any function that might, directly or indirectly, call next, not just Lwt ones.
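The hazard can be reproduced deterministically. In the sketch below (core lwt library only), foo is a hypothetical stand-in that sleeps until a "gate" thread is woken by hand, so two calls overlap across the bind. It also shows one possible fix, which is to read the counter before the bind rather than after it.

```ocaml
open Lwt.Infix

(* Hypothetical blocking operation: sleeps until the gate is opened. *)
let gate, open_gate = Lwt.wait ()
let foo () = gate

(* Reads the counter after the bind: another call may have changed it. *)
let unsafe_next =
  let i = ref 0 in
  fun () ->
    incr i;
    foo () >|= fun () -> !i

(* Reads the counter before the bind: each call keeps its own value. *)
let safe_next =
  let i = ref 0 in
  fun () ->
    incr i;
    let id = !i in
    foo () >|= fun () -> id

(* Start two overlapping calls of each version, then open the gate. *)
let u1 = unsafe_next ()
let u2 = unsafe_next ()
let s1 = safe_next ()
let s2 = safe_next ()
let () = Lwt.wakeup open_gate ()
```

Both unsafe calls return 2, because each reads the counter only after the other has incremented it, while the safe calls return 1 and 2.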
The obvious danger associated with cooperative threading is having threads not cooperating: if an expression takes a lot of time to compute with no cooperation point, then the whole program hangs. The Lwt.yield function introduces an explicit cooperation point. Sleeping also obviously makes the thread cooperate.
If locking a data structure is still needed, the Lwt_mutex module provides the necessary functions. For more information on thread switching (and how to prevent it), see the Lwt mailing list thread "Lwt_stream, thread switch within push function".
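As an illustration of Lwt_mutex, the following sketch protects a counter whose critical section spans a bind. It assumes only the core lwt library; foo is again a hypothetical operation that sleeps until a hand-operated gate is opened.

```ocaml
open Lwt.Infix

(* Hypothetical blocking operation: sleeps until the gate is opened. *)
let gate, open_gate = Lwt.wait ()
let foo () = gate

let mutex = Lwt_mutex.create ()

(* with_lock takes the mutex, runs the function, and releases the mutex
   when the resulting thread terminates (whether it returns or fails). *)
let next =
  let i = ref 0 in
  fun () ->
    Lwt_mutex.with_lock mutex (fun () ->
      incr i;
      foo () >|= fun () -> !i)

(* The second call waits for the mutex until the first has finished. *)
let id1 = next ()
let id2 = next ()
let () = Lwt.wakeup open_gate ()
```

With the lock in place the second call cannot increment the counter until the first has read it, so the two calls return 1 and 2.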
If you want to spawn a thread without waiting for the result, use Lwt.async:
Lwt.async (fun () ->
  OS.Time.sleep_ns (Duration.of_sec 10) >>= fun () ->
  C.log c "Finished"
)
Note: do not do let _ = my_background_thread (). This ignores the result of the thread, which means that if it fails with an exception then the error will never be reported. Lwt.async reports errors to the user's configured Lwt.async_exception_hook, which may or may not terminate the unikernel depending on how it has been configured.
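The difference can be seen in a few lines. This is a sketch assuming only the core lwt library; it installs a hook (through the Lwt.async_exception_hook reference) that merely records the exception, whereas a real application would more likely log it or exit. The name reported is illustrative.

```ocaml
(* Record whatever Lwt.async reports, instead of the default behaviour. *)
let reported : exn option ref = ref None
let () = Lwt.async_exception_hook := (fun ex -> reported := Some ex)

(* A failed thread whose result is simply dropped: nothing is reported. *)
let _ignored : unit Lwt.t = Lwt.fail (Failure "lost")
let nothing_reported_yet = (!reported = None)

(* The same kind of failure inside Lwt.async reaches the hook. *)
let () = Lwt.async (fun () -> Lwt.fail (Failure "background thread failed"))
```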
It is often better to catch such exceptions and log them with some contextual information. Here's some real Mirage code that spawns a new background thread to handle a new frame received from the network. The log message includes the exception it caught, a dump of the troublesome frame and, like all log messages, information about when it occurred and in which module.
(* Handle a frame of data from the network... *)
Lwt.async (fun () ->
  Lwt.catch (fun () -> fn data)
    (fun ex ->
       Log.err (fun f -> f "uncaught exception from listen callback \
                            while handling frame:@\n%a@\nException: @[%s@]"
                   S.pp_frame data (Printexc.to_string ex));
       Lwt.return ()
    )
)
By the way, the reason async and catch take functions that create threads rather than just plain threads is so they can start the thread inside a try .. with block and so handle OCaml exceptions consistently.
Be careful not to disable this safety feature by accident - consider:
let test1 () =
  let t = raise (Failure "early failure") in
  Lwt.catch (fun () -> t)
    (fun ex -> print_endline "caught exception!"; Lwt.return ())

let test2 () =
  let t =
    OS.Time.sleep_ns (Duration.of_sec 1) >>= fun () ->
    raise (Failure "late failure") in
  Lwt.catch (fun () -> t)
    (fun ex -> print_endline "caught exception!"; Lwt.return ())
In test1, t raises an exception immediately (without waiting for a sleeping thread and thus getting added to an event queue), so test1 will exit with an exception before even reaching the catch. In test2, t blocks first. In this case, the sleeping t is passed to catch, which handles the exception.
Moving the let t = inside the catch callback avoids this problem (as does using Lwt.fail instead of raise).
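The three variants can be compared side by side without a timer. In this sketch (core lwt library only), make_thread is a hypothetical function that raises before it manages to create a thread, playing the role of the early failure above.

```ocaml
(* Hypothetical function that raises synchronously, before any thread exists. *)
let make_thread () : string Lwt.t = failwith "early failure"

(* Thread created outside the callback: the exception escapes Lwt.catch. *)
let outside () =
  let t = make_thread () in
  Lwt.catch (fun () -> t) (fun _ex -> Lwt.return "caught")

(* Thread created inside the callback: Lwt.catch sees the exception. *)
let inside () =
  Lwt.catch (fun () -> make_thread ()) (fun _ex -> Lwt.return "caught")

(* Using Lwt.fail produces a failed thread instead of raising, so it is
   safe to create it outside the callback. *)
let with_fail () =
  let t = Lwt.fail (Failure "early failure") in
  Lwt.catch (fun () -> t) (fun _ex -> Lwt.return "caught")
```

Calling outside () raises Failure to its caller, while inside () and with_fail () both return a thread holding "caught".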
In Mirage code, we typically distinguish two types of error: programming errors (bugs, which should be reported to the programmer to be fixed) and expected errors (e.g. network disconnected or invalid TCP packet received). We try to use the type system to ensure that expected errors are handled gracefully.
For expected errors, you should use the result type, which provides Ok and Error constructors. This is a built-in in OCaml 4.03 and available from the result opam package for older versions.
Here's an example that calls read_arg twice and returns the sum of the results on success. If either read_arg returns an error then that is returned immediately.
let example () =
  read_arg () >>= function
  | Error _ as e -> Lwt.return e
  | Ok a ->
    read_arg () >>= function
    | Error _ as e -> Lwt.return e
    | Ok b -> Lwt.return (Ok (a + b))
It is often useful to provide some helpers to handle this pattern (using Lwt threads and result types together) more simply:
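let ok x = Lwt.return (Ok x)
(* Rebuilding the Error, rather than reusing it via "as e", lets f return a different success type *)
let (>>*=) m f =
  m >>= function
  | Error e -> Lwt.return (Error e)
  | Ok x -> f x
let example () =
  read_arg () >>*= fun a ->
  read_arg () >>*= fun b ->
  ok (a + b)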
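Lwt also ships a Lwt_result module whose Infix operators play the same role as the hand-written helper. The sketch below assumes the core lwt library and OCaml 4.08 or later (for Queue.take_opt); read_arg is a hypothetical stand-in that takes its values from an in-memory queue.

```ocaml
(* Hypothetical input source: values are taken from this queue. *)
let inputs : int Queue.t = Queue.create ()

let read_arg () : (int, string) result Lwt.t =
  match Queue.take_opt inputs with
  | Some n -> Lwt.return (Ok n)
  | None -> Lwt.return (Error "no more input")

(* Inside this function, >>= is Lwt_result's bind: it stops at the first
   Error and otherwise passes the Ok value on. *)
let example () =
  let open Lwt_result.Infix in
  read_arg () >>= fun a ->
  read_arg () >>= fun b ->
  Lwt_result.return (a + b)
```

With 1 and 2 in the queue, example () returns Ok 3; with an empty queue it returns the Error from the first read_arg.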
If a bug is detected, you should raise an exception. In threaded code you should use Lwt.fail, although Lwt will catch exceptions and turn them into failures automatically if you forget.
You shouldn't normally need to catch specific exceptions (it would be better to use an Error return in that case), but it is sometimes necessary.
The Lwt equivalent of
try foo x
with
| Error_you_want_to_catch -> (* handle error here *)
is
Lwt.catch
  (fun () -> foo x)
  (function
    | Error_you_want_to_catch -> (* handle error here *)
    | ex -> Lwt.fail ex    (* Pass others on *)
  )
Depending on how the unikernel is set up, an exception may or may not be fatal.
In general, if you allocate a resource that won't be automatically freed by the garbage collector then you should use Lwt.finalize to ensure it is cleaned up whether the function using it succeeds or not:
let r = Resource.alloc () in
Lwt.finalize
  (fun () -> use r)
  (fun () -> Resource.free r)
To make it harder to get this wrong, it is a good idea to provide a with_ function, so users can just do:
with_resource (fun r -> use r)
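One way such a with_resource function might be written is sketched below, assuming only the core lwt library. The Resource module here is a hypothetical stand-in whose free simply sets a flag, so that the cleanup can be observed.

```ocaml
(* Hypothetical resource that records whether it has been freed. *)
module Resource = struct
  type t = { mutable freed : bool }
  let alloc () = { freed = false }
  let free r = r.freed <- true; Lwt.return_unit
end

(* Allocate, run the user's function, and always free afterwards,
   whether the function's thread returns or fails. *)
let with_resource f =
  let r = Resource.alloc () in
  Lwt.finalize
    (fun () -> f r)
    (fun () -> Resource.free r)
```

Because the resource is allocated and freed inside with_resource, callers never hold a resource that they could forget to release.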
You can create a thread that sleeps until you explicitly make it return a result with Lwt.wait, which returns a thread and a waker:
let invoke_remote msg =
  let t, waker = Lwt.wait () in
  let id = new_id () in
  on_response_to id (fun resp -> Lwt.wakeup waker resp);
  send_request id msg;
  t
This is mainly useful when interacting with external processes (as in this example), or libraries that don't support Lwt directly.
In order to cancel a thread, the function cancel (provided by the module Lwt) is needed. It has type 'a t -> unit and does exactly what it says (except in certain complicated cases that are not in the scope of this tutorial). A simple timeout function that cancels a thread after a given delay (in nanoseconds, as taken by Time.sleep_ns) can be written easily:
(* In this example and all those afterwards, we consider Lwt and OS to be opened *)
let timeout delay t =
  Time.sleep_ns delay >|= fun () ->
  cancel t
This timeout function does not allow one to use the result returned by the thread t. As a challenge, modify the timeout function so that it returns either None if t has not yet returned after delay or Some v if t returns v within delay. In order to achieve this behaviour it is possible to use the function Lwt.state that, given a thread, returns the state it is in, either Sleep, Return or Fail.
You can test your solution with this application, which creates a thread that may be cancelled before it returns:
let start c _r =
  let t =
    Time.sleep_ns (Duration.of_ms (Randomconv.int ~bound:3000 R.generate)) >|= fun () ->
    "Heads"
  in
  timeout (Duration.of_sec 2) t >>= function
  | None -> C.log c "Cancelled"
  | Some v -> C.log c (Printf.sprintf "Returned %S" v)
let timeout delay t =
  Time.sleep_ns delay >>= fun () ->
  match Lwt.state t with
  | Lwt.Sleep -> Lwt.cancel t; Lwt.return None
  | Lwt.Return v -> Lwt.return (Some v)
  | Lwt.Fail ex -> Lwt.fail ex
This solution and application are found in tutorial/lwt/timeout1/unikernel.ml in the repository.
Does your solution match the one given here and always return only after the full delay has elapsed, even when t returns sooner?
This is a good place to introduce a third operation to compose threads:
val pick : 'a t list -> 'a t
pick behaves exactly like choose except that it cancels all other sleeping threads when one terminates.
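The cancellation performed by pick can be observed directly. The sketch below assumes only the core lwt library and uses Lwt.task, which is like Lwt.wait but produces a thread that can be cancelled; the names slow and fast are illustrative.

```ocaml
(* Two cancellable sleeping threads, woken by hand rather than by timers. *)
let slow, _wake_slow = Lwt.task ()
let fast, wake_fast = Lwt.task ()

(* winner behaves as whichever thread terminates first. *)
let winner = Lwt.pick [slow; fast]

(* Waking fast makes winner return and cancels slow, which was still
   sleeping. A cancelled thread fails with Lwt.Canceled. *)
let () = Lwt.wakeup wake_fast "fast"
```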
In a typical use of a timeout, if t returns before the timeout has expired, one would want the timeout to be cancelled right away. The next challenge is to modify the timeout function to return Some v right after t returns. Of course if the timeout does expire then it should cancel t and return None.
In order to test your solution, you can compile it to a mirage executable and run it using the skeleton provided for the previous challenge.
let timeout delay t =
  let tmout = Time.sleep_ns delay in
  Lwt.pick [
    (tmout >|= fun () -> None);
    (t >|= fun v -> Some v);
  ]
Found in tutorial/lwt/timeout2/unikernel.ml in the repository.
The cancel function should be used very sparingly, since it essentially throws an unexpected exception into the middle of some executing code that probably wasn't expecting it.
A cancel that occurs when the thread happens to be performing an uncancellable operation will be silently ignored.
A safer alternative is to use Lwt_switch. This means that cancellation will only happen at well defined points, although it does require explicit support from the code being cancelled. If you have a function that only responds to cancel, you might want to wrap it in a function that takes a switch and cancels it when the switch is turned off.
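The wrapper mentioned in the last sentence might look like the sketch below, which assumes only the core lwt library. The helper name cancel_when_off is hypothetical, and the cancel-only function is represented by a bare Lwt.task thread.

```ocaml
(* Hypothetical helper: cancel t when the switch is turned off.
   Lwt_switch.add_hook raises Lwt_switch.Off if the switch is already off. *)
let cancel_when_off switch t =
  Lwt_switch.add_hook (Some switch) (fun () -> Lwt.cancel t; Lwt.return_unit);
  t

let switch = Lwt_switch.create ()

(* Stand-in for a long-running thread that only responds to cancel. *)
let worker, _waker = Lwt.task ()
let guarded : unit Lwt.t = cancel_when_off switch worker

(* Turning the switch off runs the hook, which cancels the worker. *)
let turned_off = Lwt_switch.turn_off switch
```

Code that supports switches natively would instead register hooks that shut down cleanly at points of its own choosing; this wrapper only adapts an existing cancel-based function to the switch interface.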
Lwt provides many more features. See the manual for details. However, the vast majority of code will only need the basic features described here.