Welcome to Selium's user guide

Ahoy, and thanks for being part of the community! In this wiki you'll learn about Selium, its components and how to use them.

If at any point you find yourself thinking "I wish they'd cover ...", please start a new discussion. We would really value your feedback and ideas. Thanks!

What is Selium?

You may have seen this description of Selium on the organisation readme: "Selium is an extremely developer friendly composable messaging platform with zero build time configuration." This is a nice statement, but lacks any depth. So, let's add some, shall we?

"Selium is a messaging platform." Messaging is simply a method of exchanging data in a structured way. You may have come across the term "Pub-Sub", which is a method of sending (publishing) the same message to lots of receivers (subscribers). That may be a bit of a grey area to people new to messaging, but you'll definitely know this one: "HTTP". Yep that's right, web requests are a form of messaging too. It's called an "RPC", or more colloquially "request-reply".

"Selium is composable." We've built Selium as a collection of parts that you can stick together to aggregate, manipulate and disseminate your data however you require, much like everyone's favourite childrens' toy that rhymes with "Pego". You guessed it - Nanoblocks!

"Selium is extremely developer friendly." Selium is designed from the ground up for developer ergonomics. Whatever requirements you have for your services and data, you can compose them in at runtime using our functional API. In other words, Selium has "zero build time configuration".

Who should use Selium?

(Anyone with ops-related trauma)

Selium is designed for software developers. If your project needs to move data around, expose services, discover services, introduce resiliency, scale out, speed up, produce or consume events, or otherwise run screaming from HTTP, Selium is for you. Liberate your stack from DevOps, simplify your deployment pipeline and never use the word "idempotent" again.

Let's get started!

First Steps

Selium comprises a client library and a server binary. In order to use Selium, there are 3 basic steps:

  1. Create TLS certificates for your client and server
  2. Run the server binary
  3. Integrate the client library into your project

Step 1 - TLS Certificates

First we need some certs to validate the client and server. Selium uses mutual TLS (mTLS) to validate both parties cryptographically, making things nice and secure.

We've built a tool to make this easy, so let's install that, then mint our certs:

# Install the selium-tools CLI
$ cargo install selium-tools
# Use this CLI to create our certificates
$ selium-tools gen-certs

You should now have a directory in the current path called certs/. Inside we have certs for the client and server, which you can move to a more convenient location if you like - the paths are configurable in code. Both the client/ and server/ directories include a copy of the certificate authority, which you'll need if you want to create more client certificates later.

Step 2 - Start the Selium Server

The Selium server allows us to exchange messages between clients. For this example we'll grab a copy from crates.io.

For production deployments you can also download prebuilt binaries from GitHub.

Let's start a new server with our freshly minted certs. In the same directory as your certs/ folder, open a new terminal and run the following commands:

# Install Selium server
$ cargo install selium-server
# Run the server
$ selium-server --bind-addr=127.0.0.1:7001

The selium-server command will not produce any output by default, but that doesn't mean it isn't working! You can increase logging using the verbosity flag:

$ selium-server -v # Warnings only
$ selium-server -vv # Info
$ selium-server -vvv # Debug
$ selium-server -vvvv # Trace

Step 3 - Implement the Selium Client

Selium Client is a composable library API for the Selium Server. Let's have a look at a minimal example:

use futures::{SinkExt, StreamExt};
use selium::{prelude::*, std::codecs::StringCodec};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let connection = selium::custom() // connect to your own Selium server
        .endpoint("127.0.0.1:7001") // your Selium server's address
        .with_certificate_authority("certs/client/ca.der")? // your Selium cert authority
        .with_cert_and_key(
            "certs/client/localhost.der",
            "certs/client/localhost.key.der",
        )? // your client certificates
        .connect()
        .await?;

    let mut publisher = connection
        .publisher("/some/topic") // choose a topic to group similar messages together
        .with_encoder(StringCodec) // allows you to exchange string messages between clients
        .open() // opens a new stream for sending data
        .await?;

    let mut subscriber = connection
        .subscriber("/some/topic") // subscribe to the publisher's topic
        .with_decoder(StringCodec) // use the same codec as the publisher
        .open() // opens a new stream for receiving data
        .await?;

    // Send a message and close the publisher
    publisher.send("Hello, world!".into()).await?;
    publisher.finish().await?;

    // Receive the message
    if let Some(Ok(message)) = subscriber.next().await {
        println!("Received message: {message}");
    }

    Ok(())
}

There's a lot to take in here, but for the moment let's just get this baby running!

# Create a new Cargo project
$ cargo new --bin hello-selium

# Move into our project
$ cd hello-selium

# Add the crate dependencies
$ cargo add futures
$ cargo add -F std selium
$ cargo add -F macros,rt tokio

Now copy and paste the code above into hello-selium/src/main.rs.

Now let's execute the project and start exchanging some messages! Make sure your server is still running from the previous step.

$ cargo run
Received message: Hello, world!

