Skip to content

Commit

Permalink
Merge pull request #99 from meilisearch/store-the-updated-id-in-lmdb
Browse files Browse the repository at this point in the history
Store the list of updated IDs directly in LMDB instead of a roaring bitmap
  • Loading branch information
irevoire authored Oct 1, 2024
2 parents 24083df + d9a5694 commit d72b469
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 58 deletions.
1 change: 1 addition & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ impl Error {
NodeMode::Item => "Item",
NodeMode::Tree => "Tree",
NodeMode::Metadata => "Metadata",
NodeMode::Updated => "Updated",
},
item: key.node.item,
}
Expand Down
11 changes: 8 additions & 3 deletions src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ use heed::BoxedError;
use crate::{NodeId, NodeMode};

/// This whole structure must fit in an u64 so we can tell LMDB to optimize its storage.
/// The `prefix` is specified by the user and is used to differentiate between multiple arroy indexes.
/// The `index` is specified by the user and is used to differentiate between multiple arroy indexes.
/// The `mode` indicates what we're looking at.
/// The `item` points to a specific node.
/// If the mode is:
/// - `Item`: we're looking at a `Leaf` node.
/// - `Tree`: we're looking at one of the internal nodes generated by arroy. It could be a descendants node or a split plane.
/// - `Updated`: The list of items that have been updated since the last build of the database.
/// - `Metadata`: There is only one item at `0` that contains the header required to read the index.
#[derive(Debug, Copy, Clone)]
pub struct Key {
Expand All @@ -32,8 +33,8 @@ impl Key {
Self::new(index, NodeId::metadata())
}

pub const fn updated(index: u16) -> Self {
Self::new(index, NodeId::updated())
/// Builds the key of the `Updated` entry associated with `item` in the given `index`.
///
/// The node part of the key is produced by `NodeId::updated(item)`.
pub const fn updated(index: u16, item: u32) -> Self {
Self::new(index, NodeId::updated(item))
}

pub const fn item(index: u16, item: u32) -> Self {
Expand Down Expand Up @@ -98,6 +99,10 @@ impl Prefix {
/// Builds the prefix made of the given `index` and the `NodeMode::Tree` mode.
pub const fn tree(index: u16) -> Self {
Self { index, mode: Some(NodeMode::Tree) }
}

/// Builds the prefix made of the given `index` and the `NodeMode::Updated` mode.
pub const fn updated(index: u16) -> Self {
Self { index, mode: Some(NodeMode::Updated) }
}
}

pub enum PrefixCodec {}
Expand Down
24 changes: 18 additions & 6 deletions src/node_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,15 @@ use crate::ItemId;
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(u8)]
pub enum NodeMode {
/// Stores the metadata under the `ItemId` 0
Metadata = 0,
Tree = 1,
Item = 2,
/// Stores the list of all the `ItemId` that have been updated.
/// We only store `Unit` values under the keys.
Updated = 1,
/// The tree nodes are stored under this id.
Tree = 2,
/// The original vectors are stored under this id in `Leaf` structures.
Item = 3,
}

impl TryFrom<u8> for NodeMode {
Expand All @@ -21,6 +27,7 @@ impl TryFrom<u8> for NodeMode {
match v {
v if v == NodeMode::Item as u8 => Ok(NodeMode::Item),
v if v == NodeMode::Tree as u8 => Ok(NodeMode::Tree),
v if v == NodeMode::Updated as u8 => Ok(NodeMode::Updated),
v if v == NodeMode::Metadata as u8 => Ok(NodeMode::Metadata),
v => Err(format!("Could not convert {v} as a `NodeMode`.")),
}
Expand All @@ -47,8 +54,8 @@ impl NodeId {
Self { mode: NodeMode::Metadata, item: 0 }
}

pub const fn updated() -> Self {
Self { mode: NodeMode::Metadata, item: 1 }
/// Builds the node id of `item` in the `NodeMode::Updated` mode.
pub const fn updated(item: u32) -> Self {
Self { mode: NodeMode::Updated, item }
}

pub const fn tree(item: u32) -> Self {
Expand Down Expand Up @@ -107,11 +114,16 @@ mod test {
assert!(NodeId::tree(1) > NodeId::tree(0));
assert!(NodeId::tree(0) < NodeId::tree(1));

assert!(NodeId::updated(0) == NodeId::updated(0));
assert!(NodeId::updated(1) > NodeId::updated(0));
assert!(NodeId::updated(0) < NodeId::updated(1));

// tree < item whatever the value is
assert!(NodeId::tree(u32::MAX) < NodeId::item(0));

assert!(NodeId::metadata() == NodeId::metadata());
assert!(NodeId::metadata() < NodeId::tree(u32::MAX));
assert!(NodeId::metadata() < NodeId::item(u32::MAX));
assert!(NodeId::metadata() < NodeId::tree(u32::MIN));
assert!(NodeId::metadata() < NodeId::updated(u32::MIN));
assert!(NodeId::metadata() < NodeId::item(u32::MIN));
}
}
10 changes: 8 additions & 2 deletions src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::iter::repeat;
use std::marker;
use std::num::NonZeroUsize;

use heed::types::{Bytes, DecodeIgnore};
use heed::types::DecodeIgnore;
use heed::RoTxn;
use ordered_float::OrderedFloat;
use roaring::RoaringBitmap;
Expand Down Expand Up @@ -146,7 +146,13 @@ impl<'t, D: Distance> Reader<'t, D> {
received: D::name(),
});
}
if database.remap_data_type::<Bytes>().get(rtxn, &Key::updated(index))?.is_some() {
if database
.remap_types::<PrefixCodec, DecodeIgnore>()
.prefix_iter(rtxn, &Prefix::updated(index))?
.remap_key_type::<KeyCodec>()
.next()
.is_some()
{
return Err(Error::NeedBuild(index));
}

Expand Down
4 changes: 1 addition & 3 deletions src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,7 @@ impl<D: Distance> fmt::Display for DatabaseHandle<D> {
.unwrap();
writeln!(f, "updated_item_ids: {updated_item_ids:?}")?;
}
NodeMode::Metadata => {
panic!()
}
NodeMode::Updated | NodeMode::Metadata => panic!(),
}
}

Expand Down
84 changes: 40 additions & 44 deletions src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::borrow::Cow;
use std::mem;
use std::path::PathBuf;

use heed::types::{Bytes, DecodeIgnore};
use heed::types::{Bytes, DecodeIgnore, Unit};
use heed::{MdbError, PutFlags, RoTxn, RwTxn};
use rand::{Rng, SeedableRng};
use rayon::iter::repeatn;
Expand All @@ -20,7 +20,6 @@ use crate::parallel::{
TmpNodesReader,
};
use crate::reader::item_leaf;
use crate::roaring::RoaringBitmapCodec;
use crate::unaligned_vector::UnalignedVector;
use crate::{
Database, Error, ItemId, Key, Metadata, MetadataCodec, Node, NodeCodec, NodeId, Prefix,
Expand Down Expand Up @@ -224,8 +223,10 @@ impl<D: Distance> Writer<D> {
pub fn need_build(&self, rtxn: &RoTxn) -> Result<bool> {
Ok(self
.database
.remap_data_type::<DecodeIgnore>()
.get(rtxn, &Key::updated(self.index))?
.remap_types::<PrefixCodec, DecodeIgnore>()
.prefix_iter(rtxn, &Prefix::updated(self.index))?
.remap_key_type::<KeyCodec>()
.next()
.is_some()
|| self
.database
Expand Down Expand Up @@ -266,17 +267,7 @@ impl<D: Distance> Writer<D> {
let vector = UnalignedVector::from_slice(vector);
let leaf = Leaf { header: D::new_header(&vector), vector };
self.database.put(wtxn, &Key::item(self.index, item), &Node::Leaf(leaf))?;
let mut updated = self
.database
.remap_data_type::<RoaringBitmapCodec>()
.get(wtxn, &Key::updated(self.index))?
.unwrap_or_default();
updated.insert(item);
self.database.remap_data_type::<RoaringBitmapCodec>().put(
wtxn,
&Key::updated(self.index),
&updated,
)?;
self.database.remap_data_type::<Unit>().put(wtxn, &Key::updated(self.index, item), &())?;

Ok(())
}
Expand All @@ -302,35 +293,19 @@ impl<D: Distance> Writer<D> {
Err(heed::Error::Mdb(MdbError::KeyExist)) => return Err(Error::InvalidItemAppend),
Err(e) => return Err(e.into()),
}
let mut updated = self
.database
.remap_data_type::<RoaringBitmapCodec>()
.get(wtxn, &Key::updated(self.index))?
.unwrap_or_default();
// We cannot append here because we may have removed an item with a larger id before
updated.insert(item);
self.database.remap_data_type::<RoaringBitmapCodec>().put(
wtxn,
&Key::updated(self.index),
&updated,
)?;
// We cannot append here because the items appear after the updated keys
self.database.remap_data_type::<Unit>().put(wtxn, &Key::updated(self.index, item), &())?;

Ok(())
}

/// Deletes an item stored in this database and returns `true` if it existed.
pub fn del_item(&self, wtxn: &mut RwTxn, item: ItemId) -> Result<bool> {
if self.database.delete(wtxn, &Key::item(self.index, item))? {
let mut updated = self
.database
.remap_data_type::<RoaringBitmapCodec>()
.get(wtxn, &Key::updated(self.index))?
.unwrap_or_default();
updated.insert(item);
self.database.remap_data_type::<RoaringBitmapCodec>().put(
self.database.remap_data_type::<Unit>().put(
wtxn,
&Key::updated(self.index),
&updated,
&Key::updated(self.index, item),
&(),
)?;

Ok(true)
Expand Down Expand Up @@ -430,7 +405,18 @@ impl<D: Distance> Writer<D> {
}

log::debug!("reset the updated items...");
self.database.delete(wtxn, &Key::updated(self.index))?;
let mut updated_iter = self
.database
.remap_types::<PrefixCodec, DecodeIgnore>()
.prefix_iter_mut(wtxn, &Prefix::updated(self.index))?
.remap_key_type::<KeyCodec>();
while updated_iter.next().transpose()?.is_some() {
// SAFETY: we don't hold any reference to the database currently
unsafe {
updated_iter.del_current()?;
}
}
drop(updated_iter);

log::debug!("write the metadata...");
let metadata = Metadata {
Expand All @@ -448,11 +434,23 @@ impl<D: Distance> Writer<D> {
return Ok(());
}

let updated_items = self
log::debug!("reset and retrieve the updated items...");
let mut updated_items = RoaringBitmap::new();
let mut updated_iter = self
.database
.remap_data_type::<RoaringBitmapCodec>()
.get(wtxn, &Key::updated(self.index))?
.unwrap_or_default();
.remap_types::<PrefixCodec, DecodeIgnore>()
.prefix_iter_mut(wtxn, &Prefix::updated(self.index))?
.remap_key_type::<KeyCodec>();
while let Some((key, _)) = updated_iter.next().transpose()? {
let inserted = updated_items.push(key.node.item);
debug_assert!(inserted, "The keys should be sorted by LMDB");
// SAFETY: we don't hold any reference to the database currently
unsafe {
updated_iter.del_current()?;
}
}
drop(updated_iter);

// while iterating on the nodes we want to delete all the modified element even if they are being inserted right after.
let to_delete = &updated_items;
let to_insert = &item_indices & &updated_items;
Expand Down Expand Up @@ -548,9 +546,6 @@ impl<D: Distance> Writer<D> {
roots.append(&mut thread_roots);
}

log::debug!("reset the updated items...");
self.database.delete(wtxn, &Key::updated(self.index))?;

log::debug!("write the metadata...");
let metadata = Metadata {
dimensions: self.dimensions.try_into().unwrap(),
Expand Down Expand Up @@ -774,6 +769,7 @@ impl<D: Distance> Writer<D> {
}
}
NodeMode::Metadata => unreachable!(),
NodeMode::Updated => todo!(),
}
}

Expand Down

0 comments on commit d72b469

Please sign in to comment.