Unified TimeSeriesStore with pluggable backends, global rewrite of timed event storage #1080
base: dev
Conversation
- quality_barrier: Callable[[Observable[T]], Observable[T]]
- sharpness_barrier: Callable[[Observable[Image]], Observable[Image]]
- Implement find_closest(), first_timestamp(), iterate(), iterate_ts(), iterate_realtime() methods using abstract _iter_items/_find_closest_timestamp
- Add scheduler-based stream() with absolute time reference to prevent timing drift during long playback (ported from replay.py)
- Move imports to top of file, add proper typing throughout
- Fix pickledir.py mypy error (pickle.load returns Any)
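The drift-prevention idea behind the scheduler-based stream() can be sketched in a few lines: every emission is anchored to one absolute wall-clock reference instead of chaining relative sleeps, so rounding errors don't accumulate over long playback. This is a hypothetical helper, not the actual replay.py code; the name `schedule_times` and its signature are illustrative.

```python
import time

def schedule_times(timestamps, speed=1.0, start_wall=None):
    """Map recorded timestamps to absolute wall-clock emission times.

    Anchoring every emission to one absolute reference (start_wall)
    avoids the cumulative drift of repeated relative sleeps.
    """
    if not timestamps:
        return []
    if start_wall is None:
        start_wall = time.monotonic()
    first = timestamps[0]
    # Each item is due at: anchor + (elapsed recorded time) / speed
    return [start_wall + (ts - first) / speed for ts in timestamps]
```

An emitting loop would then sleep until each due time (`time.sleep(max(0, due - time.monotonic()))`) rather than sleeping for the gap between consecutive items.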
- Single-file SQLite storage with indexed timestamp queries
- BLOB storage for pickled sensor data
- INSERT OR REPLACE for duplicate timestamp handling
- Supports multiple tables per database (different sensors)
- Added to parametrized tests (15 tests across 3 backends)
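The storage pattern described above can be sketched as follows. This is a minimal illustrative stand-in, not the PR's `SqliteStore`: a REAL timestamp primary key (which gives an index), pickled payloads in a BLOB column, and INSERT OR REPLACE so repeated saves at one timestamp are idempotent. The class name `MiniSqliteStore` is hypothetical.

```python
import pickle
import sqlite3

class MiniSqliteStore:
    def __init__(self, path, table="items"):
        self.table = table
        self.conn = sqlite3.connect(path)
        # PRIMARY KEY on ts provides the timestamp index
        self.conn.execute(
            f"CREATE TABLE IF NOT EXISTS {table} "
            "(ts REAL PRIMARY KEY, data BLOB NOT NULL)"
        )

    def save(self, ts, data):
        # INSERT OR REPLACE makes a duplicate timestamp overwrite the old row
        self.conn.execute(
            f"INSERT OR REPLACE INTO {self.table} VALUES (?, ?)",
            (ts, pickle.dumps(data)),
        )

    def load(self, ts):
        row = self.conn.execute(
            f"SELECT data FROM {self.table} WHERE ts = ?", (ts,)
        ).fetchone()
        return pickle.loads(row[0]) if row else None

store = MiniSqliteStore(":memory:")
store.save(1.0, {"a": 1})
store.save(1.0, {"a": 2})  # same timestamp: replaces the earlier row
```

Note the f-string interpolation of the table name is exactly why the later commit adds regex validation of table names.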
- PostgresStore implements SensorStore[T] + Resource for lifecycle management
- Multiple stores can share same database with different tables
- Tables created automatically on first save
- Tests are optional - skip gracefully if PostgreSQL not available
- Added psycopg2-binary and types-psycopg2 dependencies
- Includes reset_db() helper for simple migrations (drop/recreate)
Greptile Overview
Greptile Summary
This PR introduces a comprehensive refactoring that unifies timestamped data storage across the codebase through a new TimeSeriesStore[T] abstraction.
Key Changes:
Architecture Benefits:
SQL Injection Issues Resolved:
Testing:
Confidence Score: 5/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant App as Application
    participant Store as TimeSeriesStore[T]
    participant Backend as Backend Implementation
    participant Storage as Storage Layer

    Note over App,Storage: Saving Data
    App->>Store: save(data)
    Store->>Store: Extract data.ts
    Store->>Backend: _save(timestamp, data)
    Backend->>Storage: Persist to storage
    Storage-->>Backend: Success
    Backend-->>Store: Complete

    Note over App,Storage: Finding Closest Match
    App->>Store: find_closest(timestamp, tolerance)
    Store->>Backend: _find_closest_timestamp(ts, tol)
    Backend->>Storage: Query with index/binary search
    Storage-->>Backend: closest_timestamp
    Backend-->>Store: Return timestamp
    Store->>Backend: _load(closest_timestamp)
    Backend->>Storage: Retrieve data
    Storage-->>Backend: data
    Backend-->>Store: Return data
    Store-->>App: Return T

    Note over App,Storage: Streaming Replay
    App->>Store: stream(speed=1.0, seek=10.0)
    Store->>Store: Create Observable
    Store->>Backend: iterate_items(seek=10.0)
    loop For each item
        Backend->>Storage: Load next item
        Storage-->>Backend: (timestamp, data)
        Backend-->>Store: Yield item
        Store->>Store: Schedule emission with timing
        Store-->>App: on_next(data)
    end
    Store-->>App: on_completed()

    Note over App,Storage: Backend Examples
    Note over Backend: InMemoryStore: SortedKeyList
    Note over Backend: SqliteStore: Indexed DB queries
    Note over Backend: PostgresStore: Distributed storage
    Note over Backend: PickleDirStore: File per timestamp
    Note over Backend: LegacyPickleStore: Backward compat
```
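The "Finding Closest Match" flow above can be sketched with a binary search over a sorted timestamp list; this is an illustrative stand-in for the `_find_closest_timestamp` step, not any backend's actual query code, and the function name is hypothetical.

```python
from bisect import bisect_left

def find_closest_timestamp(timestamps, ts, tolerance):
    """Binary-search a sorted timestamp list for the entry nearest ts.

    Returns None when the best candidate is farther than tolerance,
    mirroring the find_closest() flow in the diagram above.
    """
    if not timestamps:
        return None
    i = bisect_left(timestamps, ts)
    # Only the neighbors around the insertion point can be closest
    candidates = timestamps[max(i - 1, 0):i + 1]
    best = min(candidates, key=lambda t: abs(t - ts))
    return best if abs(best - ts) <= tolerance else None
```

The store then calls `_load(closest_timestamp)` only when the search returned a timestamp within tolerance.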
25 files reviewed, 7 comments
- Validate table/database names in SqliteStore and PostgresStore using regex (alphanumeric/underscore, not starting with digit)
- Fix Transform.to_pose() return type using TYPE_CHECKING import
- Add return type annotation to TF.get_pose()
- Fix ambiguous doclink in transports.md
SqliteStore now accepts a name (e.g. "recordings/lidar") that gets resolved via get_data_dir to data/recordings/lidar.db. Still supports absolute paths and :memory: for backward compatibility.
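The name resolution described above can be sketched like this. It is a guess at the behavior from the description, not the actual implementation; the original uses `get_data_dir`, while the `data_dir` default here is an assumption for self-containment.

```python
from pathlib import Path

def resolve_store_path(name, data_dir=Path("data")):
    """Resolve a store name to a SQLite path.

    Relative names like "recordings/lidar" land under the data
    directory as data/recordings/lidar.db; absolute paths and the
    special ":memory:" value pass through unchanged.
    """
    if name == ":memory:":
        return name
    p = Path(name)
    if p.is_absolute():
        return str(p)
    return str(data_dir / p.with_suffix(".db"))
```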
- Add TypeVar bound: T = TypeVar("T", bound=Timestamped)
- Simplify save() to always use data.ts (no more optional timestamp)
- Update tests to use SampleData(Timestamped) instead of strings
- SqliteStore accepts str | Path for backward compatibility
# Conflicts:
#	dimos/models/manipulation/contact_graspnet_pytorch/inference.py
Required when cupy/contact_graspnet are installed locally without type stubs.
Resolve conflicts:
- Accept dev's Image refactor (numpy-only, no CudaImage/AbstractImage)
- Keep spatial_db2's psycopg2-binary dep, add dev's annotation-protocol + toolz
- Keep spatial_db2's print_loss_heatmap in benchmark type
- Keep spatial_db2's typed sharpness_barrier signature
- Merge TYPE_CHECKING imports in Transform.py (rerun + PoseStamped)
- Trivial: accept dev's type-ignore formatting (space after comma)
- Add import-untyped to xacro type: ignore comment in mesh_utils.py
- Remove unused record/replay RPC methods from ModuleBase
Remove the Timestamped bound from SensorStore's TypeVar, enabling storage of arbitrary data types. Timestamps are now provided explicitly via save(ts, data), with Timestamped convenience methods (save_ts, pipe_save_ts, consume_stream_ts) as opt-in helpers. iterate_realtime() and stream() now use stored timestamps instead of data.ts.
Fix import sorting in treid.py and suppress B027 for optional warmup() in base.py.
Remove dead EmbeddingModel.warmup() method. Update go2.py to use consume_stream_ts() for the new SensorStore API.
save/pipe_save/consume_stream now work with Timestamped data by default. save_raw/pipe_save_raw/consume_stream_raw take explicit timestamps for non-Timestamped data.
Implement _delete for InMemoryStore, SqliteStore, PickleDirStore, PostgresStore (LegacyPickleStore raises NotImplementedError). Fix find_closest docstring placement and add get/add/prune_old convenience methods.
…tedKeyList

Replace InMemoryStore's dict + sorted-cache (O(n log n) rebuild on every write) with SortedKeyList for O(log n) insert, delete, and range queries.

Add collection methods to TimeSeriesStore base: __len__, __iter__, last/last_timestamp, start_ts/end_ts, time_range, duration, find_before/find_after, slice_by_time. Implement backing abstract methods (_count, _last_timestamp, _find_before, _find_after) in all five backends.

Performance benchmarks confirm InMemoryStore matches TimestampedCollection on 100k items.
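The range-query helpers this commit adds can be sketched over a list kept sorted by timestamp. The real code uses `sortedcontainers.SortedKeyList` (which also makes inserts O(log n)); the stdlib `bisect` sketch below only gives O(log n) lookup, and the half-open `[start, end)` semantics of `slice_by_time` is an assumption.

```python
from bisect import bisect_left, bisect_right

def find_before(timestamps, ts):
    """Latest timestamp strictly before ts, or None."""
    i = bisect_left(timestamps, ts)
    return timestamps[i - 1] if i > 0 else None

def find_after(timestamps, ts):
    """Earliest timestamp strictly after ts, or None."""
    i = bisect_right(timestamps, ts)
    return timestamps[i] if i < len(timestamps) else None

def slice_by_time(timestamps, start, end):
    """All timestamps in the (assumed) half-open interval [start, end)."""
    return timestamps[bisect_left(timestamps, start):bisect_left(timestamps, end)]
```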
…nmemory.py

Rename the module to better reflect its purpose. Extract InMemoryStore from base.py into its own file (inmemory.py) to keep base.py focused on the abstract TimeSeriesStore class. Update all internal and external imports.
…, store T directly

- Bound T to Timestamped — no more raw/non-Timestamped data paths
- Removed save_raw, pipe_save_raw, consume_stream_raw
- InMemoryStore stores T directly in SortedKeyList (no _Entry wrapper)
- Removed duplicate-check on insert (same semantics as TimestampedCollection)
- Performance now at parity with TimestampedCollection
- Delete TimestampedCollection class (replaced by InMemoryStore)
- Rewrite TimestampedBufferCollection to inherit InMemoryStore
- Remove TBuffer_old dead code from tf.py
- Fix prune_old mutation-during-iteration bug in base.py
- Break circular import with TYPE_CHECKING guard in base.py
- Update Image.py to use public API instead of _items access
- Update tests to use InMemoryStore directly
42 files reviewed, no comments
Personally, I don't like the API. It's similar to what we do with Sessions.

```python
store = SqliteStore("recordings/lidar")
store.save(data)
```

Without a session, API calls race to create …

```python
store = SqliteStore("recordings/lidar")
session = store.session()
with session:
    session.save(data)
```

Then SqliteStoreSession has …

Lack of owner

Most systems separate the database from the sessions. This makes it easy to close the database, which closes all the sessions.

```python
store = SqliteStore("recordings/lidar")
session = store.session()
store.close()
```

Low cohesion in TimeSeriesStore

TimeSeriesStore is a database, a session, and a query set. Most of the methods belong to the query set. (QuerySet is a Django term; others use Manager or Collection for the object which does the querying.) But the code forces people to extend TimeSeriesStore to get what they need instead of TimeSeriesStore using what it needs. Example:

```python
class TimeSeriesStore(Generic[T], ABC):
    @abstractmethod
    def _save(self, timestamp: float, data: T) -> None: ...

    def save(self, *data: T) -> None:
        for item in data:
            self._save(item.ts, item)

class MyStore(TimeSeriesStore[T]):
    def _save(self, timestamp: float, data: T) -> None: ...
```

I suggest this:

```python
class Session(Protocol[T]):
    def save(self, timestamp: float, data: T) -> None: ...

class MySession(Session[T]):
    def save(self, timestamp: float, data: T) -> None:
        # implementation
        ...

class TimeSeriesStore(Generic[T]):
    def __init__(self, session: Session[T]):
        self._session = session

    def save(self, *data: T) -> None:
        for item in data:
            self._session.save(item.ts, item)
```

Verbosity

If the concern is that this is too verbose, you can always have helper methods. But if the API mixes concerns too much, that's not easy to untangle. Example helper method:

```python
with store_session("sqlite", "/path/to/sensors.db") as store:
    store.save(...)
```
greptile is saying 5/5, paul is saying 0/5, this is a classic good cop bad cop situation

sounds good! will externalize the sessions
Summary
This is a first pass on memory; it cleans up the way we deal with timed events in dimos.

Creates a `TimeSeriesStore[T]` abstraction for timestamped data with multiple backend implementations. Backend implementations are very simple, and TBH I don't expect to use psql but sqlite; implemented them more as examples.
TimeSeriesStore[T] Base Class (dimos/memory/timeseries/base.py)

Generic abstract base — backends implement `_save`, `_load`, `_delete`, `_iter_items`, `_find_closest_timestamp`, `_count`, `_last_timestamp`, `_find_before`, `_find_after`.

API built on top: `save()`, `find_closest()`, `find_before()`, `find_after()`, `prune_old()`, `slice_by_time()`, `iterate()`, `iterate_realtime()`, `stream()`, `consume_stream()`. `T` bound to `Timestamped` — timestamps come from the `.ts` attribute.

Backend Implementations

- `InMemoryStore` (`inmemory.py`)
- `PickleDirStore` (`pickledir.py`)
- `SqliteStore` (`sqlite.py`)
- `PostgresStore` (`postgres.py`, `Resource` lifecycle)
- `LegacyPickleStore` (`legacy.py`, `TimedSensorReplay` recordings)

Unified the way we treat storage of timed items
Replaced TimestampedCollection

`TimestampedCollection` and its subclass `TimestampedBufferCollection` were used for:
- … (`TBuffer`)
- … (`align_timestamped`)

All now use `InMemoryStore` directly. `TimestampedBufferCollection` is a thin wrapper adding auto-prune on insert. `TimestampedCollection` class deleted entirely.

Replaced Pickle Sensor Replay System with standard time series store interface
Future recording and replay can go via sqlite, for example.
Replaced Transform service in-memory transform storage
They all depend on the same base; storing transforms in postgres (not that we want to) is a one-line change.
Other changes

- `Embedding` type — all embedding models return the same type

Usage
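The original usage snippet is absent here, so this is a self-contained sketch of how the described API reads. The real store classes live in `dimos.memory.timeseries` and are not reimplemented; `TinyStore` and `SampleData` are hypothetical stand-ins mimicking a `Timestamped` payload and the `save()`/`find_closest()` surface.

```python
from dataclasses import dataclass

@dataclass
class SampleData:
    ts: float      # timestamp attribute, as required by the Timestamped bound
    value: str

class TinyStore:
    """Toy stand-in for InMemoryStore: save() reads timestamps from .ts."""
    def __init__(self):
        self._items = []

    def save(self, *data):
        for item in data:
            self._items.append(item)
        self._items.sort(key=lambda d: d.ts)

    def find_closest(self, ts, tolerance=float("inf")):
        best = min(self._items, key=lambda d: abs(d.ts - ts), default=None)
        if best is not None and abs(best.ts - ts) <= tolerance:
            return best
        return None

store = TinyStore()
store.save(SampleData(1.0, "a"), SampleData(2.0, "b"))
closest = store.find_closest(1.8, tolerance=0.5)
```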