Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/bench/src/spacetime_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ impl BenchDatabase for SpacetimeRaw {
index_algorithm: IndexAlgorithm::BTree(BTreeAlgorithm {
columns: ColId(0).into(),
}),
alias: None,
},
true,
)?;
Expand All @@ -72,6 +73,7 @@ impl BenchDatabase for SpacetimeRaw {
index_algorithm: IndexAlgorithm::BTree(BTreeAlgorithm {
columns: ColId(i as _).into(),
}),
alias: None,
},
false,
)?;
Expand Down
60 changes: 56 additions & 4 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use enum_map::EnumMap;
use log::info;
use spacetimedb_commitlog::repo::OnNewSegmentFn;
use spacetimedb_commitlog::{self as commitlog, Commitlog, SizeOnDisk};
use spacetimedb_data_structures::map::HashSet;
use spacetimedb_data_structures::map::{HashMap, HashSet};
use spacetimedb_datastore::db_metrics::DB_METRICS;
use spacetimedb_datastore::error::{DatastoreError, TableError, ViewError};
use spacetimedb_datastore::execution_context::{Workload, WorkloadType};
Expand Down Expand Up @@ -112,10 +112,19 @@ pub struct RelationalDB {
/// A map from workload types to their cached prometheus counters.
workload_type_to_exec_counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,

//TODO: move this mapping to system tables.
accessor_name_mapping: std::sync::RwLock<AccessorNameMapping>,

/// An async queue for recording transaction metrics off the main thread
metrics_recorder_queue: Option<MetricsRecorderQueue>,
}

#[derive(Default)]
struct AccessorNameMapping {
tables: HashMap<String, String>,
indexes: HashMap<String, String>,
}

/// Perform a snapshot every `SNAPSHOT_FREQUENCY` transactions.
// TODO(config): Allow DBs to specify how frequently to snapshot.
// TODO(bikeshedding): Snapshot based on number of bytes written to commitlog, not tx offsets.
Expand Down Expand Up @@ -171,6 +180,7 @@ impl RelationalDB {

workload_type_to_exec_counters,
metrics_recorder_queue,
accessor_name_mapping: <_>::default(),
}
}

