WIP use Redis cluster client for memorydb

This commit is contained in:
Simon Bihel 2021-12-16 19:02:08 +00:00
parent 0287a60296
commit ba8a31386d
No known key found for this signature in database
GPG Key ID: B7013150BEAA28FD
5 changed files with 111 additions and 315 deletions

293
Cargo.lock generated
View File

@ -35,66 +35,6 @@ version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"
[[package]]
name = "async-channel"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2114d64672151c0c5eaa5e131ec84a74f06e1e559830dabba01ca30605d66319"
dependencies = [
"concurrent-queue",
"event-listener",
"futures-core",
]
[[package]]
name = "async-executor"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "871f9bb5e0a22eeb7e8cf16641feb87c9dc67032ccf8ff49e772eb9941d3a965"
dependencies = [
"async-task",
"concurrent-queue",
"fastrand",
"futures-lite",
"once_cell",
"slab",
]
[[package]]
name = "async-global-executor"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9586ec52317f36de58453159d48351bc244bc24ced3effc1fce22f3d48664af6"
dependencies = [
"async-channel",
"async-executor",
"async-io",
"async-mutex",
"blocking",
"futures-lite",
"num_cpus",
"once_cell",
]
[[package]]
name = "async-io"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a811e6a479f2439f0c04038796b5cfb3d2ad56c230e0f2d3f7b04d68cfee607b"
dependencies = [
"concurrent-queue",
"futures-lite",
"libc",
"log",
"once_cell",
"parking",
"polling",
"slab",
"socket2",
"waker-fn",
"winapi",
]
[[package]]
name = "async-lock"
version = "2.4.0"
@ -104,25 +44,6 @@ dependencies = [
"event-listener",
]
[[package]]
name = "async-mutex"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e"
dependencies = [
"event-listener",
]
[[package]]
name = "async-redis-session"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba82ce101e6cde598074604ef4a882bdd6b3a283baff446ae73ae2727c242452"
dependencies = [
"async-session",
"redis 0.20.2",
]
[[package]]
name = "async-session"
version = "3.0.0"
@ -144,39 +65,6 @@ dependencies = [
"sha2",
]
[[package]]
name = "async-std"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8056f1455169ab86dd47b47391e4ab0cbd25410a70e9fe675544f49bafaf952"
dependencies = [
"async-channel",
"async-global-executor",
"async-io",
"async-lock",
"crossbeam-utils",
"futures-channel",
"futures-core",
"futures-io",
"futures-lite",
"gloo-timers",
"kv-log-macro",
"log",
"memchr",
"num_cpus",
"once_cell",
"pin-project-lite",
"pin-utils",
"slab",
"wasm-bindgen-futures",
]
[[package]]
name = "async-task"
version = "4.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0"
[[package]]
name = "async-trait"
version = "0.1.51"
@ -197,12 +85,6 @@ dependencies = [
"autocfg 1.0.1",
]
[[package]]
name = "atomic-waker"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a"
[[package]]
name = "autocfg"
version = "0.1.7"
@ -276,17 +158,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "bb8-redis"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c440295545cb69b3cec992ae8844fbb1de1c84f2f90248438af287e14bb09bde"
dependencies = [
"async-trait",
"bb8",
"redis 0.21.4",
]
[[package]]
name = "bincode"
version = "1.3.3"
@ -368,20 +239,6 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d696c370c750c948ada61c69a0ee2cbbb9c50b1019ddb86d9317157a99c2cae"
[[package]]
name = "blocking"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "046e47d4b2d391b1f6f8b407b1deb8dee56c1852ccd868becf2710f601b5f427"
dependencies = [
"async-channel",
"async-task",
"atomic-waker",
"fastrand",
"futures-lite",
"once_cell",
]
[[package]]
name = "bumpalo"
version = "3.8.0"
@ -409,12 +266,6 @@ dependencies = [
"serde",
]
[[package]]
name = "cache-padded"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba"
[[package]]
name = "cc"
version = "1.0.72"
@ -461,15 +312,6 @@ dependencies = [
"tokio-util",
]
[[package]]
name = "concurrent-queue"
version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3"
dependencies = [
"cache-padded",
]
[[package]]
name = "const-oid"
version = "0.6.2"
@ -513,6 +355,12 @@ dependencies = [
"libc",
]
[[package]]
name = "crc16"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff"
[[package]]
name = "crossbeam-utils"
version = "0.8.5"
@ -573,16 +421,6 @@ dependencies = [
"subtle",
]
[[package]]
name = "ctor"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccc0a48a9b826acdf4028595adc9db92caea352f7af011a3034acd172a52a0aa"
dependencies = [
"quote",
"syn",
]
[[package]]
name = "der"
version = "0.4.5"
@ -752,15 +590,6 @@ version = "2.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59"
[[package]]
name = "fastrand"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b394ed3d285a429378d3b384b9eb1285267e7df4b166df24b7a6939a04dc392e"
dependencies = [
"instant",
]
[[package]]
name = "ff"
version = "0.10.1"
@ -840,21 +669,6 @@ version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e481354db6b5c353246ccf6a728b0c5511d752c08da7260546fc0933869daa11"
[[package]]
name = "futures-lite"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48"
dependencies = [
"fastrand",
"futures-core",
"futures-io",
"memchr",
"parking",
"pin-project-lite",
"waker-fn",
]
[[package]]
name = "futures-sink"
version = "0.3.18"
@ -907,19 +721,6 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "gloo-timers"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f16c88aa13d2656ef20d1c042086b8767bbe2bdb62526894275a1b062161b2e"
dependencies = [
"futures-channel",
"futures-core",
"js-sys",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "group"
version = "0.10.0"
@ -1215,15 +1016,6 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67c21572b4949434e4fc1e1978b99c5f77064153c59d998bf13ecd96fb5ecba7"
[[package]]
name = "kv-log-macro"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f"
dependencies = [
"log",
]
[[package]]
name = "lazy_static"
version = "1.4.0"
@ -1261,7 +1053,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710"
dependencies = [
"cfg-if 1.0.0",
"value-bag",
]
[[package]]
@ -1512,12 +1303,6 @@ dependencies = [
"syn",
]
[[package]]
name = "parking"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72"
[[package]]
name = "parking_lot"
version = "0.11.2"
@ -1637,19 +1422,6 @@ dependencies = [
"zeroize",
]
[[package]]
name = "polling"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "685404d509889fade3e86fe3a5803bca2ec09b0c0778d5ada6ec8bf7a8de5259"
dependencies = [
"cfg-if 1.0.0",
"libc",
"log",
"wepoll-ffi",
"winapi",
]
[[package]]
name = "ppv-lite86"
version = "0.2.15"
@ -1768,27 +1540,6 @@ dependencies = [
"rand_core",
]
[[package]]
name = "redis"
version = "0.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4f0ceb2ec0dd769483ecd283f6615aa83dcd0be556d5294c6e659caefe7cc54"
dependencies = [
"async-std",
"async-trait",
"bytes",
"combine",
"dtoa",
"futures-util",
"itoa",
"percent-encoding",
"pin-project-lite",
"sha1",
"tokio",
"tokio-util",
"url",
]
[[package]]
name = "redis"
version = "0.21.4"
@ -1798,11 +1549,13 @@ dependencies = [
"async-trait",
"bytes",
"combine",
"crc16",
"dtoa",
"futures-util",
"itoa",
"percent-encoding",
"pin-project-lite",
"rand",
"tokio",
"tokio-util",
"url",
@ -2174,10 +1927,10 @@ name = "siwe-oidc"
version = "0.1.0"
dependencies = [
"anyhow",
"async-redis-session",
"async-session",
"async-trait",
"axum",
"bb8-redis",
"bb8",
"bincode",
"chrono",
"cookie",
@ -2187,6 +1940,7 @@ dependencies = [
"iri-string",
"openidconnect",
"rand",
"redis",
"rsa",
"rust-argon2",
"serde",
@ -2741,28 +2495,12 @@ dependencies = [
"serde",
]
[[package]]
name = "value-bag"
version = "1.0.0-alpha.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79923f7731dc61ebfba3633098bf3ac533bbd35ccd8c57e7088d9a5eebe0263f"
dependencies = [
"ctor",
"version_check",
]
[[package]]
name = "version_check"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe"
[[package]]
name = "waker-fn"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
[[package]]
name = "want"
version = "0.3.0"
@ -2884,15 +2622,6 @@ dependencies = [
"webpki 0.21.4",
]
[[package]]
name = "wepoll-ffi"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d743fdedc5c64377b5fc2bc036b01c7fd642205a0d96356034ae3404d49eb7fb"
dependencies = [
"cc",
]
[[package]]
name = "winapi"
version = "0.3.9"

View File

@ -35,5 +35,8 @@ figment = { version = "0.10.6", features = ["toml", "env"] }
sha2 = "0.9.0"
cookie = "0.15.1"
bincode = "1.3.3"
bb8-redis = "0.10.1"
async-redis-session = "0.2.2"
# bb8-redis = "0.10.1"
bb8 = "0.7"
# async-redis-session = "0.2.2"
redis = { version = "0.21", default-features = false, features = ["tokio-comp", "cluster"] }
async-trait = "0.1"

View File

@ -1,5 +1,6 @@
use anyhow::{anyhow, Result};
use async_redis_session::RedisSessionStore;
// use async_redis_session::RedisSessionStore;
use async_session::MemoryStore;
use axum::{
body::{Bytes, Full},
error_handling::HandleErrorExt,
@ -12,7 +13,8 @@ use axum::{
routing::{get, post, service_method_routing},
AddExtensionLayer, Json, Router,
};
use bb8_redis::{bb8, bb8::Pool, redis::AsyncCommands, RedisConnectionManager};
use bb8::Pool;
// use bb8_redis::{bb8, bb8::Pool, redis::AsyncCommands, RedisConnectionManager};
use chrono::{Duration, Utc};
use figment::{
providers::{Env, Format, Serialized, Toml},
@ -29,12 +31,13 @@ use openidconnect::{
CoreSubjectIdentifierType, CoreTokenResponse, CoreTokenType, CoreUserInfoClaims,
},
registration::{EmptyAdditionalClientMetadata, EmptyAdditionalClientRegistrationResponse},
AccessToken, Audience, AuthUrl, ClientId, EmptyAdditionalClaims,
AccessToken, Audience, AuthUrl, ClientId, ClientSecret, EmptyAdditionalClaims,
EmptyAdditionalProviderMetadata, EmptyExtraTokenFields, IssuerUrl, JsonWebKeyId,
JsonWebKeySetUrl, Nonce, PrivateSigningKey, RedirectUrl, RegistrationUrl, ResponseTypes, Scope,
StandardClaims, SubjectIdentifier, TokenUrl, UserInfoUrl,
};
use rand::rngs::OsRng;
use redis::Commands;
use rsa::{
pkcs1::{FromRsaPrivateKey, ToRsaPrivateKey},
RsaPrivateKey,
@ -52,8 +55,10 @@ use urlencoding::decode;
use uuid::Uuid;
mod config;
mod redis_conn;
mod session;
use redis_conn::*;
use session::*;
const KID: &str = "key1";
@ -170,7 +175,7 @@ async fn provider_metadata(
#[derive(Deserialize)]
struct TokenForm {
code: String,
code: Uuid,
client_id: String,
client_secret: Option<String>,
grant_type: CoreGrantType, // TODO should just be authorization_code apparently?
@ -198,7 +203,6 @@ async fn token(
if let Some(secret) = form.client_secret.clone() {
let stored_secret: Option<String> = conn
.get(format!("{}/{}", KV_CLIENT_PREFIX, form.client_id))
.await
.map_err(|e| anyhow!("Failed to get kv: {}", e))?;
if stored_secret.is_none() {
Err(CustomError::Unauthorized(
@ -214,7 +218,6 @@ async fn token(
let serialized_entry: Option<Vec<u8>> = conn
.get(form.code.to_string())
.await
.map_err(|e| anyhow!("Failed to get kv: {}", e))?;
if serialized_entry.is_none() {
Err(CustomError::BadRequest("Unknown code.".to_string()))?;
@ -237,7 +240,6 @@ async fn token(
),
ENTRY_LIFETIME,
)
.await
.map_err(|e| anyhow!("Failed to set kv: {}", e))?;
let access_token = AccessToken::new(form.code.to_string().clone());
@ -279,7 +281,7 @@ struct AuthorizeParams {
redirect_uri: RedirectUrl,
scope: Scope,
response_type: CoreResponseType,
state: String,
state: String, // TODO not required
nonce: Option<Nonce>,
}
@ -527,7 +529,6 @@ async fn sign_in(
),
ENTRY_LIFETIME,
)
.await
.map_err(|e| anyhow!("Failed to set kv: {}", e))?;
let mut url = params.redirect_uri.url().clone();
@ -556,7 +557,6 @@ async fn register(
.await
.map_err(|e| anyhow!("Failed to get connection to database: {}", e))?;
conn.set(format!("{}/{}", KV_CLIENT_PREFIX, id), secret.to_string())
.await
.map_err(|e| anyhow!("Failed to set kv: {}", e))?;
Ok(CoreClientRegistrationResponse::new(
@ -565,6 +565,7 @@ async fn register(
EmptyAdditionalClientMetadata::default(),
EmptyAdditionalClientRegistrationResponse::default(),
)
.set_client_secret(Some(ClientSecret::new(secret.to_string())))
.into())
}
@ -583,7 +584,6 @@ async fn userinfo(
.map_err(|e| anyhow!("Failed to get connection to database: {}", e))?;
let serialized_entry: Option<Vec<u8>> = conn
.get(code)
.await
.map_err(|e| anyhow!("Failed to get kv: {}", e))?;
if serialized_entry.is_none() {
Err(CustomError::BadRequest("Unknown code.".to_string()))?;
@ -601,6 +601,7 @@ async fn userinfo(
.into())
}
// TODO ping Redis
async fn healthcheck() {}
#[tokio::main]
@ -614,20 +615,20 @@ async fn main() {
let manager = RedisConnectionManager::new(config.redis_url.clone()).unwrap();
let pool = bb8::Pool::builder().build(manager.clone()).await.unwrap();
let pool2 = bb8::Pool::builder().build(manager).await.unwrap();
// let pool2 = bb8::Pool::builder().build(manager).await.unwrap();
let mut conn = pool2
.get()
.await
.map_err(|e| anyhow!("Failed to get connection to database: {}", e))
.unwrap();
for (id, secret) in &config.default_clients.clone() {
let _: () = conn
.set(format!("{}/{}", KV_CLIENT_PREFIX, id), secret)
.await
.map_err(|e| anyhow!("Failed to set kv: {}", e))
.unwrap();
}
// let mut conn = pool2
// .get()
// .await
// .map_err(|e| anyhow!("Failed to get connection to database: {}", e))
// .unwrap();
// for (id, secret) in &config.default_clients.clone() {
// let _: () = conn
// .set(format!("{}/{}", KV_CLIENT_PREFIX, id), secret)
// .await
// .map_err(|e| anyhow!("Failed to set kv: {}", e))
// .unwrap();
// }
let private_key = if let Some(key) = &config.rsa_pem {
RsaPrivateKey::from_pkcs1_pem(&key)
@ -702,11 +703,12 @@ async fn main() {
.layer(AddExtensionLayer::new(private_key))
.layer(AddExtensionLayer::new(config.clone()))
.layer(AddExtensionLayer::new(pool))
.layer(AddExtensionLayer::new(
RedisSessionStore::new(config.redis_url.clone())
.unwrap()
.with_prefix("async-sessions/"),
))
// .layer(AddExtensionLayer::new(
// RedisSessionStore::new(config.redis_url.clone())
// .unwrap()
// .with_prefix("async-sessions/"),
// ))
.layer(AddExtensionLayer::new(MemoryStore::new()))
.layer(TraceLayer::new_for_http());
let addr = SocketAddr::from((config.address, config.port));

52
src/redis_conn.rs Normal file
View File

@ -0,0 +1,52 @@
use std::ops::DerefMut;
use bb8;
use redis;
use async_trait::async_trait;
use redis::{aio::Connection, ErrorKind};
use redis::{
cluster::{ClusterClient, ClusterConnection},
IntoConnectionInfo, RedisError,
};
/// A `bb8::ManageConnection` for `redis::Client::get_async_connection`.
#[derive(Clone)]
pub struct RedisConnectionManager {
client: ClusterClient,
}
impl RedisConnectionManager {
/// Create a new `RedisConnectionManager`.
/// See `redis::Client::open` for a description of the parameter types.
pub fn new<T: IntoConnectionInfo>(info: T) -> Result<RedisConnectionManager, RedisError> {
Ok(RedisConnectionManager {
client: ClusterClient::open(vec![info.into_connection_info()?])?,
})
}
}
#[async_trait]
impl bb8::ManageConnection for RedisConnectionManager {
type Connection = ClusterConnection;
type Error = RedisError;
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
self.client.get_connection()
}
async fn is_valid(
&self,
conn: &mut bb8::PooledConnection<'_, Self>,
) -> Result<(), Self::Error> {
let pong: String = redis::cmd("PING").query(conn.deref_mut())?;
match pong.as_str() {
"PONG" => Ok(()),
_ => Err((ErrorKind::ResponseError, "ping request").into()),
}
}
fn has_broken(&self, _: &mut Self::Connection) -> bool {
false
}
}

View File

@ -1,4 +1,5 @@
use async_redis_session::RedisSessionStore;
// use async_redis_session::RedisSessionStore;
use async_session::MemoryStore;
use async_session::{Session, SessionStore as _};
use axum::{
async_trait,
@ -28,7 +29,8 @@ where
type Rejection = (StatusCode, String);
async fn from_request(req: &mut RequestParts<B>) -> Result<Self, Self::Rejection> {
let Extension(store) = match Extension::<RedisSessionStore>::from_request(req).await {
// TODO sessions are set without expiry
let Extension(store) = match Extension::<MemoryStore>::from_request(req).await {
Ok(s) => s,
Err(e) => {
return Err((
@ -80,6 +82,14 @@ where
let session = match store.load_session(session_cookie.value().to_string()).await {
Ok(Some(s)) => s,
Err(e) => {
debug!("Could not load session: {}", e);
let mut cookie = session_cookie.clone();
cookie.make_removal();
return Ok(Self::InvalidUserSession(
cookie.to_string().parse().unwrap(),
));
}
_ => {
debug!("Could not load session");
let mut cookie = session_cookie.clone();