Add integration tests
This commit is contained in:
parent
ff3c0a1b9f
commit
9757c34fe7
6 changed files with 261 additions and 13 deletions
2
.envrc
Normal file
2
.envrc
Normal file
|
@ -0,0 +1,2 @@
|
|||
dotenv_if_exists
|
||||
use nix -p redis cargo-llvm-cov
|
2
.gitignore
vendored
2
.gitignore
vendored
|
@ -1,2 +1,4 @@
|
|||
/target
|
||||
/Cargo.lock
|
||||
/.direnv
|
||||
/.env
|
||||
|
|
|
@ -13,3 +13,6 @@ postcard = { version = "1.0.4", default-features = false, features = ["use-std"]
|
|||
redis = { version = "0.22.3", default-features = false, features = ["tokio-comp"] }
|
||||
serde = { version = "1.0.158", default-features = false }
|
||||
thiserror = { version = "1.0.40", default-features = false }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { version = "1.26.0", default-features = false, features = ["macros", "rt-multi-thread", "time", "parking_lot"] }
|
||||
|
|
|
@ -48,6 +48,7 @@ where
|
|||
&self,
|
||||
key: K,
|
||||
tags: &[&str],
|
||||
ttl: Option<Duration>,
|
||||
func: F,
|
||||
) -> Result<T, AsyncRedisCacheError>
|
||||
where
|
||||
|
@ -59,15 +60,51 @@ where
|
|||
&mut self.conn.clone(),
|
||||
&self.namespace,
|
||||
key,
|
||||
self.default_ttl,
|
||||
tags,
|
||||
ttl.unwrap_or(self.default_ttl),
|
||||
func,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Get a specific cache entry by its key.
|
||||
pub async fn get<T, K>(&self, key: K) -> Result<Option<T>, AsyncRedisCacheError>
|
||||
where
|
||||
T: Serialize + DeserializeOwned,
|
||||
K: Serialize,
|
||||
{
|
||||
match get::<Vec<u8>>(&mut self.conn.clone(), &self.namespace, &make_key(key)?).await? {
|
||||
Some(serialized) => Ok(Some(postcard::from_bytes(&serialized)?)),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
/// Insert a new or update an existing cache entry.
|
||||
pub async fn put<T, K>(
|
||||
&self,
|
||||
key: K,
|
||||
value: T,
|
||||
tags: &[&str],
|
||||
ttl: Option<Duration>,
|
||||
) -> Result<(), AsyncRedisCacheError>
|
||||
where
|
||||
T: Serialize + DeserializeOwned,
|
||||
K: Serialize,
|
||||
{
|
||||
let serialized = postcard::to_stdvec(&value)?;
|
||||
Ok(put(
|
||||
&mut self.conn.clone(),
|
||||
&self.namespace,
|
||||
&make_key(key)?,
|
||||
&serialized,
|
||||
tags,
|
||||
ttl.unwrap_or(self.default_ttl),
|
||||
)
|
||||
.await?)
|
||||
}
|
||||
|
||||
/// Invalidate a specific cache entry by its key.
|
||||
pub async fn invalidate_key<K>(&self, key: K) -> Result<(), AsyncRedisCacheError>
|
||||
pub async fn pop_key<K>(&self, key: K) -> Result<(), AsyncRedisCacheError>
|
||||
where
|
||||
K: Serialize,
|
||||
{
|
||||
|
@ -75,12 +112,12 @@ where
|
|||
}
|
||||
|
||||
/// Invalidate all cache entries that are associated with the given tag.
|
||||
pub async fn invalidate_tag(&self, tag: &str) -> Result<(), AsyncRedisCacheError> {
|
||||
pub async fn pop_tag(&self, tag: &str) -> Result<(), AsyncRedisCacheError> {
|
||||
Ok(pop_tag(&mut self.conn.clone(), &self.namespace, tag).await?)
|
||||
}
|
||||
|
||||
/// Invalidate all cache entries that are associated with ALL of the given tags.
|
||||
pub async fn invalidate_tags(&self, tags: &[&str]) -> Result<(), AsyncRedisCacheError> {
|
||||
pub async fn pop_tags(&self, tags: &[&str]) -> Result<(), AsyncRedisCacheError> {
|
||||
Ok(pop_tags(&mut self.conn.clone(), &self.namespace, tags).await?)
|
||||
}
|
||||
}
|
||||
|
@ -90,8 +127,8 @@ pub async fn cached<T, K, F>(
|
|||
redis: &mut impl AsyncCommands,
|
||||
namespace: &str,
|
||||
key: K,
|
||||
ttl: Duration,
|
||||
tags: &[&str],
|
||||
ttl: Duration,
|
||||
func: F,
|
||||
) -> Result<T, AsyncRedisCacheError>
|
||||
where
|
||||
|
@ -105,7 +142,7 @@ where
|
|||
}
|
||||
let value = func.await;
|
||||
let serialized = postcard::to_stdvec(&value)?;
|
||||
put(redis, namespace, &key, &serialized, ttl, tags).await?;
|
||||
put(redis, namespace, &key, &serialized, tags, ttl).await?;
|
||||
Ok(value)
|
||||
}
|
||||
|
||||
|
@ -127,8 +164,8 @@ pub async fn put<T: ToRedisArgs>(
|
|||
namespace: &str,
|
||||
key: &str,
|
||||
value: &T,
|
||||
ttl: Duration,
|
||||
tags: &[&str],
|
||||
ttl: Duration,
|
||||
) -> RedisResult<()> {
|
||||
let mut pipe = redis::pipe();
|
||||
pipe.set_ex(
|
||||
|
|
12
src/lib.rs
12
src/lib.rs
|
@ -13,7 +13,7 @@
|
|||
//!
|
||||
//! impl Application {
|
||||
//! async fn cached_function(&self, a: i32, b: i32) -> i32 {
|
||||
//! self.cache.cached((a, b), &["sum"], async move {
|
||||
//! self.cache.cached((a, b), &["sum"], None, async move {
|
||||
//! // expensive computation
|
||||
//! a + b
|
||||
//! })
|
||||
|
@ -28,10 +28,10 @@
|
|||
//! let app = Application {
|
||||
//! cache: AsyncRedisCache::new(conn, "my_application".to_owned(), Duration::from_secs(600)),
|
||||
//! };
|
||||
//! assert_eq!(app.cached_function(1, 2).await, 3); // running expensive computation and filling cache
|
||||
//! assert_eq!(app.cached_function(1, 2).await, 3); // loading result from cache
|
||||
//! app.cache.invalidate_key((1, 2)).await.unwrap(); // invalidate cache by key
|
||||
//! app.cache.invalidate_tag("sum").await.unwrap(); // invalidate cache by tag
|
||||
//! assert_eq!(app.cached_function(1, 2).await, 3); // run expensive computation and fill cache
|
||||
//! assert_eq!(app.cached_function(1, 2).await, 3); // load result from cache
|
||||
//! app.cache.pop_key((1, 2)).await.unwrap(); // invalidate cache by key
|
||||
//! app.cache.pop_tag("sum").await.unwrap(); // invalidate cache by tag
|
||||
//! # };
|
||||
//! ```
|
||||
|
||||
|
@ -45,5 +45,5 @@ use serde::Serialize;
|
|||
pub mod async_redis;
|
||||
|
||||
fn make_key(key: impl Serialize) -> Result<String, postcard::Error> {
|
||||
Ok(general_purpose::STANDARD.encode(postcard::to_stdvec(&key)?))
|
||||
Ok(general_purpose::STANDARD_NO_PAD.encode(postcard::to_stdvec(&key)?))
|
||||
}
|
||||
|
|
204
tests/async_redis.rs
Normal file
204
tests/async_redis.rs
Normal file
|
@ -0,0 +1,204 @@
|
|||
use std::time::{Duration, Instant};
|
||||
|
||||
use fnct::async_redis::AsyncRedisCache;
|
||||
use redis::{aio::MultiplexedConnection, Client};
|
||||
use tokio::sync::OnceCell;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cached() {
|
||||
let cache = get_cache("test_cached").await;
|
||||
|
||||
struct App {
|
||||
cache: AsyncRedisCache<MultiplexedConnection>,
|
||||
}
|
||||
|
||||
impl App {
|
||||
async fn expensive_computation(&self, a: i32, b: i32) -> i32 {
|
||||
self.cache
|
||||
.cached((a, b), &[], None, async {
|
||||
tokio::time::sleep(Duration::from_secs(3)).await;
|
||||
a + b
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
let app = App { cache };
|
||||
|
||||
let now = Instant::now();
|
||||
assert_eq!(app.expensive_computation(1, 2).await, 3);
|
||||
assert!(now.elapsed() >= Duration::from_secs(3));
|
||||
|
||||
let now = Instant::now();
|
||||
assert_eq!(app.expensive_computation(1, 2).await, 3);
|
||||
assert_eq!(app.expensive_computation(1, 2).await, 3);
|
||||
assert_eq!(app.expensive_computation(1, 2).await, 3);
|
||||
assert!(now.elapsed() < Duration::from_secs(3));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_basic_insert_expire() {
|
||||
let cache = get_cache("test_basic_insert_expire").await;
|
||||
assert_eq!(cache.get::<String, _>("foo").await.unwrap(), None);
|
||||
assert_eq!(cache.get::<String, _>("asdf").await.unwrap(), None);
|
||||
cache
|
||||
.put("foo", "bar".to_owned(), &[], Some(Duration::from_secs(1)))
|
||||
.await
|
||||
.unwrap();
|
||||
cache
|
||||
.put("asdf", "baz".to_owned(), &[], Some(Duration::from_secs(2)))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(cache.get("foo").await.unwrap(), Some("bar".to_owned()));
|
||||
assert_eq!(cache.get("asdf").await.unwrap(), Some("baz".to_owned()));
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
assert_eq!(cache.get::<String, _>("foo").await.unwrap(), None);
|
||||
assert_eq!(cache.get("asdf").await.unwrap(), Some("baz".to_owned()));
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
assert_eq!(cache.get::<String, _>("foo").await.unwrap(), None);
|
||||
assert_eq!(cache.get::<String, _>("asdf").await.unwrap(), None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_delete_by_key() {
|
||||
let cache = get_cache("test_delete_by_key").await;
|
||||
cache.put("foo", "bar".to_owned(), &[], None).await.unwrap();
|
||||
cache
|
||||
.put("asdf", "baz".to_owned(), &[], None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(cache.get("foo").await.unwrap(), Some("bar".to_owned()));
|
||||
assert_eq!(cache.get("asdf").await.unwrap(), Some("baz".to_owned()));
|
||||
cache.pop_key("asdf").await.unwrap();
|
||||
assert_eq!(cache.get("foo").await.unwrap(), Some("bar".to_owned()));
|
||||
assert_eq!(cache.get::<String, _>("asdf").await.unwrap(), None);
|
||||
cache.pop_key("foo").await.unwrap();
|
||||
assert_eq!(cache.get::<String, _>("foo").await.unwrap(), None);
|
||||
assert_eq!(cache.get::<String, _>("asdf").await.unwrap(), None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_delete_by_tag() {
|
||||
let cache = get_cache("test_delete_by_tag").await;
|
||||
cache
|
||||
.put("foo", 1, &["t1", "t2", "t3"], None)
|
||||
.await
|
||||
.unwrap();
|
||||
cache.put("bar", 2, &["t1", "t2"], None).await.unwrap();
|
||||
cache.put("baz", 3, &["t1", "t3"], None).await.unwrap();
|
||||
assert_eq!(cache.get("foo").await.unwrap(), Some(1));
|
||||
assert_eq!(cache.get("bar").await.unwrap(), Some(2));
|
||||
assert_eq!(cache.get("baz").await.unwrap(), Some(3));
|
||||
cache.pop_tag("t3").await.unwrap();
|
||||
assert_eq!(cache.get::<i32, _>("foo").await.unwrap(), None);
|
||||
assert_eq!(cache.get("bar").await.unwrap(), Some(2));
|
||||
assert_eq!(cache.get::<i32, _>("baz").await.unwrap(), None);
|
||||
cache.pop_tag("t2").await.unwrap();
|
||||
assert_eq!(cache.get::<i32, _>("foo").await.unwrap(), None);
|
||||
assert_eq!(cache.get::<i32, _>("bar").await.unwrap(), None);
|
||||
assert_eq!(cache.get::<i32, _>("baz").await.unwrap(), None);
|
||||
|
||||
cache
|
||||
.put("foo", 1, &["t1", "t2", "t3"], None)
|
||||
.await
|
||||
.unwrap();
|
||||
cache.put("bar", 2, &["t1", "t2"], None).await.unwrap();
|
||||
cache.put("baz", 3, &["t1", "t3"], None).await.unwrap();
|
||||
assert_eq!(cache.get("foo").await.unwrap(), Some(1));
|
||||
assert_eq!(cache.get("bar").await.unwrap(), Some(2));
|
||||
assert_eq!(cache.get("baz").await.unwrap(), Some(3));
|
||||
cache.pop_tag("t1").await.unwrap();
|
||||
assert_eq!(cache.get::<i32, _>("foo").await.unwrap(), None);
|
||||
assert_eq!(cache.get::<i32, _>("bar").await.unwrap(), None);
|
||||
assert_eq!(cache.get::<i32, _>("baz").await.unwrap(), None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_delete_by_tags_intersect() {
|
||||
let cache = get_cache("test_delete_by_tags_intersect").await;
|
||||
cache
|
||||
.put("foo", 1, &["t1", "t2", "t3"], None)
|
||||
.await
|
||||
.unwrap();
|
||||
cache.put("bar", 2, &["t1", "t2"], None).await.unwrap();
|
||||
cache.put("baz", 3, &["t1", "t3"], None).await.unwrap();
|
||||
assert_eq!(cache.get("foo").await.unwrap(), Some(1));
|
||||
assert_eq!(cache.get("bar").await.unwrap(), Some(2));
|
||||
assert_eq!(cache.get("baz").await.unwrap(), Some(3));
|
||||
cache.pop_tags(&["t2", "t3"]).await.unwrap();
|
||||
assert_eq!(cache.get::<i32, _>("foo").await.unwrap(), None);
|
||||
assert_eq!(cache.get("bar").await.unwrap(), Some(2));
|
||||
assert_eq!(cache.get("baz").await.unwrap(), Some(3));
|
||||
cache.pop_tags(&["t1", "t2"]).await.unwrap();
|
||||
assert_eq!(cache.get::<i32, _>("foo").await.unwrap(), None);
|
||||
assert_eq!(cache.get::<i32, _>("bar").await.unwrap(), None);
|
||||
assert_eq!(cache.get("baz").await.unwrap(), Some(3));
|
||||
|
||||
cache
|
||||
.put("foo", 1, &["t1", "t2", "t3"], None)
|
||||
.await
|
||||
.unwrap();
|
||||
cache.put("bar", 2, &["t1", "t2"], None).await.unwrap();
|
||||
cache.put("baz", 3, &["t1", "t3"], None).await.unwrap();
|
||||
assert_eq!(cache.get("foo").await.unwrap(), Some(1));
|
||||
assert_eq!(cache.get("bar").await.unwrap(), Some(2));
|
||||
assert_eq!(cache.get("baz").await.unwrap(), Some(3));
|
||||
cache.pop_tags(&["t1", "t3"]).await.unwrap();
|
||||
assert_eq!(cache.get::<i32, _>("foo").await.unwrap(), None);
|
||||
assert_eq!(cache.get("bar").await.unwrap(), Some(2));
|
||||
assert_eq!(cache.get::<i32, _>("baz").await.unwrap(), None);
|
||||
|
||||
cache
|
||||
.put("foo", 1, &["t1", "t2", "t3"], None)
|
||||
.await
|
||||
.unwrap();
|
||||
cache.put("bar", 2, &["t1", "t2"], None).await.unwrap();
|
||||
cache.put("baz", 3, &["t1", "t3"], None).await.unwrap();
|
||||
cache.put("xy", 4, &["t4"], None).await.unwrap();
|
||||
assert_eq!(cache.get("foo").await.unwrap(), Some(1));
|
||||
assert_eq!(cache.get("bar").await.unwrap(), Some(2));
|
||||
assert_eq!(cache.get("baz").await.unwrap(), Some(3));
|
||||
cache.pop_tags(&["t1"]).await.unwrap();
|
||||
assert_eq!(cache.get::<i32, _>("foo").await.unwrap(), None);
|
||||
assert_eq!(cache.get::<i32, _>("bar").await.unwrap(), None);
|
||||
assert_eq!(cache.get::<i32, _>("baz").await.unwrap(), None);
|
||||
assert_eq!(cache.get("xy").await.unwrap(), Some(4));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_delete_by_tags_all() {
|
||||
let cache = get_cache("test_delete_by_tags_all").await;
|
||||
|
||||
cache
|
||||
.put("foo", 1, &["t1", "t2", "t3"], None)
|
||||
.await
|
||||
.unwrap();
|
||||
cache.put("bar", 2, &["t1", "t2"], None).await.unwrap();
|
||||
cache.put("baz", 3, &["t1", "t3"], None).await.unwrap();
|
||||
assert_eq!(cache.get("foo").await.unwrap(), Some(1));
|
||||
assert_eq!(cache.get("bar").await.unwrap(), Some(2));
|
||||
assert_eq!(cache.get("baz").await.unwrap(), Some(3));
|
||||
cache.pop_tags(&[]).await.unwrap();
|
||||
assert_eq!(cache.get::<i32, _>("foo").await.unwrap(), None);
|
||||
assert_eq!(cache.get::<i32, _>("bar").await.unwrap(), None);
|
||||
assert_eq!(cache.get::<i32, _>("baz").await.unwrap(), None);
|
||||
}
|
||||
|
||||
async fn get_cache(namespace: &str) -> AsyncRedisCache<MultiplexedConnection> {
|
||||
let client =
|
||||
Client::open(std::env::var("REDIS_SERVER").unwrap_or("redis://127.0.0.1:6379/0".into()))
|
||||
.unwrap();
|
||||
let mut conn = client.get_multiplexed_async_connection().await.unwrap();
|
||||
|
||||
static ONCE: OnceCell<()> = OnceCell::const_new();
|
||||
ONCE.get_or_init(|| async {
|
||||
redis::cmd("FLUSHDB")
|
||||
.query_async::<_, ()>(&mut conn)
|
||||
.await
|
||||
.unwrap();
|
||||
})
|
||||
.await;
|
||||
|
||||
AsyncRedisCache::new(conn, namespace.into(), Duration::from_secs(20))
|
||||
}
|
Reference in a new issue