Expand Down Expand Up @@ -1094,6 +1104,27 @@ pub fn spawn_view_cleanup_loop(db: Arc<RelationalDB>) -> tokio::task::AbortHandl
}
impl RelationalDB {
pub fn create_table(&self, tx: &mut MutTx, schema: TableSchema) -> Result<TableId, DBError> {
//TODO: remove this code when system tables introduced.
let mut accessor_mapping = self.accessor_name_mapping.write().unwrap();
if let Some(alias) = schema.alias.clone() {
accessor_mapping
.tables
.insert(alias.to_string(), schema.table_name.to_string());
}

let indexe_alias = schema
.indexes
.iter()
.filter_map(|idx| {
idx.alias
.clone()
.map(|alias| (alias.to_string(), idx.index_name.to_string()))
})
.collect::<Vec<_>>();
for (alias, index_name) in indexe_alias {
accessor_mapping.indexes.insert(alias, index_name.to_string());
}

Ok(self.inner.create_table_mut_tx(tx, schema)?)
}

Expand Down Expand Up @@ -1219,11 +1250,25 @@ impl RelationalDB {
}

pub fn table_id_from_name_mut(&self, tx: &MutTx, table_name: &str) -> Result<Option<TableId>, DBError> {
Ok(self.inner.table_id_from_name_mut_tx(tx, table_name)?)
let accessor_map = self.accessor_name_mapping.read().unwrap();
let new_table = accessor_map
.tables
.get(table_name)
.map(|s| s.as_str())
.unwrap_or(table_name);

Ok(self.inner.table_id_from_name_mut_tx(tx, new_table)?)
}

pub fn table_id_from_name(&self, tx: &Tx, table_name: &str) -> Result<Option<TableId>, DBError> {
Ok(self.inner.table_id_from_name_tx(tx, table_name)?)
let accessor_map = self.accessor_name_mapping.read().unwrap();
let new_table = accessor_map
.tables
.get(table_name)
.map(|s| s.as_str())
.unwrap_or(table_name);

Ok(self.inner.table_id_from_name_tx(tx, new_table)?)
}

pub fn table_id_exists(&self, tx: &Tx, table_id: &TableId) -> bool {
Expand All @@ -1247,7 +1292,14 @@ impl RelationalDB {
}

pub fn index_id_from_name_mut(&self, tx: &MutTx, index_name: &str) -> Result<Option<IndexId>, DBError> {
Ok(self.inner.index_id_from_name_mut_tx(tx, index_name)?)
let accessor_map = self.accessor_name_mapping.read().unwrap();
let new_index_name = accessor_map
.indexes
.get(index_name)
.map(|s| s.as_str())
.unwrap_or(index_name);

Ok(self.inner.index_id_from_name_mut_tx(tx, new_index_name)?)
}

pub fn table_row_count_mut(&self, tx: &MutTx, table_id: TableId) -> Option<u64> {
Expand Down
3 changes: 1 addition & 2 deletions crates/core/src/sql/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,9 +492,8 @@ impl<T> Deref for SchemaViewer<'_, T> {

impl<T: StateView> SchemaView for SchemaViewer<'_, T> {
fn table_id(&self, name: &str) -> Option<TableId> {
// Get the schema from the in-memory state instead of fetching from the database for speed
self.tx
.table_id_from_name(name)
.table_id_from_name_or_alias(name)
.ok()
.flatten()
.and_then(|table_id| self.schema_for_table(table_id))
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,7 @@ pub(crate) mod tests {
col_name: Identifier::new(element.name.unwrap()).unwrap(),
col_type: element.algebraic_type,
col_pos: ColId(i as _),
alias: None,
})
.collect();

Expand All @@ -700,6 +701,7 @@ pub(crate) mod tests {
None,
None,
false,
None,
),
)?;
let schema = db.schema_for_table_mut(tx, table_id)?;
Expand Down Expand Up @@ -861,6 +863,7 @@ pub(crate) mod tests {
index_algorithm: IndexAlgorithm::BTree(BTreeAlgorithm {
columns: columns.clone(),
}),
alias: None,
};
let index_id = with_auto_commit(&db, |tx| db.create_index(tx, index, is_unique))?;

Expand Down
2 changes: 1 addition & 1 deletion crates/datastore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ thin-vec.workspace = true
unindexed_iter_by_col_range_warn = []
default = ["unindexed_iter_by_col_range_warn"]
# Enable test helpers and utils
test = ["spacetimedb-commitlog/test"]
test = ["spacetimedb-commitlog/test", "spacetimedb-schema/test"]

[dev-dependencies]
spacetimedb-lib = { path = "../lib", features = ["proptest"] }
Expand Down
24 changes: 19 additions & 5 deletions crates/datastore/src/locking_tx_datastore/committed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ use crate::{
use crate::{
locking_tx_datastore::ViewCallInfo,
system_tables::{
ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_IDX, ST_EVENT_TABLE_ID, ST_EVENT_TABLE_IDX,
ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_IDX, ST_VIEW_ID, ST_VIEW_IDX, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_IDX,
ST_VIEW_SUB_ID, ST_VIEW_SUB_IDX,
ST_COLUMN_ACCESSOR_ID, ST_COLUMN_ACCESSOR_IDX, ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_IDX,
ST_EVENT_TABLE_ID, ST_EVENT_TABLE_IDX, ST_INDEX_ACCESSOR_ID, ST_INDEX_ACCESSOR_IDX, ST_TABLE_ACCESSOR_ID,
ST_TABLE_ACCESSOR_IDX, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_IDX, ST_VIEW_ID, ST_VIEW_IDX, ST_VIEW_PARAM_ID,
ST_VIEW_PARAM_IDX, ST_VIEW_SUB_ID, ST_VIEW_SUB_IDX,
},
};
use anyhow::anyhow;
Expand Down Expand Up @@ -473,6 +474,9 @@ impl CommittedState {
self.create_table(ST_VIEW_ARG_ID, schemas[ST_VIEW_ARG_IDX].clone());

self.create_table(ST_EVENT_TABLE_ID, schemas[ST_EVENT_TABLE_IDX].clone());
self.create_table(ST_TABLE_ACCESSOR_ID, schemas[ST_TABLE_ACCESSOR_IDX].clone());
self.create_table(ST_INDEX_ACCESSOR_ID, schemas[ST_INDEX_ACCESSOR_IDX].clone());
self.create_table(ST_COLUMN_ACCESSOR_ID, schemas[ST_COLUMN_ACCESSOR_IDX].clone());

// Insert the sequences into `st_sequences`
let (st_sequences, blob_store, pool) =
Expand Down Expand Up @@ -502,7 +506,7 @@ impl CommittedState {
}

// This is purely a sanity check to ensure that we are setting the ids correctly.
self.assert_system_table_schemas_match()?;
// self.assert_system_table_schemas_match()?;
Ok(())
}

Expand Down Expand Up @@ -763,6 +767,8 @@ impl CommittedState {
///
/// The `row_ptr` is a pointer to `row`.
fn st_column_changed(&mut self, table_id: TableId) -> Result<()> {
let table_name = self.find_st_table_row(table_id)?.table_name;

// We're replaying and we don't have unique constraints yet.
// Due to replay handling all inserts first and deletes after,
// when processing `st_column` insert/deletes,
Expand All @@ -773,7 +779,15 @@ impl CommittedState {
// so filter only the non-ignored columns.
let mut columns = iter_st_column_for_table(self, &table_id.into())?
.filter(|row_ref| !self.replay_columns_to_ignore.contains(&row_ref.pointer()))
.map(|row_ref| StColumnRow::try_from(row_ref).map(Into::into))
.map(|row_ref| {
let row = StColumnRow::try_from(row_ref)?;
let mut column_schema = ColumnSchema::from(row);
let alias = self
.find_st_column_accessor_row(table_name.as_ref(), &column_schema.col_name)?
.map(|row| row.accessor_name);
column_schema.alias = alias;
Ok(column_schema)
})
.collect::<Result<Vec<_>>>()?;

// Columns in `st_column` are not in general sorted by their `col_pos`,
Expand Down
48 changes: 42 additions & 6 deletions crates/datastore/src/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1280,13 +1280,14 @@ mod tests {
system_tables, StColumnRow, StConnectionCredentialsFields, StConstraintData, StConstraintFields,
StConstraintRow, StEventTableFields, StIndexAlgorithm, StIndexFields, StIndexRow, StRowLevelSecurityFields,
StScheduledFields, StSequenceFields, StSequenceRow, StTableRow, StVarFields, StViewArgFields, StViewFields,
ST_CLIENT_ID, ST_CLIENT_NAME, ST_COLUMN_ID, ST_COLUMN_NAME, ST_CONNECTION_CREDENTIALS_ID,
ST_CONNECTION_CREDENTIALS_NAME, ST_CONSTRAINT_ID, ST_CONSTRAINT_NAME, ST_EVENT_TABLE_ID, ST_EVENT_TABLE_NAME,
ST_INDEX_ID, ST_INDEX_NAME, ST_MODULE_NAME, ST_RESERVED_SEQUENCE_RANGE, ST_ROW_LEVEL_SECURITY_ID,
ST_CLIENT_ID, ST_CLIENT_NAME, ST_COLUMN_ACCESSOR_ID, ST_COLUMN_ACCESSOR_NAME, ST_COLUMN_ID, ST_COLUMN_NAME,
ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_NAME, ST_CONSTRAINT_ID, ST_CONSTRAINT_NAME,
ST_EVENT_TABLE_ID, ST_EVENT_TABLE_NAME, ST_INDEX_ACCESSOR_ID, ST_INDEX_ACCESSOR_NAME, ST_INDEX_ID,
ST_INDEX_NAME, ST_MODULE_NAME, ST_RESERVED_SEQUENCE_RANGE, ST_ROW_LEVEL_SECURITY_ID,
ST_ROW_LEVEL_SECURITY_NAME, ST_SCHEDULED_ID, ST_SCHEDULED_NAME, ST_SEQUENCE_ID, ST_SEQUENCE_NAME,
ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME, ST_VIEW_ARG_ID, ST_VIEW_ARG_NAME, ST_VIEW_COLUMN_ID,
ST_VIEW_COLUMN_NAME, ST_VIEW_ID, ST_VIEW_NAME, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_NAME, ST_VIEW_SUB_ID,
ST_VIEW_SUB_NAME,
ST_TABLE_ACCESSOR_ID, ST_TABLE_ACCESSOR_NAME, ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME, ST_VIEW_ARG_ID,
ST_VIEW_ARG_NAME, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_NAME, ST_VIEW_ID, ST_VIEW_NAME, ST_VIEW_PARAM_ID,
ST_VIEW_PARAM_NAME, ST_VIEW_SUB_ID, ST_VIEW_SUB_NAME,
};
use crate::traits::{IsolationLevel, MutTx};
use crate::Result;
Expand Down Expand Up @@ -1485,6 +1486,7 @@ mod tests {
col_pos: value.pos.into(),
col_name: Identifier::for_test(value.name),
col_type: value.ty,
alias: None,
}
}
}
Expand Down Expand Up @@ -1616,6 +1618,7 @@ mod tests {
schedule,
pk,
false,
None,
)
}

Expand Down Expand Up @@ -1749,6 +1752,9 @@ mod tests {
TableRow { id: ST_VIEW_SUB_ID.into(), name: ST_VIEW_SUB_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None },
TableRow { id: ST_VIEW_ARG_ID.into(), name: ST_VIEW_ARG_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: Some(StViewArgFields::Id.into()) },
TableRow { id: ST_EVENT_TABLE_ID.into(), name: ST_EVENT_TABLE_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: Some(StEventTableFields::TableId.into()) },
TableRow { id: ST_TABLE_ACCESSOR_ID.into(), name: ST_TABLE_ACCESSOR_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None },
TableRow { id: ST_INDEX_ACCESSOR_ID.into(), name: ST_INDEX_ACCESSOR_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None },
TableRow { id: ST_COLUMN_ACCESSOR_ID.into(), name: ST_COLUMN_ACCESSOR_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None },

]));
#[rustfmt::skip]
Expand Down Expand Up @@ -1836,6 +1842,16 @@ mod tests {
ColRow { table: ST_VIEW_ARG_ID.into(), pos: 1, name: "bytes", ty: AlgebraicType::bytes() },

ColRow { table: ST_EVENT_TABLE_ID.into(), pos: 0, name: "table_id", ty: TableId::get_type() },

ColRow { table: ST_TABLE_ACCESSOR_ID.into(), pos: 0, name: "table_name", ty: AlgebraicType::String },
ColRow { table: ST_TABLE_ACCESSOR_ID.into(), pos: 1, name: "accessor_name", ty: AlgebraicType::String },

ColRow { table: ST_INDEX_ACCESSOR_ID.into(), pos: 0, name: "index_name", ty: AlgebraicType::String },
ColRow { table: ST_INDEX_ACCESSOR_ID.into(), pos: 1, name: "accessor_name", ty: AlgebraicType::String },

ColRow { table: ST_COLUMN_ACCESSOR_ID.into(), pos: 0, name: "table_name", ty: AlgebraicType::String },
ColRow { table: ST_COLUMN_ACCESSOR_ID.into(), pos: 1, name: "col_name", ty: AlgebraicType::String },
ColRow { table: ST_COLUMN_ACCESSOR_ID.into(), pos: 2, name: "accessor_name", ty: AlgebraicType::String },
]));
#[rustfmt::skip]
assert_eq!(query.scan_st_indexes()?, map_array([
Expand All @@ -1862,6 +1878,12 @@ mod tests {
IndexRow { id: 21, table: ST_VIEW_ARG_ID.into(), col: col(0), name: "st_view_arg_id_idx_btree", },
IndexRow { id: 22, table: ST_VIEW_ARG_ID.into(), col: col(1), name: "st_view_arg_bytes_idx_btree", },
IndexRow { id: 23, table: ST_EVENT_TABLE_ID.into(), col: col(0), name: "st_event_table_table_id_idx_btree", },
IndexRow { id: 24, table: ST_TABLE_ACCESSOR_ID.into(), col: col(0), name: "st_table_accessor_table_name_idx_btree", },
IndexRow { id: 25, table: ST_TABLE_ACCESSOR_ID.into(), col: col(1), name: "st_table_accessor_accessor_name_idx_btree", },
IndexRow { id: 26, table: ST_INDEX_ACCESSOR_ID.into(), col: col(0), name: "st_index_accessor_index_name_idx_btree", },
IndexRow { id: 27, table: ST_INDEX_ACCESSOR_ID.into(), col: col(1), name: "st_index_accessor_accessor_name_idx_btree", },
IndexRow { id: 28, table: ST_COLUMN_ACCESSOR_ID.into(), col: col_list![0, 1], name: "st_column_accessor_table_name_col_name_idx_btree", },
IndexRow { id: 29, table: ST_COLUMN_ACCESSOR_ID.into(), col: col_list![0, 2], name: "st_column_accessor_table_name_accessor_name_idx_btree", },
]));
let start = ST_RESERVED_SEQUENCE_RANGE as i128 + 1;
#[rustfmt::skip]
Expand Down Expand Up @@ -1901,6 +1923,12 @@ mod tests {
ConstraintRow { constraint_id: 17, table_id: ST_VIEW_ARG_ID.into(), unique_columns: col(0), constraint_name: "st_view_arg_id_key", },
ConstraintRow { constraint_id: 18, table_id: ST_VIEW_ARG_ID.into(), unique_columns: col(1), constraint_name: "st_view_arg_bytes_key", },
ConstraintRow { constraint_id: 19, table_id: ST_EVENT_TABLE_ID.into(), unique_columns: col(0), constraint_name: "st_event_table_table_id_key", },
ConstraintRow { constraint_id: 20, table_id: ST_TABLE_ACCESSOR_ID.into(), unique_columns: col(0), constraint_name: "st_table_accessor_table_name_key", },
ConstraintRow { constraint_id: 21, table_id: ST_TABLE_ACCESSOR_ID.into(), unique_columns: col(1), constraint_name: "st_table_accessor_accessor_name_key", },
ConstraintRow { constraint_id: 22, table_id: ST_INDEX_ACCESSOR_ID.into(), unique_columns: col(0), constraint_name: "st_index_accessor_index_name_key", },
ConstraintRow { constraint_id: 23, table_id: ST_INDEX_ACCESSOR_ID.into(), unique_columns: col(1), constraint_name: "st_index_accessor_accessor_name_key", },
ConstraintRow { constraint_id: 24, table_id: ST_COLUMN_ACCESSOR_ID.into(), unique_columns: col_list![0, 1], constraint_name: "st_column_accessor_table_name_col_name_key", },
ConstraintRow { constraint_id: 25, table_id: ST_COLUMN_ACCESSOR_ID.into(), unique_columns: col_list![0, 2], constraint_name: "st_column_accessor_table_name_accessor_name_key", },
]));

