Today I wanted to play with Elixir Stream to mimic the ideas of Observable collections from the Rx (Reactive Extension) concepts. The best playground is the IEx, the interactive shell for Elixir. However, in order to achieve the results that I wanted there was a lot of details that you need to know before having fun with it.

I'm glad that I had some help from @rcillo a co-worker of mine that have been studying Elixir for quite some time. We paired to achieve the results that we wanted. It took just 10~15min to get the desired result.

So what are we going to see in this posts:

  1. running multiples shells
  2. naming process
  3. IO device

So, let's start!

Elixir version used 0.15.1

The Elixir stream code snippet

The concept that I wanted to try is use the GenEvent stream support and the for syntax. This is the snippet:

{:ok, manager} = GenEvent.start_link
stream = GenEvent.stream(manager)
for x <- stream, do: IO.puts(x)
manager.notify("Hello World")

Pretty simple. I'm not going to dig each line of the snippet, the basic idea is:

given a stream of events, for each event published I want to print it on the screen

Side note

(You can advance to the next section if you want)

I'm passionate for new programming languages learning new ones is a way to always keep thinking different. The for notation in Elixir is available in other languages. (Each have different details in implementation, but all proposes the same feature)

Haskell have the do-notation and list-comprehension

main = do
  x <- doSomeComputation
  y <- doSomeComputation
  return (x, y)

-- list comprehension
[x | x <- [1...5]]

In Scala we would have:

for {
  x <- doSomeComputation
  y <- doSomeComputation
} yield (x, y)

And C# LINQ also supports it

from x in doSomeComputation
from y in doSomeComputation
select new { x, y }

So, go check this stuff if you never saw them.

Running on IEx

So, to run the IEx you just do

> iex

Then you can start writing Elixir code on it. So we the code that we want to test, but it didn't worked as expected.

iex> {:ok, manager} = GenEvent.start_link
{:ok, #PID<0.172.0>}
iex> stream = GenEvent.stream(manager)
%GenEvent.Stream{duration: :infinity, id: nil, manager: :eventor, mode: :ack, timeout: :infinity}
iex> for x <- stream, do: IO.puts(stream)
BOOM! console freezes

Well, once you write the for code, the console is blocked because the stream is infinity. The <- notation can be considered as a generator. You can read it as "for each x generated by stream, do...". We could stop it by stopping the GenEvent process. But, how to do that if the console is blocked?

Spawning local shells

I didn't know about it, but @rcillo showed me some tricks. You can type the C-g (control-g) and you will get a prompt where you can type some commands.

User switch command
 --> h
  c [nn]            - connect to job
  i [nn]            - interrupt job
  k [nn]            - kill job
  j                 - list all jobs
  s [shell]         - start local shell
  r [node [shell]]  - start remote shell
  q                 - quit erlang
  ? | h             - this message

Type h to get some helps. If you type j you will get all shells running with its job number.

--> j
   1  {erlang,apply,[#Fun<Elixir.IEx.CLI.0.105530432>,[]]}
   2  {'Elixir.IEx',start,[]}
   3* {'Elixir.IEx',start,[]}

With this you are able to switch between shells by using c <job-number>.

In order to spawn a new shell with IEx, you do:

--> s 'Elixir.IEx'

Note that it is with single quotes (double quotes will fail). In case the shell is seems not to be responsive, just hit ENTER and you it will show the iex> prompt.

So, now that you have two shells, you just go and try:

iex> GenEvent.notify(manager, "hello world")
** (RuntimeError) undefined function: manager/0

Well, it didn't work. How can we fix this? We need to register the process with a name.

Registering processes

Erlang provides a register/2 method that allows you bind a PID process to a name, this way we will have access to it.

iex> {:ok, manager} = GenEvent.start_link
{:ok, #PID<0.185.0>}
iex> :erlang.register(:eventor, manager)
true
iex> :erlang.whereis(:eventor)
#PID<0.185.0>

Ok, now we are able to just GenEvent.notify(:eventor, "Hello World") from both shells.

But, you just send notify messages but nothing prints in the console. You might think that it will the test the shell that is listening for events, but it don't. What is the problem? In order to print the texts in console there is the concept of devices in Erlang (that I will not discuss it here, because I don't know anything about it). But, let's think that it is similar to the STDOUT. The problem is that the shell with for snippet is blocked, and the IO.puts there can't flush its content, and your second shell have a different "device".

We need to specify which device do we want the IO.puts to print out.

Specifying the device where IO.puts prints out

If you check the IO.puts doc you will see the following doc:

puts(device \\ :erlang.group_leader(), item)

So, we can specify the first argument the device that we want to output.

Each shell, has its own :erlang.group_leader(), what we will do is register the group_leader that we want the text to output.

iex> :erlang.register(:globalio, :erlang.group_leader())
true
iex> IO.puts(:globalio, "Hello World")
Hello World

So what we have to do is just rewrite the for code with the following code:

iex> for x <- stream, do: IO.puts(:globalio, x)

Now, every notify event that you send will be printed out on the console! Yay =)

Summary

We saw a lot of things, so let's reproduce the steps in order. Considering the iex(n)> to identify each shell.

iex(1)> {:ok, manager} = GenEvent.start_link
{:ok, #PID<0.185.0>}
iex(1)> :erlang.register(:eventor, manager)
true
Type C-g
s 'Elixir.IEx'
iex(2)> :erlang.register(:globalio, :erlang.group_leader())
true
Type C-g
--> c 1
iex(1)> stream = GenEvent.stream(manager)
%GenEvent.Stream{duration: :infinity, id: nil, manager: :eventor, mode: :ack, timeout: :infinity}
iex(1)> for x <- stream, do: IO.puts(:globalio, x)
Type C-g
--> c 2
iex(2)> GenEvent.notify(:eventor, "Hello World")
Hello World
:ok

Ok, now you can keep exploring the Stream API in Elixir. Have fun!