start of storage manager

This commit is contained in:
John Smith 2023-04-01 20:04:20 -04:00
parent 1430f3f656
commit c78035a5d9
11 changed files with 420 additions and 41 deletions

View File

@ -27,13 +27,12 @@ struct Nonce24 @0xb6260db25d8d7dfc {
u2 @2 :UInt64;
}
using PublicKey = Key256; # Node id / DHT key / Route id, etc
using PublicKey = Key256; # Node id / Hash / DHT key / Route id, etc
using Nonce = Nonce24; # One-time encryption nonce
using Signature = Signature512; # Signature block
using TunnelID = UInt64; # Id for tunnels
using CryptoKind = UInt32; # FOURCC code for cryptography type
using ValueSeqNum = UInt32; # sequence numbers for values
using ValueSchema = UInt32; # FOURCC code for schema (0 = freeform, SUB0 = subkey control v0)
using Subkey = UInt32; # subkey index for dht
struct TypedKey @0xe2d567a9f1e61b29 {
@ -319,15 +318,31 @@ struct SubkeyRange {
struct ValueData @0xb4b7416f169f2a3d {
seq @0 :ValueSeqNum; # sequence number of value
schema @1 :ValueSchema; # fourcc code of schema for value
data @2 :Data; # value or subvalue contents
data @1 :Data; # value or subvalue contents
owner @2 :PublicKey; # the public key of the owner
writer @3 :PublicKey; # the public key of the writer
signature @5 :Signature; # signature of data at this subkey, using the writer key (which may be the same as the owner key)
# signature covers:
# * ownerKey
# * subkey
# * sequence number
# * data
# signature does not need to cover schema because schema is validated upon every set
# so the data either fits, or it doesn't.
schema @6 :Data; # (optional) the schema in use
# If not set and seqnum == 0, uses the default schema.
# If not set and If seqnum != 0, the schema must have been set prior and no other schema may be used, but this field may be eliminated to save space
# Changing this after key creation is not supported as it would change the dht key
# Schema data is signed by ownerKey and is verified both by set and get operations
}
struct OperationGetValueQ @0xf88a5b6da5eda5d0 {
key @0 :TypedKey; # the location of the value
subkey @1 :Subkey; # the index of the subkey (0 for the default subkey)
wantSchema @2 :bool; # whether or not to include the schema for the key
}
struct OperationGetValueA @0xd896bb46f2e0249f {
union {
data @0 :ValueData; # the value if successful
@ -335,16 +350,17 @@ struct OperationGetValueA @0xd896bb46f2e0249f {
}
}
struct OperationSetValueQ @0xbac06191ff8bdbc5 {
key @0 :TypedKey; # the location of the value
subkey @1 :Subkey; # the index of the subkey (0 for the default subkey)
value @2 :ValueData; # value or subvalue contents (older or equal seq number gets dropped)
struct OperationSetValueQ @0xbac06191ff8bdbc5 {
key @0 :TypedKey; # DHT Key = Hash(ownerKeyKind) of: [ ownerKeyValue, schema ]
subkey @3 :Subkey; # the index of the subkey
value @4 :ValueData; # value or subvalue contents (older or equal seq number gets dropped)
}
struct OperationSetValueA @0x9378d0732dc95be2 {
union {
data @0 :ValueData; # the new value if successful, may be a different value than what was set if the seq number was lower or equal
peers @1 :List(PeerInfo); # returned 'closer peer' information if not successful
schemaError @0 :Void; # Either the schema is not available at the node, or the data does not match the schema that is there
data @1 :ValueData; # the new value if successful, may be a different value than what was set if the seq number was lower or equal
peers @2 :List(PeerInfo); # returned 'closer peer' information if this node is refusing to store the key
}
}
@ -353,6 +369,7 @@ struct OperationWatchValueQ @0xf9a5a6c547b9b228 {
subkeys @1 :List(SubkeyRange); # subkey range to watch, if empty, watch everything
expiration @2 :UInt64; # requested timestamp when this watch will expire in usec since epoch (can be return less, 0 for max)
count @3 :UInt32; # requested number of changes to watch for (0 = cancel, 1 = single shot, 2+ = counter, UINT32_MAX = continuous)
signature @4 :Signature; # signature of the watcher, must be one of the schema members or the key owner. signature covers: key, subkeys, expiration, count
}
struct OperationWatchValueA @0xa726cab7064ba893 {

View File

@ -1,12 +1,16 @@
use crate::api_tracing_layer::*;
use crate::attachment_manager::*;
use crate::crypto::Crypto;
use crate::storage_manager::*;
use crate::veilid_api::*;
use crate::veilid_config::*;
use crate::*;
pub type UpdateCallback = Arc<dyn Fn(VeilidUpdate) + Send + Sync>;
/// Internal services startup mechanism
/// Ensures that everything is started up, and shut down in the right order
/// and provides an atomic state for if the system is properly operational
struct ServicesContext {
pub config: VeilidConfig,
pub update_callback: UpdateCallback,
@ -16,6 +20,7 @@ struct ServicesContext {
pub block_store: Option<BlockStore>,
pub crypto: Option<Crypto>,
pub attachment_manager: Option<AttachmentManager>,
pub storage_manager: Option<StorageManager>,
}
impl ServicesContext {
@ -28,6 +33,7 @@ impl ServicesContext {
block_store: None,
crypto: None,
attachment_manager: None,
storage_manager: None,
}
}
@ -39,6 +45,7 @@ impl ServicesContext {
block_store: BlockStore,
crypto: Crypto,
attachment_manager: AttachmentManager,
storage_manager: StorageManager,
) -> Self {
Self {
config,
@ -48,6 +55,7 @@ impl ServicesContext {
block_store: Some(block_store),
crypto: Some(crypto),
attachment_manager: Some(attachment_manager),
storage_manager: Some(storage_manager),
}
}
@ -114,6 +122,26 @@ impl ServicesContext {
}
self.attachment_manager = Some(attachment_manager);
// Set up storage manager
trace!("init storage manager");
let storage_manager = StorageManager::new(
self.config.clone(),
self.crypto.clone().unwrap(),
self.protected_store.clone().unwrap(),
self.table_store.clone().unwrap(),
self.block_store.clone().unwrap(),
self.attachment_manager
.clone()
.unwrap()
.network_manager()
.rpc_processor(),
);
if let Err(e) = storage_manager.init().await {
self.shutdown().await;
return Err(e);
}
self.storage_manager = Some(storage_manager.clone());
info!("Veilid API startup complete");
Ok(())
}
@ -122,6 +150,10 @@ impl ServicesContext {
pub async fn shutdown(&mut self) {
info!("Veilid API shutting down");
if let Some(storage_manager) = &mut self.storage_manager {
trace!("terminate storage manager");
storage_manager.terminate().await;
}
if let Some(attachment_manager) = &mut self.attachment_manager {
trace!("terminate attachment manager");
attachment_manager.terminate().await;
@ -163,6 +195,7 @@ pub struct VeilidCoreContext {
pub table_store: TableStore,
pub block_store: BlockStore,
pub crypto: Crypto,
pub storage_manager: StorageManager,
pub attachment_manager: AttachmentManager,
}
@ -215,6 +248,7 @@ impl VeilidCoreContext {
table_store: sc.table_store.unwrap(),
block_store: sc.block_store.unwrap(),
crypto: sc.crypto.unwrap(),
storage_manager: sc.storage_manager.unwrap(),
attachment_manager: sc.attachment_manager.unwrap(),
})
}
@ -228,6 +262,7 @@ impl VeilidCoreContext {
self.table_store,
self.block_store,
self.crypto,
self.storage_manager,
self.attachment_manager,
);
sc.shutdown().await;

View File

@ -47,6 +47,12 @@ pub const ROUTE_ID_LENGTH: usize = 32;
/// Length of a route id in bytes afer encoding to base64url
#[allow(dead_code)]
pub const ROUTE_ID_LENGTH_ENCODED: usize = 43;
/// Length of a hash digest in bytes
#[allow(dead_code)]
pub const HASH_DIGEST_LENGTH: usize = 32;
/// Length of a hash digest in bytes after encoding to base64url
#[allow(dead_code)]
pub const HASH_DIGEST_LENGTH_ENCODED: usize = 43;
//////////////////////////////////////////////////////////////////////

View File

@ -55,5 +55,6 @@ pub type TypedKey = CryptoTyped<PublicKey>;
pub type TypedSecret = CryptoTyped<SecretKey>;
pub type TypedKeyPair = CryptoTyped<KeyPair>;
pub type TypedSignature = CryptoTyped<Signature>;
pub type TypedKeySet = CryptoTypedSet<PublicKey>;
pub type TypedSecretSet = CryptoTypedSet<SecretKey>;

View File

@ -28,6 +28,7 @@ mod network_manager;
mod receipt_manager;
mod routing_table;
mod rpc_processor;
mod storage_manager;
mod veilid_api;
#[macro_use]
mod veilid_config;

View File

@ -312,7 +312,7 @@ impl RPCProcessor {
#[instrument(level = "debug", skip_all, err)]
pub async fn startup(&self) -> EyreResult<()> {
trace!("startup rpc processor");
debug!("startup rpc processor");
let mut inner = self.inner.lock();
let channel = flume::bounded(self.unlocked_inner.queue_size as usize);

View File

@ -0,0 +1,154 @@
use super::*;
use crate::rpc_processor::*;
struct StorageManagerInner {}
struct StorageManagerUnlockedInner {
config: VeilidConfig,
crypto: Crypto,
protected_store: ProtectedStore,
table_store: TableStore,
block_store: BlockStore,
rpc_processor: RPCProcessor,
}
#[derive(Clone)]
pub struct StorageManager {
unlocked_inner: Arc<StorageManagerUnlockedInner>,
inner: Arc<Mutex<StorageManagerInner>>,
}
impl StorageManager {
fn new_unlocked_inner(
config: VeilidConfig,
crypto: Crypto,
protected_store: ProtectedStore,
table_store: TableStore,
block_store: BlockStore,
rpc_processor: RPCProcessor,
) -> StorageManagerUnlockedInner {
StorageManagerUnlockedInner {
config,
crypto,
protected_store,
table_store,
block_store,
rpc_processor,
}
}
fn new_inner() -> StorageManagerInner {}
pub fn new(
config: VeilidConfig,
crypto: Crypto,
protected_store: ProtectedStore,
table_store: TableStore,
block_store: BlockStore,
rpc_processor: RPCProcessor,
) -> StorageManager {
StorageManager {
unlocked_inner: Arc::new(Self::new_unlocked_inner(
config,
crypto,
protected_store,
table_store,
block_store,
rpc_processor,
)),
inner: Arc::new(Mutex::new(Self::new_inner())),
}
}
#[instrument(level = "debug", skip_all, err)]
pub async fn init(&self) -> EyreResult<()> {
debug!("startup storage manager");
let mut inner = self.inner.lock();
// xxx
Ok(())
}
pub fn terminate(&self) {
debug!("starting storage manager shutdown");
// Release the storage manager
*self.inner.lock() = Self::new_inner();
debug!("finished storage manager shutdown");
}
/// Creates a new DHT value with a specified crypto kind and schema
/// Returns the newly allocated DHT Key if successful.
pub async fn create_value(
&self,
kind: CryptoKind,
schema: &DHTSchema,
safety_selection: SafetySelection,
) -> Result<TypedKey, VeilidAPIError> {
unimplemented!();
}
/// Opens a DHT value at a specific key. Associates a secret if one is provided to provide writer capability.
/// Returns the DHT key descriptor for the opened key if successful
/// Value may only be opened or created once. To re-open with a different routing context, first close the value.
pub async fn open_value(
key: TypedKey,
secret: Option<SecretKey>,
safety_selection: SafetySelection,
) -> Result<DHTDescriptor, VeilidAPIError> {
unimplemented!();
}
/// Closes a DHT value at a specific key that was opened with create_value or open_value.
/// Closing a value allows you to re-open it with a different routing context
pub async fn close_value(key: TypedKey) -> Result<(), VeilidAPIError> {
unimplemented!();
}
/// Gets the latest value of a subkey from the network
/// Returns the possibly-updated value data of the subkey
pub async fn get_value(
&self,
key: TypedKey,
subkey: ValueSubkey,
force_refresh: bool,
) -> Result<ValueData, VeilidAPIError> {
unimplemented!();
}
/// Pushes a changed subkey value to the network
/// Returns None if the value was successfully put
/// Returns Some(newer_value) if the value put was older than the one available on the network
pub async fn set_value(
&self,
key: TypedKey,
subkey: ValueSubkey,
value_data: ValueData,
) -> Result<Option<ValueData>, VeilidAPIError> {
unimplemented!();
}
/// Watches changes to an opened or created value
/// Changes to subkeys within the subkey range are returned via a ValueChanged callback
/// If the subkey range is empty, all subkey changes are considered
/// Expiration can be infinite to keep the watch for the maximum amount of time
/// Return value upon success is the amount of time allowed for the watch
pub async fn watch_value(
&self,
key: TypedKey,
subkeys: &[ValueSubkeyRange],
expiration: Timestamp,
count: u32,
) -> Result<Timestamp, VeilidAPIError> {
unimplemented!();
}
/// Cancels a watch early
/// This is a convenience function that cancels watching all subkeys in a range
pub async fn cancel_watch_value(
&self,
key: TypedKey,
subkeys: &[ValueSubkeyRange],
) -> Result<bool, VeilidAPIError> {
unimplemented!();
}
}

View File

@ -112,6 +112,13 @@ impl VeilidAPI {
}
Err(VeilidAPIError::NotInitialized)
}
pub fn storage_manager(&self) -> Result<StorageManager, VeilidAPIError> {
let inner = self.inner.lock();
if let Some(context) = &inner.context {
return Ok(context.storage_manager.clone());
}
Err(VeilidAPIError::NotInitialized)
}
////////////////////////////////////////////////////////////////
// Attach/Detach

View File

@ -34,5 +34,6 @@ use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as
use routing_table::{RouteSpecStore, RoutingTable};
use rpc_processor::*;
use serde::*;
use storage_manager::StorageManager;
/////////////////////////////////////////////////////////////////////////////////////////////////////

View File

@ -197,39 +197,91 @@ impl RoutingContext {
///////////////////////////////////
/// DHT Values
/// Creates a new DHT value with a specified crypto kind and schema
/// Returns the newly allocated DHT Key if successful.
pub async fn create_value(
&self,
kind: CryptoKind,
schema: &DHTSchema,
) -> Result<TypedKey, VeilidAPIError> {
let storage_manager = self.api.storage_manager()?;
storage_manager
.create_value(kind, schema, self.unlocked_inner.safety_selection)
.await
}
/// Opens a DHT value at a specific key. Associates a secret if one is provided to provide writer capability.
/// Returns the DHT key descriptor for the opened key if successful
/// Value may only be opened or created once. To re-open with a different routing context, first close the value.
pub async fn open_value(
key: TypedKey,
secret: Option<SecretKey>,
) -> Result<DHTDescriptor, VeilidAPIError> {
let storage_manager = self.api.storage_manager()?;
storage_manager
.open_value(key, secret, self.unlocked_inner.safety_selection)
.await
}
/// Closes a DHT value at a specific key that was opened with create_value or open_value.
/// Closing a value allows you to re-open it with a different routing context
pub async fn close_value(key: TypedKey) -> Result<(), VeilidAPIError> {
let storage_manager = self.api.storage_manager()?;
storage_manager.close_value(key).await
}
/// Gets the latest value of a subkey from the network
/// Returns the possibly-updated value data of the subkey
pub async fn get_value(
&self,
_key: TypedKey,
_subkey: ValueSubkey,
key: TypedKey,
subkey: ValueSubkey,
force_refresh: bool,
) -> Result<ValueData, VeilidAPIError> {
panic!("unimplemented");
let storage_manager = self.api.storage_manager()?;
storage_manager.get_value(key, subkey, force_refresh).await
}
/// Pushes a changed subkey value to the network
/// Returns None if the value was successfully put
/// Returns Some(newer_value) if the value put was older than the one available on the network
pub async fn set_value(
&self,
_key: TypedKey,
_subkey: ValueSubkey,
_value: ValueData,
) -> Result<bool, VeilidAPIError> {
panic!("unimplemented");
key: TypedKey,
subkey: ValueSubkey,
value_data: ValueData,
) -> Result<Option<ValueData>, VeilidAPIError> {
let storage_manager = self.api.storage_manager()?;
storage_manager.set_value(key, subkey, value_data).await
}
/// Watches changes to an opened or created value
/// Changes to subkeys within the subkey range are returned via a ValueChanged callback
/// If the subkey range is empty, all subkey changes are considered
/// Expiration can be infinite to keep the watch for the maximum amount of time
/// Return value upon success is the amount of time allowed for the watch
pub async fn watch_value(
&self,
_key: TypedKey,
_subkeys: &[ValueSubkeyRange],
_expiration: Timestamp,
_count: u32,
) -> Result<bool, VeilidAPIError> {
panic!("unimplemented");
key: TypedKey,
subkeys: &[ValueSubkeyRange],
expiration: Timestamp,
count: u32,
) -> Result<Timestamp, VeilidAPIError> {
let storage_manager = self.api.storage_manager()?;
storage_manager
.watch_values(key, subkeys, expiration, count)
.await
}
/// Cancels a watch early
/// This is a convenience function that cancels watching all subkeys in a range
pub async fn cancel_watch_value(
&self,
_key: TypedKey,
_subkeys: &[ValueSubkeyRange],
key: TypedKey,
subkeys: &[ValueSubkeyRange],
) -> Result<bool, VeilidAPIError> {
panic!("unimplemented");
let storage_manager = self.api.storage_manager()?;
storage_manager.cancel_watch_value(key, subkey).await
}
///////////////////////////////////

View File

@ -15,8 +15,6 @@ pub type OperationId = AlignedU64;
pub type ByteCount = AlignedU64;
/// Tunnel identifier
pub type TunnelId = AlignedU64;
/// Value schema
pub type ValueSchema = FourCC;
/// Value subkey
pub type ValueSubkey = u32;
/// Value subkey range
@ -356,19 +354,19 @@ pub struct VeilidState {
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct ValueData {
pub seq: ValueSeqNum,
pub schema: ValueSchema,
pub data: Vec<u8>,
pub writer: PublicKey,
}
impl ValueData {
pub fn new(schema: ValueSchema, data: Vec<u8>) -> Self {
pub fn new(data: Vec<u8>, writer: PublicKey) -> Self {
Self {
seq: 0,
schema,
data,
writer,
}
}
pub fn new_with_seq(seq: ValueSeqNum, schema: ValueSchema, data: Vec<u8>) -> Self {
Self { seq, schema, data }
pub fn new_with_seq(seq: ValueSeqNum, data: Vec<u8>, writer: PublicKey) -> Self {
Self { seq, data, writer }
}
pub fn change(&mut self, data: Vec<u8>) {
self.data = data;
@ -2402,22 +2400,129 @@ pub struct PeerStats {
/////////////////////////////////////////////////////////////////////////////////////////////////////
/// Parameter for Signal operation
#[derive(Clone, Debug, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
#[archive_attr(repr(u8), derive(CheckBytes))]
pub enum SignalInfo {
/// UDP Hole Punch Request
HolePunch {
// UDP Hole Punch Request
receipt: Vec<u8>, // Receipt to be returned after the hole punch
peer_info: PeerInfo, // Sender's peer info
/// /// Receipt to be returned after the hole punch
receipt: Vec<u8>,
/// Sender's peer info
peer_info: PeerInfo,
},
/// Reverse Connection Request
ReverseConnect {
// Reverse Connection Request
receipt: Vec<u8>, // Receipt to be returned by the reverse connection
peer_info: PeerInfo, // Sender's peer info
/// Receipt to be returned by the reverse connection
receipt: Vec<u8>,
/// Sender's peer info
peer_info: PeerInfo,
},
// XXX: WebRTC
}
/////////////////////////////////////////////////////////////////////////////////////////////////////
/// Default DHT Schema (DFLT)
#[derive(
Debug, Clone, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize,
)]
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct DHTSchemaDFLT {
/// Owner subkey count
pub o_cnt: u16,
}
impl DHTSchemaDFLT {
pub fn compile(&self) -> Vec<u8> {
let mut out = Vec::<u8>::with_capacity(6);
// kind
out.extend_from_slice(&FourCC::from_str("DFLT").unwrap().0);
// o_cnt
out.extend_from_slice(&self.o_cnt.to_le_bytes());
out
}
}
/// Simple DHT Schema (SMPL) Member
#[derive(
Debug, Clone, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize,
)]
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct DHTSchemaSMPLMember {
/// Member key
pub m_key: PublicKey,
/// Member subkey countanyway,
pub m_cnt: u16,
}
/// Simple DHT Schema (SMPL)
#[derive(
Debug, Clone, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize,
)]
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct DHTSchemaSMPL {
/// Owner subkey count
pub o_cnt: u16,
/// Members
pub members: Vec<DHTSchemaSMPLMember>,
}
impl DHTSchemaSMPL {
pub fn compile(&self) -> Vec<u8> {
let mut out = Vec::<u8>::with_capacity(6 + (self.members.len() * (PUBLIC_KEY_LENGTH + 2)));
// kind
out.extend_from_slice(&FourCC::from_str("SMPL").unwrap().0);
// o_cnt
out.extend_from_slice(&self.o_cnt.to_le_bytes());
// members
for m in self.members {
// m_key
out.extend_from_slice(&m.m_key.bytes);
// m_cnt
out.extend_from_slice(&m.m_cnt.to_le_bytes());
}
out
}
}
/// Enum over all the supported DHT Schemas
#[derive(
Debug, Clone, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize,
)]
#[archive_attr(repr(u8), derive(CheckBytes))]
#[serde(tag = "kind")]
pub enum DHTSchema {
DFLT(DHTSchemaDFLT),
SMPL(DHTSchemaSMPL),
}
impl DHTSchema {
pub fn dflt(o_cnt: u16) -> DHTSchema {
DHTSchema::DFLT(DHTSchemaDFLT { o_cnt })
}
pub fn smpl(o_cnt: u16, members: Vec<DHTSchemaSMPLMember>) -> DHTSchema {
DHTSchema::SMPL(DHTSchemaSMPL { o_cnt, members })
}
pub fn compile(&self) -> Vec<u8> {
match self {
DHTSchema::DFLT(d) => d.compile(),
DHTSchema::SMPL(s) => s.compile(),
}
}
}
/// DHT Key Descriptor
#[derive(
Debug, Clone, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize,
)]
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct DHTDescriptor {
pub owner: PublicKey,
pub schema: DHTSchema,
}
/////////////////////////////////////////////////////////////////////////////////////////////////////
#[derive(
Copy,