Tutorial: Build a Simple Rust App and Connect It to ScyllaDB NoSQL
Learn how to build a simple Rust application that will connect to a NoSQL database cluster and perform basic queries.
ScyllaDB, an open-source database for data-intensive applications that require high performance and low latency, is an excellent match for Rust. Similar to the Rust programming language and the Tokio framework, ScyllaDB is built on an asynchronous, non-blocking runtime that works well for building highly reliable, low-latency distributed applications.
The ScyllaDB team has developed the scylla-rust-driver, an open-source ScyllaDB (and Apache Cassandra) driver for Rust. It’s written in pure Rust with a fully async API using Tokio. You can read more regarding its benchmark results, plus how our developers solved a performance regression.
We recently developed a new free training lesson on using the new driver for interacting with a ScyllaDB cluster. In this article, I’ll cover the essential parts of the lesson in which you’ll build a simple Rust application that will connect to a ScyllaDB cluster and perform basic queries.
Why Rust?
Before we dive into the new Rust lessons, let’s address the obvious question: why Rust?
Rust is a modern, performant systems programming language that is steadily gaining adoption, yet it is general enough to build almost anything. It is designed to be fast and safe: memory accesses are checked, which prevents most crashes, and safe Rust rules out data races at compile time.
Moreover, Rust implements a unique and interesting async model. Rust’s futures represent computations, and the responsibility for moving these asynchronous computations forward belongs to the programmer, typically by awaiting them or handing them to a runtime such as Tokio. This makes async programs very efficient and minimizes the need for allocations, since the state machine represented by each async function is known at compile time.
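To make the "futures represent computations" point concrete, here is a minimal sketch that uses only the standard library. The names NoopWaker, poll_once, compute, and demo are hypothetical helpers invented for this illustration; a real application would let Tokio do the polling. It shows that calling an async function runs nothing until the future is polled.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing; enough to poll a future that never suspends.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Poll a future exactly once and return its output if it finished.
fn poll_once<F: Future>(fut: Pin<&mut F>) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    match fut.poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}

// Hypothetical async function that records when its body actually runs.
async fn compute(started: Arc<AtomicBool>) -> u32 {
    started.store(true, Ordering::SeqCst);
    42
}

// Returns (body ran before polling, body ran after polling, output).
fn demo() -> (bool, bool, Option<u32>) {
    let started = Arc::new(AtomicBool::new(false));
    // Creating the future only builds the state machine; nothing runs yet.
    let mut fut = Box::pin(compute(started.clone()));
    let before = started.load(Ordering::SeqCst);
    // Polling is what drives the computation forward.
    let output = poll_once(fut.as_mut());
    let after = started.load(Ordering::SeqCst);
    (before, after, output)
}

fn main() {
    let (before, after, output) = demo();
    println!("ran before poll: {}, ran after poll: {}, output: {:?}", before, after, output);
}
```

In the rest of this article, the #[tokio::main] attribute and .await take over the job that poll_once does by hand here.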
Now, onto the new Rust lesson…
Creating the Data Schema
The sample Rust application for our lesson will be able to store and query temperature time-series data. Each measurement will contain the following information:
- The sensor ID for the sensor that measured the temperature
- The time the temperature was measured
- The temperature value
First, create a keyspace called tutorial:
CREATE KEYSPACE IF NOT EXISTS tutorial
  WITH REPLICATION = {
    'class': 'SimpleStrategy',
    'replication_factor': 1
  };
Based on the desired query being the temperature reported by a specific device for a given time interval, create the following table:
CREATE TABLE IF NOT EXISTS tutorial.temperature (
  device UUID,
  time timestamp,
  temperature smallint,
  PRIMARY KEY(device, time)
);
The application you’re building will be able to query all temperatures measured by a given device within a selected time frame. That’s why you will use the following SELECT query:
SELECT * FROM tutorial.temperature
  WHERE device = ?
  AND time > ?
  AND time < ?;
Where ? will be replaced with actual values: device ID, time-from, and time-to, respectively.
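With PRIMARY KEY(device, time), device is the partition key and time is the clustering column, so the rows of one device are stored sorted by time. The sketch below is only an in-memory analogy of that layout, not how ScyllaDB is implemented; Table, insert, select, and demo are hypothetical names, and a u128 stands in for the UUID. It shows why the query is a lookup followed by a range scan, and that both time bounds are exclusive.

```rust
use std::collections::{BTreeMap, HashMap};
use std::ops::Bound::Excluded;

// Toy stand-in for the table: device -> (time -> temperature), sorted by time.
type Table = HashMap<u128, BTreeMap<i64, i16>>;

fn insert(table: &mut Table, device: u128, time: i64, temperature: i16) {
    table.entry(device).or_default().insert(time, temperature);
}

// Mirrors: WHERE device = ? AND time > ? AND time < ? (both bounds exclusive).
fn select(table: &Table, device: u128, time_from: i64, time_to: i64) -> Vec<(i64, i16)> {
    // BTreeMap::range panics on an inverted or empty exclusive interval,
    // so handle that case up front.
    if time_from >= time_to {
        return Vec::new();
    }
    match table.get(&device) {
        Some(rows) => rows
            .range((Excluded(time_from), Excluded(time_to)))
            .map(|(time, temperature)| (*time, *temperature))
            .collect(),
        None => Vec::new(),
    }
}

// Inserts a few rows for two devices and queries one device's time window.
fn demo() -> Vec<(i64, i16)> {
    let mut table = Table::new();
    insert(&mut table, 1, 1_000, 40);
    insert(&mut table, 1, 1_002, 60);
    insert(&mut table, 1, 1_005, 70);
    insert(&mut table, 2, 1_001, 10);
    select(&table, 1, 1_000, 1_005)
}

fn main() {
    // Rows at exactly 1_000 and 1_005 are excluded by the strict comparisons.
    println!("{:?}", demo());
}
```

If measurements taken exactly at the boundary timestamps should be included, the CQL query would need >= and <= instead.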
Connecting to the Database With Rust
The application name is temperature, and the required dependencies are defined in the Cargo.toml file:
uuid = {version = "0.8", features = ["v4"]}
tokio = {version = "1.1.0", features = ["full"]}
scylla = "0.3.1"
futures = "0.3.6"
chrono = "0.4.0"
Where:
- uuid – Package that provides UUID.
- tokio – Provides the async runtime to execute database queries in.
- scylla – Rust ScyllaDB/Cassandra driver.
- chrono – Package for working with time.
The main function runs asynchronously by using tokio. The following signature makes sure it returns a Result:
#[tokio::main]
async fn main() -> Result<()> {
    ...
}
The file /src/db.rs will hold the logic for working with the ScyllaDB instance. The first step is to establish a database session.
use scylla::{Session, SessionBuilder};
use crate::Result;

// Connect to the cluster through one known node, given as "host:port".
pub async fn create_session(uri: &str) -> Result<Session> {
    SessionBuilder::new()
        .known_node(uri)
        .build()
        .await
        .map_err(From::from)
}
To initialize the session:
#[tokio::main]
async fn main() -> Result<()> {
    println!("connecting to db");
    // Fall back to a local node when SCYLLA_URI is not set.
    let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
    let session = db::create_session(&uri).await?;
    todo!()
}
Notice the .await after create_session. That’s because async functions return a Future. Futures can be await-ed inside other async functions to get their actual value, which in this case is Result<Session, Error>. Lastly, the ? after await makes sure that if create_session returns an error instead of a session, the error is propagated up and the application terminates, printing the error.
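The interplay of ? and From::from can be shown without a database. The following is a minimal sketch under stated assumptions: the Result alias is a guess at what a project-wide alias might look like (the article's /src/result.rs is not shown), and ConnectError, connect, and run are hypothetical stand-ins for the driver error and create_session.

```rust
use std::error::Error;
use std::fmt;

// Assumed shape of a project-wide alias; not the article's actual result.rs.
type Result<T> = std::result::Result<T, Box<dyn Error + Send + Sync>>;

// Hypothetical error type standing in for a driver error.
#[derive(Debug)]
struct ConnectError(String);

impl fmt::Display for ConnectError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "cannot connect to {}", self.0)
    }
}

impl Error for ConnectError {}

// Hypothetical stand-in for create_session: fails when the address has no port.
fn connect(uri: &str) -> std::result::Result<String, ConnectError> {
    if uri.contains(':') {
        Ok(format!("session({})", uri))
    } else {
        Err(ConnectError(uri.to_string()))
    }
}

// `?` returns early on Err and converts ConnectError into the boxed error via From.
fn run(uri: &str) -> Result<String> {
    let session = connect(uri)?;
    Ok(format!("{} ready", session))
}

fn main() {
    // The first call succeeds; the second shows the propagated error.
    match run("127.0.0.1:9042") {
        Ok(msg) => println!("{}", msg),
        Err(err) => println!("error: {}", err),
    }
    match run("localhost") {
        Ok(msg) => println!("{}", msg),
        Err(err) => println!("error: {}", err),
    }
}
```

When main itself returns a Result, as in the article, an Err that reaches main ends the process with a non-zero exit code and prints the error's Debug representation.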
Next, the file /src/db.rs defines functions for creating the keyspace and the table that stores temperature measurements, using the CQL statements shown earlier:
use scylla::{IntoTypedRows, Session, SessionBuilder};
use uuid::Uuid;
use crate::{Duration, Result, TemperatureMeasurement};

static CREATE_KEYSPACE_QUERY: &str = r#"
  CREATE KEYSPACE IF NOT EXISTS tutorial
    WITH REPLICATION = {
      'class': 'SimpleStrategy',
      'replication_factor': 1
    };
"#;

static CREATE_TEMPERATURE_TABLE_QUERY: &str = r#"
  CREATE TABLE IF NOT EXISTS tutorial.temperature (
    device UUID,
    time timestamp,
    temperature smallint,
    PRIMARY KEY(device, time)
  );
"#;

// Create the keyspace first, then the table inside it.
pub async fn initialize(session: &Session) -> Result<()> {
    create_keyspace(session).await?;
    create_temperature_table(session).await?;
    Ok(())
}

async fn create_keyspace(session: &Session) -> Result<()> {
    session
        .query(CREATE_KEYSPACE_QUERY, ())
        .await
        .map(|_| ())
        .map_err(From::from)
}

async fn create_temperature_table(session: &Session) -> Result<()> {
    session
        .query(CREATE_TEMPERATURE_TABLE_QUERY, ())
        .await
        .map(|_| ())
        .map_err(From::from)
}
The file /src/db.rs also defines the insert query. ScyllaDB will use each value of the measurement as a replacement for the corresponding ?:
static ADD_MEASUREMENT_QUERY: &str = r#"
  INSERT INTO tutorial.temperature (device, time, temperature)
    VALUES (?, ?, ?);
"#;

pub async fn add_measurement(session: &Session, measurement: TemperatureMeasurement) -> Result<()> {
    session
        .query(ADD_MEASUREMENT_QUERY, measurement)
        .await
        .map(|_| ())
        .map_err(From::from)
}
Reading Measurements
Next, the select-query logic is defined in the /src/db.rs module:
static SELECT_MEASUREMENTS_QUERY: &str = r#"
  SELECT * FROM tutorial.temperature
    WHERE device = ?
      AND time > ?
      AND time < ?;
"#;

pub async fn select_measurements(
    session: &Session,
    device: Uuid,
    time_from: Duration,
    time_to: Duration,
) -> Result<Vec<TemperatureMeasurement>> {
    session
        .query(SELECT_MEASUREMENTS_QUERY, (device, time_from, time_to))
        .await?
        .rows
        .unwrap_or_default()
        .into_typed::<TemperatureMeasurement>()
        .map(|v| v.map_err(From::from))
        .collect()
}
The important steps are:
- Make a select query with the specified parameters (device ID, start time, and end time).
- Await the response and take its rows.
- The rows might be empty. unwrap_or_default ensures that you get an empty Vec if that’s the case.
- Once the rows are obtained, convert each row by using into_typed::<TemperatureMeasurement>(), which relies on the FromRow derive macro.
- into_typed yields a Result for each row, because converting a row might fail. With .map(|v| v.map_err(From::from)) you ensure that each row’s error is converted to the generic error defined in /src/result.rs.
- Finally, collect gathers the converted values into a vector, or returns the first error encountered.
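The same iterator pattern can be tried in isolation. The sketch below uses only the standard library: string parsing stands in for row conversion, and AppError and parse_rows are hypothetical names introduced for illustration, not part of the driver or the article's code.

```rust
use std::num::ParseIntError;

// Hypothetical application error that wraps the lower-level parsing error.
#[derive(Debug, PartialEq)]
struct AppError(String);

impl From<ParseIntError> for AppError {
    fn from(err: ParseIntError) -> Self {
        AppError(err.to_string())
    }
}

// `rows` plays the role of the optional row set; each row may fail to convert.
fn parse_rows(rows: Option<Vec<&str>>) -> Result<Vec<i16>, AppError> {
    rows.unwrap_or_default() // None becomes an empty Vec
        .into_iter()
        .map(|raw| raw.parse::<i16>()) // one Result<i16, ParseIntError> per row
        .map(|v| v.map_err(From::from)) // convert each row's error to AppError
        .collect() // Ok(Vec) if every row converted, otherwise the first Err
}

fn main() {
    println!("{:?}", parse_rows(None));
    println!("{:?}", parse_rows(Some(vec!["40", "60"])));
    println!("{:?}", parse_rows(Some(vec!["40", "warm"])));
}
```

Collecting into a Result<Vec<_>, _> stops at the first failing element, so a single bad row makes the whole call return an error instead of a partial vector.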
Now, back in /src/main.rs, you can see the rest of the main function, imports, and modules:
use uuid::Uuid;
use crate::duration::Duration;
use crate::result::Result;
use crate::temperature_measurement::TemperatureMeasurement;

mod db;
mod duration;
mod result;
mod temperature_measurement;

#[tokio::main]
async fn main() -> Result<()> {
    println!("connecting to db");
    let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
    let session = db::create_session(&uri).await?;
    // Create the keyspace and table if they do not exist yet.
    db::initialize(&session).await?;

    println!("Adding measurements");
    let measurement = TemperatureMeasurement {
        device: Uuid::parse_str("72f6d49c-76ea-44b6-b1bb-9186704785db")?,
        time: Duration::seconds(1000000000001),
        temperature: 40,
    };
    db::add_measurement(&session, measurement).await?;

    let measurement = TemperatureMeasurement {
        device: Uuid::parse_str("72f6d49c-76ea-44b6-b1bb-9186704785db")?,
        time: Duration::seconds(1000000000003),
        temperature: 60,
    };
    db::add_measurement(&session, measurement).await?;

    println!("Selecting measurements");
    // Both inserted timestamps fall strictly inside this window.
    let measurements = db::select_measurements(
        &session,
        Uuid::parse_str("72f6d49c-76ea-44b6-b1bb-9186704785db")?,
        Duration::seconds(1000000000000),
        Duration::seconds(10000000000009),
    )
    .await?;
    println!(" >> Measurements: {:?}", measurements);

    Ok(())
}
Additional Rust Learning Opportunities
Check out the full Rust tutorial on ScyllaDB University to see the full code and run the example on your own.
Published at DZone with permission of Guy Shtub. See the original article here.