Stream and Sink traits

If you've ever used Rust's futures or tokio crates, you'll almost certainly be aware of the Stream and Sink traits. These traits govern the sending and receiving of messages between components, and are arguably a de facto standard in Rust.

If you've never used these traits before, don't worry! Not only are they quite intuitive, but the futures crate has great documentation explaining how they work.

The good news for Selium users, is that Selium's Publisher and Subscriber both implement Sink and Stream respectively. Thus they should fit hand in glove with your existing streaming applications.

You can also make use of the futures crate's StreamExt and SinkExt extensions. These traits are implemented by default on any Stream/Sink implementations (like Selium's) and contain lots of helpful tools.

Let's go back to our example from previous chapters and see what we can do with SinkExt on a Publisher:

use futures::{future, stream, SinkExt, Stream, StreamExt};
use selium::{prelude::*, std::codecs::StringCodec};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let connection = selium::custom()
        .endpoint("127.0.0.1:7001")
        .with_certificate_authority("certs/client/ca.der")? // your Selium cert authority
        .with_cert_and_key(
            "certs/client/localhost.der",
            "certs/client/localhost.key.der",
        )? // your client certificates
        .connect() // your Selium server's address
        .await?;

    let publisher = connection
        .publisher("/some/topic") // choose a topic to group similar messages together
        .with_encoder(StringCodec) // allows you to exchange string messages between clients
        .open() // opens a new stream for sending data
        .await?;

    let mut sink = publisher.with(|item: String| {
        if item.contains('@') {
            future::ok(item)
        } else {
            future::ok("default_email@company.org".into())
        }
    });
    sink.send_all(&mut some_stream()).await?;

    Ok(())
}

fn some_stream() -> impl Stream<Item = Result<String, Box<dyn std::error::Error>>> {
    stream::iter(vec![
        "hello@world.com".into(),
        "some@example.net".into(),
        "notanemail.com".into(),
    ])
    .map(|email| Ok(email))
}

In this example, we take a stream of email addresses and optionally map invalid email addresses to a default email address. Of course, this is not a very useful example, but the Rust ecosystem is full of Streams and Sinks. Selium's native support for these traits allows you to slot Selium client into your libraries and applications with ease.

Next, let's get to grips with codecs.