Next Steps

We've just setup a working Selium publish/subscribe project, but you can also use RPC too.

Request/Reply (RPC pattern)

In the previous chapter we discussed our first steps with the publish/subscribe pattern. In this chapter we will explore Selium's other major communication pattern - request/reply (otherwise known as RPC).

Request/reply is useful in the same way that a REST API is. For instance, you can use Selium to replace your internal APIs:

#[derive(Deserialize)]
enum Request {
    LookupCustomer(u64),
    // Other requests
}

async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let connection = selium::custom()
        ...

    // Setup a database connection to demonstrate state sharing
    let mut db = create_db()?;

    let mut replier = connection
        .replier("/customers/api") // choose a topic to serve your API on
        .with_request_decoder(StringCodec) // let's use a StringCodec to interface with JSON clients
        .with_request_encoder(StringCodec) // let's use a StringCodec to interface with JSON clients
        .with_handler(move |json| {
            let req: Request = serde_json::from_str(&json)?;
            match req {
                Request::LookupCustomer(id) => lookup_customer(&mut db, r.client_id),
            }
        })
        .open()
        .await?;

    replier.listen().await?;

    Ok(())
}

async fn lookup_customer(db: &mut Database, id: u64) -> Result<String> {
    // Lookup request...

    // Reply with a JSON response
    Ok(r#"
        {
            "name": "John Doe",
            "age": 43,
            "phones": [
                "+44 1234567",
                "+44 2345678"
            ]
        }"#)
}

And now for the requestor:

async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let connection = selium::custom()
        ...

    let requestor = connection
        .requestor("/some/endpoint")
        .with_request_encoder(StringCodec)
        .with_reply_decoder(StringCodec)
        .with_request_timeout(Duration::from_secs(1))?
        .open()
        .await?;

    let customer = requestor.request(r#"{"LookupCustomer": 85028}"#).await.unwrap();

    Ok(())
}

Benefits over HTTP

  1. No web servers
  2. Built in mutual TLS
  3. Ability to exchange pure Rust types over the wire
  4. Transport much larger amounts of data, much faster
  5. Multiple messaging patterns at your fingertips
  6. 100% Rust toolchain

Next Steps

Nice work, we've got all kinds of data flowing! Now it's time to learn about reusing Selium connections.

Reusing Connections

Selium supports reusing connections to the Selium server, otherwise known as multiplexing. This allows you to maintain a single connection that publishes and/or subscribes to multiple topics simultaneously.

Let's have another look at that code from the previous chapter:

async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let connection = selium::custom()
        ...

    let mut publisher = connection
        .publisher("/some/topic") // choose a topic to group similar messages together
        .with_encoder(StringCodec) // allows you to exchange string messages between clients
        .open() // opens a new stream for sending data
        .await?;

    let mut subscriber = connection
        .subscriber("/some/topic") // subscribe to the publisher's topic
        .with_decoder(StringCodec) // use the same codec as the publisher
        .open() // opens a new stream for receiving data
        .await?;

    ...

    Ok(())
}

We can see that each time a new behaviour is required, we create a new stream using Client::publisher and Client::subscriber. For each new stream, the full range of builder options is available, uniquely for that stream.

Next, let's see how we can send/receive data more ergonomically with Stream and Sink traits.

Stream and Sink traits

If you've ever used Rust's futures or tokio crates, you'll almost certainly be aware of the Stream and Sink traits. These traits govern the sending and receiving of messages between components, and are arguably a de facto standard in Rust.

If you've never used these traits before, don't worry! Not only are they quite intuitive, but the futures crate has great documentation explaining how they work.

The good news for Selium users, is that Selium's Publisher and Subscriber both implement Sink and Stream respectively. Thus they should fit hand in glove with your existing streaming applications.

You can also make use of the futures crate's StreamExt and SinkExt extensions. These traits are implemented by default on any Stream/Sink implementations (like Selium's) and contain lots of helpful tools.

Let's go back to our example from previous chapters and see what we can do with SinkExt on a Publisher:

use futures::{future, stream, SinkExt, Stream, StreamExt};
use selium::{prelude::*, std::codecs::StringCodec};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let connection = selium::custom()
        .endpoint("127.0.0.1:7001")
        .with_certificate_authority("certs/client/ca.der")? // your Selium cert authority
        .with_cert_and_key(
            "certs/client/localhost.der",
            "certs/client/localhost.key.der",
        )? // your client certificates
        .connect() // your Selium server's address
        .await?;

    let publisher = connection
        .publisher("/some/topic") // choose a topic to group similar messages together
        .with_encoder(StringCodec) // allows you to exchange string messages between clients
        .open() // opens a new stream for sending data
        .await?;

    let mut sink = publisher.with(|item: String| {
        if item.contains('@') {
            future::ok(item)
        } else {
            future::ok("default_email@company.org".into())
        }
    });
    sink.send_all(&mut some_stream()).await?;

    Ok(())
}