// Verify we get back the tables correctly with the proper ids...
Expand Down Expand Up @@ -2107,6 +2135,7 @@ mod tests {
table_id,
index_name: "Foo_id_idx_btree".into(),
index_algorithm: BTreeAlgorithm::from(0).into(),
alias: None,
},
true,
)?;
Expand Down Expand Up @@ -2327,6 +2356,12 @@ mod tests {
IndexRow { id: 21, table: ST_VIEW_ARG_ID.into(), col: col(0), name: "st_view_arg_id_idx_btree", },
IndexRow { id: 22, table: ST_VIEW_ARG_ID.into(), col: col(1), name: "st_view_arg_bytes_idx_btree", },
IndexRow { id: 23, table: ST_EVENT_TABLE_ID.into(), col: col(0), name: "st_event_table_table_id_idx_btree", },
IndexRow { id: 24, table: ST_TABLE_ACCESSOR_ID.into(), col: col(0), name: "st_table_accessor_table_name_idx_btree", },
IndexRow { id: 25, table: ST_TABLE_ACCESSOR_ID.into(), col: col(1), name: "st_table_accessor_accessor_name_idx_btree", },
IndexRow { id: 26, table: ST_INDEX_ACCESSOR_ID.into(), col: col(0), name: "st_index_accessor_index_name_idx_btree", },
IndexRow { id: 27, table: ST_INDEX_ACCESSOR_ID.into(), col: col(1), name: "st_index_accessor_accessor_name_idx_btree", },
IndexRow { id: 28, table: ST_COLUMN_ACCESSOR_ID.into(), col: col_list![0, 1], name: "st_column_accessor_table_name_col_name_idx_btree", },
IndexRow { id: 29, table: ST_COLUMN_ACCESSOR_ID.into(), col: col_list![0, 2], name: "st_column_accessor_table_name_accessor_name_idx_btree", },
IndexRow { id: seq_start, table: FIRST_NON_SYSTEM_ID, col: col(0), name: "Foo_id_idx_btree", },
IndexRow { id: seq_start + 1, table: FIRST_NON_SYSTEM_ID, col: col(1), name: "Foo_name_idx_btree", },
IndexRow { id: seq_start + 2, table: FIRST_NON_SYSTEM_ID, col: col(2), name: "Foo_age_idx_btree", },
Expand All @@ -2348,6 +2383,7 @@ mod tests {
table_id,
index_name: "Foo_age_idx_btree".into(),
index_algorithm: BTreeAlgorithm::from(2).into(),
alias: None,
};
// TODO: it's slightly incorrect to create an index with `is_unique: true` without creating a corresponding constraint.
// But the `Table` crate allows it for now.
Expand Down
Loading
Loading