fn some_stream() -> impl Stream<Item = Result<String, Box<dyn std::error::Error>>> {
    stream::iter(vec![
        "hello@world.com".into(),
        "some@example.net".into(),
        "notanemail.com".into(),
    ])
    .map(|email| Ok(email))
}

In this example, we take a stream of email addresses and optionally map invalid email addresses to a default email address. Of course, this is not a very useful example, but the Rust ecosystem is full of Streams and Sinks. Selium's native support for these traits allows you to slot Selium client into your libraries and applications with ease.

Next, let's get to grips with codecs.

Codecs

Looking back at the previous chapters, you probably noticed lines like these ones:

#![allow(unused)]
fn main() {
let mut publisher = connection
    ...
    .with_encoder(StringCodec) // allows you to exchange string messages between clients

let mut subscriber = connection
    ...
    .with_decoder(StringCodec) // use the same codec as the publisher
}

If you've not worked with data streaming before, codecs might seem like a very strange concept indeed. However the answer is quite simple. The term codecs is a portmanteau of encoder and decoder. Thus a codec is simply a structure that can encode frames of data on one side of a connection, and decode them on the other.

In the example above, we use StringCodec, which does exactly what it says on the tin - it encodes strings to bytes and then decodes those bytes back to strings.

Why do we need codecs?

We need codecs because at the network level, computers only support sending raw bytes. However most data we work with are not raw bytes - they're strings, integers, booleans, enumerators, structures etc. In order to abstract away the pain of converting these types to bytes and back again, we use a codec that knows how to do it for us.

Can I send more than just strings?

Yep! With the help of the serde crate, we support sending all manner of Rust types. In the example below, we use the BincodeCodec to send a stream of StockEvents.

You'll need to install serde with the derive feature to follow this example:

$ cargo add -F derive serde
use futures::SinkExt;
use selium::{prelude::*, std::codecs::BincodeCodec};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct StockEvent {
    ticker: String,
    change: f64,
}

impl StockEvent {
    pub fn new(ticker: &str, change: f64) -> Self {
        Self {
            ticker: ticker.to_owned(),
            change,
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let connection = selium::custom()
        .endpoint("127.0.0.1:7001")
        .with_certificate_authority("certs/client/ca.der")?
        .with_cert_and_key(
            "certs/client/localhost.der",
            "certs/client/localhost.key.der",
        )?
        .connect()
        .await?;

    let mut publisher = connection
        .publisher("/some/topic")
        .with_encoder(BincodeCodec::default())
        .open()
        .await?;

    publisher.send(StockEvent::new("INTC", -9.0)).await?;
    publisher.finish().await?;

    Ok(())
}

Can I build my own codec?

Yes, and you can use third party codecs like protocol buffers, SBE etc. We'll have a chapter on this coming soon.

Selium Cloud

Selium Cloud (Beta) consists of a managed Selium Server, authentication, certificate management, dashboard and technical support. If you want a hands-free, production-ready solution for all of your software comms, backed by the people that made Selium, this is for you.

Getting Started

1. Sign up

To get started with Selium Cloud, you'll need an account! Selium Cloud is free for life, and super affordable as you grow with us.

2. Create your first certificate

Once you've got your account, login to the Selium Cloud dashboard at cloud.selium.io. From here, you can create TLS certificates for each client on your account.

After logging in, you should see a screen like this:

Dashboard

To create a certificate, fill in a name for your client, e.g. "web1.example.com", then click the submit button. After a moment you should see a dialog box like this:

Add client

Make sure you download the private key as you cannot download it again!

Add the public and private keys to your project and you're ready to start using Selium.

Basic Usage

Selium Cloud's API feels just like running your own Selium server. The main difference is when establishing a connection to the server. Instead of using selium::custom() to connect to your own server, use selium::cloud() to connect to the cloud:

#![allow(unused)]
fn main() {
let connection = selium::cloud()
    .with_cert_and_key(
        "./web1.example.com.der",
        "./web1.example.com.key.der",
    )?
    .connect()
    .await?;
}

Note that when using selium::cloud(), you won't need to specify a CA path or endpoint. These details are baked into the Selium client for your convenience.

Namespaces

Each Selium Cloud account is linked to a unique namespace. This is used to distinguish your data from other accounts, and is linked to every certificate you create. You can find your namespace on the Selium Cloud dashboard:

Namespace

To use your namespace, simply prepend it to every topic name. For example, to publish the topic "retail-transactions", you would code the following:

#![allow(unused)]
fn main() {
let mut publisher = connection
    .publisher("/example/retail-transactions")
    ...
    .await?;
}

IMPORTANT NOTE!

You must compile your code with the --release flag for both testing and production use. This is so the Selium client knows to use the production Certificate Authority!

If you don't do this, you will not be able to connect to Selium.

N.B. We know this isn't ideal, and likely we'll be removing this restriction in future versions. We'd love your feedback on this too!