pFad - Phone/Frame/Anonymizer/Declutterfier! Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

URL: http://github.com/DataDog/lading/pull/1708.patch

impl ColumnBuffers { //github.com/ //github.com/ Buffers metrics in memory. Calling `flush()` writes accumulated metrics as a //github.com/ Parquet row group. +//github.com/ +//github.com/ The schema is determined dynamically at first flush based on label keys +//github.com/ discovered in the buffered data. Label columns use the `l_` prefix +//github.com/ (e.g., `l_container_id`). #[derive(Debug)] pub struct Format { //github.com/ Reusable column buffers for building Arrow arrays buffers: ColumnBuffers, - //github.com/ Parquet writer - writer: ArrowWriter, - //github.com/ Pre-computed Arrow schema - schema: Arc, + //github.com/ Parquet writer - created lazily on first flush when schema is known + writer: Option>, + //github.com/ The underlying writer, stored until `ArrowWriter` is created + raw_writer: Option, + //github.com/ Arrow schema - created on first flush based on discovered label keys + schema: Option>, + //github.com/ Ordered list of label keys in schema (for consistent column ordering) + schema_label_keys: Vec, //github.com/ Compression level for Zstd (stored for rotation) compression_level: i32, } +//github.com/ Label column prefix for flattened labels +const LABEL_COLUMN_PREFIX: &str = "l_"; + impl Format { //github.com/ Create a new Parquet format writer //github.com/ @@ -167,10 +191,61 @@ impl Format { //github.com/ //github.com/ # Errors //github.com/ - //github.com/ Returns error if Arrow writer creation fails + //github.com/ Returns error if compression level is invalid pub fn new(writer: W, compression_level: i32) -> Result { - let schema = Arc::new(capture_schema()); + // Validate compression level early + let _ = ZstdLevel::try_new(compression_level)?; + + Ok(Self { + buffers: ColumnBuffers::new(), + writer: None, + raw_writer: Some(writer), + schema: None, + schema_label_keys: Vec::new(), + compression_level, + }) + } + + //github.com/ Generate schema based on discovered label keys + //github.com/ + //github.com/ Creates base columns plus `l_` columns for each unique label key. + //github.com/ Label columns are nullable Utf8 strings, sorted alphabetically for + //github.com/ consistent ordering. + fn generate_schema(label_keys: &BTreeSet) -> (Arc, Vec) { + let mut fields = vec![ + Field::new("run_id", DataType::Utf8, false), + Field::new( + "time", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new("fetch_index", DataType::UInt64, false), + Field::new("metric_name", DataType::Utf8, false), + Field::new("metric_kind", DataType::Utf8, false), + Field::new("value_int", DataType::UInt64, true), + Field::new("value_float", DataType::Float64, true), + ]; + + // Add l_ columns for each label key (sorted by BTreeSet) + let ordered_keys: Vec = label_keys.iter().cloned().collect(); + for key in &ordered_keys { + fields.push(Field::new( + format!("{LABEL_COLUMN_PREFIX}{key}"), + DataType::Utf8, + true, // nullable - not all rows have all labels + )); + } + + fields.push(Field::new("value_histogram", DataType::Binary, true)); + (Arc::new(Schema::new(fields)), ordered_keys) + } + + //github.com/ Create writer properties with dictionary encoding for appropriate columns + fn create_writer_properties( + compression_level: i32, + label_keys: &[String], + ) -> Result { // Use Parquet v2 format for better encodings and compression: // // - DELTA_BINARY_PACKED encoding for integers (timestamps, fetch_index) @@ -179,23 +254,45 @@ impl Format { // // Dictionary encoding for low-cardinality columns: // - // - metric_kind: only 2 values ("counter", "gauge") + // - metric_kind: only 3 values ("counter", "gauge", "histogram") // - run_id: one UUID per run - let props = WriterProperties::builder() + // - label columns: often low cardinality (container_id, namespace, etc.) + let mut builder = WriterProperties::builder() .set_writer_version(WriterVersion::PARQUET_2_0) .set_compression(Compression::ZSTD(ZstdLevel::try_new(compression_level)?)) - .set_column_dictionary_enabled(ColumnPath::from(columns::METRIC_KIND), true) - .set_column_dictionary_enabled(ColumnPath::from(columns::RUN_ID), true) - .build(); + .set_column_dictionary_enabled(ColumnPath::from("metric_kind"), true) + .set_column_dictionary_enabled(ColumnPath::from("run_id"), true); + + // Enable dictionary encoding for all label columns + for key in label_keys { + builder = builder.set_column_dictionary_enabled( + ColumnPath::from(format!("{LABEL_COLUMN_PREFIX}{key}")), + true, + ); + } - let arrow_writer = ArrowWriter::try_new(writer, schema.clone(), Some(props))?; + Ok(builder.build()) + } - Ok(Self { - buffers: ColumnBuffers::new(), - writer: arrow_writer, - schema, - compression_level, - }) + //github.com/ Initialize the writer with schema based on discovered label keys + //github.com/ + //github.com/ Called on first flush when we know what label keys exist. + fn initialize_writer(&mut self) -> Result<(), Error> { + let raw_writer = self + .raw_writer + .take() + .expect("raw_writer should be present before initialization"); + + let (schema, ordered_keys) = Self::generate_schema(&self.buffers.unique_label_keys); + let props = Self::create_writer_properties(self.compression_level, &ordered_keys)?; + + let arrow_writer = ArrowWriter::try_new(raw_writer, schema.clone(), Some(props))?; + + self.schema = Some(schema); + self.schema_label_keys = ordered_keys; + self.writer = Some(arrow_writer); + + Ok(()) } //github.com/ Convert buffered data to Arrow `RecordBatch` @@ -204,43 +301,19 @@ impl Format { //github.com/ //github.com/ Returns error if `RecordBatch` construction fails fn buffers_to_record_batch(&self) -> Result { + let schema = self + .schema + .as_ref() + .expect("schema should be initialized before creating record batch"); + if self.buffers.is_empty() { - return Ok(RecordBatch::new_empty(self.schema.clone())); + return Ok(RecordBatch::new_empty(schema.clone())); } - // Prepare label offsets with initial 0 - let mut label_offsets = Vec::with_capacity(self.buffers.label_offsets.len() + 1); - label_offsets.push(0i32); - label_offsets.extend_from_slice(&self.buffers.label_offsets); - - // Build the labels map array using pre-allocated buffers - let keys_array = Arc::new(StringArray::from(self.buffers.label_keys.clone())); - let values_array = Arc::new(StringArray::from(self.buffers.label_values.clone())); - let struct_array = StructArray::from(vec![ - ( - Arc::new(Field::new(columns::LABEL_KEY, DataType::Utf8, false)), - keys_array as ArrayRef, - ), - ( - Arc::new(Field::new(columns::LABEL_VALUE, DataType::Utf8, false)), - values_array as ArrayRef, - ), - ]); - - let field = Arc::new(Field::new( - columns::LABEL_ENTRIES, - DataType::Struct(Fields::from(vec![ - Field::new(columns::LABEL_KEY, DataType::Utf8, false), - Field::new(columns::LABEL_VALUE, DataType::Utf8, false), - ])), - false, - )); - - let offsets = OffsetBuffer::new(label_offsets.into()); - let labels_map = MapArray::new(field, offsets, struct_array, None, false); + let num_rows = self.buffers.run_ids.len(); - // Build arrays directly from pre-allocated buffers - let arrays: Vec = vec![ + // Build base column arrays + let mut arrays: Vec = vec![ Arc::new(StringArray::from(self.buffers.run_ids.clone())), Arc::new(TimestampMillisecondArray::from(self.buffers.times.clone())), Arc::new(UInt64Array::from(self.buffers.fetch_indices.clone())), @@ -248,23 +321,47 @@ impl Format { Arc::new(StringArray::from(self.buffers.metric_kinds.clone())), Arc::new(UInt64Array::from(self.buffers.values_int.clone())), Arc::new(Float64Array::from(self.buffers.values_float.clone())), - Arc::new(labels_map), - Arc::new(BinaryArray::from_opt_vec( - self.buffers - .values_histogram - .iter() - .map(|v| { - if v.is_empty() { - None - } else { - Some(v.as_slice()) - } - }) - .collect(), - )), ]; - Ok(RecordBatch::try_new(self.schema.clone(), arrays)?) + // Build l_ columns for each label key in schema order + for key in &self.schema_label_keys { + let values: Vec> = self + .buffers + .row_labels + .iter() + .map(|row_map| row_map.get(key).map(String::as_str)) + .collect(); + arrays.push(Arc::new(StringArray::from(values))); + } + + // Add histogram column last + arrays.push(Arc::new( + self.buffers + .values_histogram + .iter() + .map(|v| { + if v.is_empty() { + None + } else { + Some(v.as_slice()) + } + }) + .collect::(), + )); + + debug_assert_eq!( + arrays.len(), + schema.fields().len(), + "array count ({}) must match schema field count ({})", + arrays.len(), + schema.fields().len() + ); + debug_assert!( + arrays.iter().all(|a| a.len() == num_rows), + "all arrays must have {num_rows} rows", + ); + + Ok(RecordBatch::try_new(schema.clone(), arrays)?) } //github.com/ Write buffered metrics as a Parquet row group @@ -277,8 +374,16 @@ impl Format { return Ok(()); } + // Initialize writer on first flush when we know the label keys + if self.writer.is_none() { + self.initialize_writer()?; + } + let batch = self.buffers_to_record_batch()?; - self.writer.write(&batch)?; + self.writer + .as_mut() + .expect("writer should be initialized") + .write(&batch)?; self.buffers.clear(); Ok(()) } @@ -321,8 +426,12 @@ impl Format { pub fn close(mut self) -> Result<(), Error> { // Write any remaining buffered data as a final row group self.write_parquet()?; - // Close the ArrowWriter which consumes it - self.writer.close()?; + + // Close the ArrowWriter if it was created + if let Some(writer) = self.writer { + writer.close()?; + } + // If writer was never created (no data written), nothing to close Ok(()) } } @@ -350,7 +459,7 @@ impl Format> { //github.com/ # Errors //github.com/ //github.com/ Returns an error if closing the current file or creating the new file fails. - pub fn rotate_to(self, path: std::path::PathBuf) -> Result { + pub fn rotate_to(self, path: &std::path::Path) -> Result { // Store compression level before closing let compression_level = self.compression_level; @@ -358,14 +467,15 @@ impl Format> { self.close()?; // Create new file and writer - let file = std::fs::File::create(&path)?; - let writer = std::io::BufWriter::new(file); + let file = File::create(path)?; + let writer = BufWriter::new(file); let format = Self::new(writer, compression_level)?; Ok(format) } //github.com/ Get the compression level for this format + #[must_use] pub fn compression_level(&self) -> i32 { self.compression_level } @@ -462,4 +572,114 @@ mod tests { assert!(!buffer.get_ref().is_empty(), "should have written data"); } + + #[test] + fn writes_label_columns() { + use arrow_array::{Array, RecordBatchReader}; + use bytes::Bytes; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + + let mut buffer = Cursor::new(Vec::new()); + + { + let mut format = Format::new(&mut buffer, 3).expect("create format"); + + // Write metric with labels + let mut labels = FxHashMap::default(); + labels.insert("container_id".to_string(), "abc123".to_string()); + labels.insert("namespace".to_string(), "default".to_string()); + labels.insert("qos_class".to_string(), "Guaranteed".to_string()); + + let line = Line { + run_id: Uuid::new_v4(), + time: 1000, + fetch_index: 0, + metric_name: "test_metric".into(), + metric_kind: MetricKind::Gauge, + value: LineValue::Float(42.0), + labels, + value_histogram: Vec::new(), + }; + + format.write_metric(&line).expect("write should succeed"); + + // Write another metric with different labels + let mut labels2 = FxHashMap::default(); + labels2.insert("container_id".to_string(), "def456".to_string()); + labels2.insert("namespace".to_string(), "kube-system".to_string()); + // Note: no qos_class label + + let line2 = Line { + run_id: Uuid::new_v4(), + time: 2000, + fetch_index: 1, + metric_name: "test_metric".into(), + metric_kind: MetricKind::Gauge, + value: LineValue::Float(100.0), + labels: labels2, + value_histogram: Vec::new(), + }; + + format.write_metric(&line2).expect("write should succeed"); + format.close().expect("close should succeed"); + } + + // Read back and verify schema has l_* columns + let data = Bytes::from(buffer.into_inner()); + let reader = ParquetRecordBatchReaderBuilder::try_new(data) + .expect("create reader") + .build() + .expect("build reader"); + + let schema = reader.schema(); + + // Check that l_* columns exist (sorted alphabetically) + assert!( + schema.field_with_name("l_container_id").is_ok(), + "should have l_container_id column" + ); + assert!( + schema.field_with_name("l_namespace").is_ok(), + "should have l_namespace column" + ); + assert!( + schema.field_with_name("l_qos_class").is_ok(), + "should have l_qos_class column" + ); + + // Check no labels MapArray column + assert!( + schema.field_with_name("labels").is_err(), + "should NOT have labels column (replaced by l_* columns)" + ); + + // Read data and verify values + let batches: Vec<_> = reader.into_iter().collect(); + assert_eq!(batches.len(), 1, "should have one batch"); + let batch = batches[0].as_ref().expect("batch should be ok"); + + assert_eq!(batch.num_rows(), 2, "should have 2 rows"); + + // Check l_container_id values + let container_col = batch + .column_by_name("l_container_id") + .expect("l_container_id column"); + let container_arr = container_col + .as_any() + .downcast_ref::() + .expect("string array"); + assert_eq!(container_arr.value(0), "abc123"); + assert_eq!(container_arr.value(1), "def456"); + + // Check l_qos_class values (second row should be null) + let qos_col = batch + .column_by_name("l_qos_class") + .expect("l_qos_class column"); + let qos_arr = qos_col + .as_any() + .downcast_ref::() + .expect("string array"); + assert_eq!(qos_arr.value(0), "Guaranteed"); + assert!(qos_arr.is_null(1), "second row should have null qos_class"); + } } diff --git a/lading_capture/src/manager.rs b/lading_capture/src/manager.rs index 66de09796..4f22274aa 100644 --- a/lading_capture/src/manager.rs +++ b/lading_capture/src/manager.rs @@ -463,7 +463,7 @@ impl std::fmt::Debug for RotationRequest { } } -//github.com/ Handle for sending rotation requests to a running CaptureManager +//github.com/ Handle for sending rotation requests to a running [`CaptureManager`] pub type RotationSender = mpsc::Sender; impl CaptureManager>, RealClock> { @@ -512,6 +512,10 @@ impl CaptureManager>, RealCloc //github.com/ the event loop runs. The `JoinHandle` can be awaited to ensure the //github.com/ CaptureManager has fully drained and closed before shutdown. //github.com/ + //github.com/ # Panics + //github.com/ + //github.com/ Panics if the shutdown watcher is missing (should never happen in normal use). + //github.com/ //github.com/ # Errors //github.com/ //github.com/ Returns an error if there is already a global recorder set. @@ -545,7 +549,10 @@ impl CaptureManager>, RealCloc let global_labels = self.global_labels; let clock = self.clock; let recv = self.recv; - let shutdown = self.shutdown.take().expect("shutdown watcher must be present"); + let shutdown = self + .shutdown + .take() + .expect("shutdown watcher must be present"); let handle = tokio::spawn(async move { if let Err(e) = Self::rotation_event_loop( @@ -572,6 +579,7 @@ impl CaptureManager>, RealCloc //github.com/ Internal event loop with rotation support #[allow(clippy::too_many_arguments)] + #[allow(clippy::cast_possible_truncation)] async fn rotation_event_loop( expiration: Duration, format: formats::parquet::Format>, @@ -651,7 +659,7 @@ impl CaptureManager>, RealCloc // Swap formats - this flushes any buffered data let old_format = state_machine .replace_format(new_format) - .map_err(|e| formats::Error::Io(io::Error::new(io::ErrorKind::Other, e.to_string())))?; + .map_err(|e| formats::Error::Io(io::Error::other(e.to_string())))?; // Close old format to write Parquet footer old_format.close()?; diff --git a/lading_capture/src/validate/parquet.rs b/lading_capture/src/validate/parquet.rs index a2eb49aae..d46fbf980 100644 --- a/lading_capture/src/validate/parquet.rs +++ b/lading_capture/src/validate/parquet.rs @@ -10,10 +10,7 @@ use std::fs::File; use std::hash::{BuildHasher, Hasher}; use std::path::Path; -use arrow_array::{ - Array, MapArray, StringArray, StructArray, TimestampMillisecondArray, UInt64Array, -}; -use lading_capture_schema::columns; +use arrow_array::{Array, StringArray, TimestampMillisecondArray, UInt64Array}; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use crate::validate::ValidationResult; @@ -95,51 +92,57 @@ pub fn validate_parquet>( } let time_array = batch - .column_by_name(columns::TIME) - .ok_or_else(|| Error::MissingColumn(columns::TIME.to_string()))? + .column_by_name("time") + .ok_or_else(|| Error::MissingColumn("time".to_string()))? .as_any() .downcast_ref::() .ok_or_else(|| { - Error::InvalidColumnType(format!( - "'{}' column is not TimestampMillisecond", - columns::TIME - )) + Error::InvalidColumnType("'time' column is not TimestampMillisecond".to_string()) })?; let fetch_index_array = batch - .column_by_name(columns::FETCH_INDEX) - .ok_or_else(|| Error::MissingColumn(columns::FETCH_INDEX.to_string()))? + .column_by_name("fetch_index") + .ok_or_else(|| Error::MissingColumn("fetch_index".to_string()))? .as_any() .downcast_ref::() .ok_or_else(|| { - Error::InvalidColumnType(format!("'{}' column is not UInt64", columns::FETCH_INDEX)) + Error::InvalidColumnType("'fetch_index' column is not UInt64".to_string()) })?; let metric_name_array = batch - .column_by_name(columns::METRIC_NAME) - .ok_or_else(|| Error::MissingColumn(columns::METRIC_NAME.to_string()))? + .column_by_name("metric_name") + .ok_or_else(|| Error::MissingColumn("metric_name".to_string()))? .as_any() .downcast_ref::() .ok_or_else(|| { - Error::InvalidColumnType(format!("'{}' column is not String", columns::METRIC_NAME)) + Error::InvalidColumnType("'metric_name' column is not String".to_string()) })?; - let labels_array = batch - .column_by_name(columns::LABELS) - .ok_or_else(|| Error::MissingColumn(columns::LABELS.to_string()))? - .as_any() - .downcast_ref::() - .ok_or_else(|| { - Error::InvalidColumnType(format!("'{}' column is not Map", columns::LABELS)) - })?; + // Collect l_* columns for label extraction (new schema uses flat columns) + let schema = batch.schema(); + let l_columns: Vec<(&str, &StringArray)> = schema + .fields() + .iter() + .filter_map(|field| { + let name = field.name(); + if name.starts_with("l_") { + batch + .column_by_name(name) + .and_then(|c| c.as_any().downcast_ref::()) + .map(|arr| (name.strip_prefix("l_").unwrap_or(name), arr)) + } else { + None + } + }) + .collect(); let metric_kind_array = batch - .column_by_name(columns::METRIC_KIND) - .ok_or_else(|| Error::MissingColumn(columns::METRIC_KIND.to_string()))? + .column_by_name("metric_kind") + .ok_or_else(|| Error::MissingColumn("metric_kind".to_string()))? .as_any() .downcast_ref::() .ok_or_else(|| { - Error::InvalidColumnType(format!("'{}' column is not String", columns::METRIC_KIND)) + Error::InvalidColumnType("'metric_kind' column is not String".to_string()) })?; // Validate invariants: fetch_index uniquely maps to time, @@ -178,27 +181,13 @@ pub fn validate_parquet>( fetch_index_to_time.insert(fetch_index, time); } - let labels_slice: StructArray = labels_array.value(row); - let key_array = labels_slice - .column(0) - .as_any() - .downcast_ref::() - .ok_or_else(|| { - Error::InvalidColumnType("Labels keys are not StringArray".to_string()) - })?; - let value_array = labels_slice - .column(1) - .as_any() - .downcast_ref::() - .ok_or_else(|| { - Error::InvalidColumnType("Labels values are not StringArray".to_string()) - })?; - + // Extract labels from l_* columns let mut sorted_labels: BTreeSet = BTreeSet::new(); - for i in 0..key_array.len() { - let key = key_array.value(i); - let value = value_array.value(i); - sorted_labels.insert(format!("{key}:{value}")); + for (key, arr) in &l_columns { + if !arr.is_null(row) { + let value = arr.value(row); + sorted_labels.insert(format!("{key}:{value}")); + } } let mut hasher = hash_builder.build_hasher(); From 410a2dc527854033244187fdda8e65d28863faa6 Mon Sep 17 00:00:00 2001 From: Scott Opell Date: Fri, 2 Jan 2026 15:39:08 -0500 Subject: [PATCH 2/2] Add bloom filter configuration API for parquet format MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add BloomFilterConfig and BloomFilterColumn types to configure bloom filters on label columns. Bloom filters enable efficient query-time filtering by allowing readers to skip row groups that definitely don't contain a target value. New APIs: - Format::with_bloom_filter() - create writer with bloom filter config - format.bloom_filter_config() - getter for rotation - CaptureManager::new_parquet_with_bloom_filter() - CaptureManager::new_multi_with_bloom_filter() Backwards compatible - existing Format::new() and new_parquet() still work unchanged using BloomFilterConfig::default(). 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- lading_capture/src/formats/parquet.rs | 101 ++++++++++++++++++++++++-- lading_capture/src/manager.rs | 97 ++++++++++++++++++++++++- 2 files changed, 189 insertions(+), 9 deletions(-) diff --git a/lading_capture/src/formats/parquet.rs b/lading_capture/src/formats/parquet.rs index 6db3bd38b..5cc652771 100644 --- a/lading_capture/src/formats/parquet.rs +++ b/lading_capture/src/formats/parquet.rs @@ -33,6 +33,52 @@ use parquet::{ use crate::line; +//github.com/ Configuration for a single bloom filter column +#[derive(Debug, Clone)] +pub struct BloomFilterColumn { + //github.com/ Label name (e.g., `container_id` will be applied to `l_container_id` column) + pub label_name: String, + //github.com/ Expected number of distinct values (NDV) for sizing the bloom filter. + //github.com/ Higher values create larger filters with lower false positive rates. + pub ndv: u64, +} + +impl BloomFilterColumn { + //github.com/ Create a new bloom filter column configuration + #[must_use] + pub fn new(label_name: impl Into, ndv: u64) -> Self { + Self { + label_name: label_name.into(), + ndv, + } + } +} + +//github.com/ Configuration for bloom filters on label columns +//github.com/ +//github.com/ Bloom filters enable efficient query-time filtering by allowing readers +//github.com/ to skip row groups that definitely don't contain a target value. +//github.com/ This is especially useful for high-cardinality label columns like +//github.com/ `container_id` where queries often filter for a specific value. +//github.com/ +//github.com/ # Example +//github.com/ +//github.com/ ``` +//github.com/ use lading_capture::formats::parquet::{BloomFilterConfig, BloomFilterColumn}; +//github.com/ +//github.com/ let config = BloomFilterConfig { +//github.com/ columns: vec![ +//github.com/ BloomFilterColumn::new("container_id", 100), +//github.com/ ], +//github.com/ }; +//github.com/ ``` +#[derive(Debug, Clone, Default)] +pub struct BloomFilterConfig { + //github.com/ Label columns to enable bloom filters on. + //github.com/ Each column specifies the label name (without `l_` prefix) and expected NDV. + pub columns: Vec, +} + //github.com/ Parquet format errors #[derive(thiserror::Error, Debug)] pub enum Error { @@ -176,6 +222,8 @@ pub struct Format { schema_label_keys: Vec, //github.com/ Compression level for Zstd (stored for rotation) compression_level: i32, + //github.com/ Bloom filter configuration (stored for rotation) + bloom_filter_config: BloomFilterConfig, } //github.com/ Label column prefix for flattened labels @@ -193,6 +241,25 @@ impl Format { //github.com/ //github.com/ Returns error if compression level is invalid pub fn new(writer: W, compression_level: i32) -> Result { + Self::with_bloom_filter(writer, compression_level, BloomFilterConfig::default()) + } + + //github.com/ Create a new Parquet format writer with bloom filter configuration + //github.com/ + //github.com/ # Arguments + //github.com/ + //github.com/ * `writer` - Writer implementing Write + Seek for Parquet output + //github.com/ * `compression_level` - Zstd compression level (1-22) + //github.com/ * `bloom_filter_config` - Configuration for bloom filters on label columns + //github.com/ + //github.com/ # Errors + //github.com/ + //github.com/ Returns error if compression level is invalid + pub fn with_bloom_filter( + writer: W, + compression_level: i32, + bloom_filter_config: BloomFilterConfig, + ) -> Result { // Validate compression level early let _ = ZstdLevel::try_new(compression_level)?; @@ -203,6 +270,7 @@ impl Format { schema: None, schema_label_keys: Vec::new(), compression_level, + bloom_filter_config, }) } @@ -245,6 +313,7 @@ impl Format { fn create_writer_properties( compression_level: i32, label_keys: &[String], + bloom_filter_config: &BloomFilterConfig, ) -> Result { // Use Parquet v2 format for better encodings and compression: // @@ -271,6 +340,17 @@ impl Format { ); } + // Enable bloom filters for configured label columns + // Bloom filters allow readers to skip row groups that definitely don't + // contain the target value, improving query performance significantly. + for bloom_col in &bloom_filter_config.columns { + let column_name = format!("{LABEL_COLUMN_PREFIX}{}", bloom_col.label_name); + let column_path = ColumnPath::from(column_name); + + builder = builder.set_column_bloom_filter_enabled(column_path.clone(), true); + builder = builder.set_column_bloom_filter_ndv(column_path, bloom_col.ndv); + } + Ok(builder.build()) } @@ -284,7 +364,11 @@ impl Format { .expect("raw_writer should be present before initialization"); let (schema, ordered_keys) = Self::generate_schema(&self.buffers.unique_label_keys); - let props = Self::create_writer_properties(self.compression_level, &ordered_keys)?; + let props = Self::create_writer_properties( + self.compression_level, + &ordered_keys, + &self.bloom_filter_config, + )?; let arrow_writer = ArrowWriter::try_new(raw_writer, schema.clone(), Some(props))?; @@ -454,22 +538,23 @@ impl Format> { //github.com/ Rotate to a new output file //github.com/ //github.com/ Closes the current Parquet file (writing footer) and opens a new file - //github.com/ at the specified path with the same compression settings. + //github.com/ at the specified path with the same compression and bloom filter settings. //github.com/ //github.com/ # Errors //github.com/ //github.com/ Returns an error if closing the current file or creating the new file fails. pub fn rotate_to(self, path: &std::path::Path) -> Result { - // Store compression level before closing + // Store settings before closing let compression_level = self.compression_level; + let bloom_filter_config = self.bloom_filter_config.clone(); // Close current file (writes footer) self.close()?; - // Create new file and writer + // Create new file and writer with same settings let file = File::create(path)?; let writer = BufWriter::new(file); - let format = Self::new(writer, compression_level)?; + let format = Self::with_bloom_filter(writer, compression_level, bloom_filter_config)?; Ok(format) } @@ -479,6 +564,12 @@ impl Format> { pub fn compression_level(&self) -> i32 { self.compression_level } + + //github.com/ Get the bloom filter configuration for this format + #[must_use] + pub fn bloom_filter_config(&self) -> &BloomFilterConfig { + &self.bloom_filter_config + } } #[cfg(test)] diff --git a/lading_capture/src/manager.rs b/lading_capture/src/manager.rs index 4f22274aa..b3e095783 100644 --- a/lading_capture/src/manager.rs +++ b/lading_capture/src/manager.rs @@ -481,13 +481,55 @@ impl CaptureManager>, RealCloc experiment_started: lading_signal::Watcher, target_running: lading_signal::Watcher, expiration: Duration, + ) -> Result { + Self::new_parquet_with_bloom_filter( + capture_path, + flush_seconds, + compression_level, + parquet::BloomFilterConfig::default(), + shutdown, + experiment_started, + target_running, + expiration, + ) + .await + } + + //github.com/ Create a new [`CaptureManager`] with file-based Parquet writer and bloom filter config + //github.com/ + //github.com/ # Arguments + //github.com/ + //github.com/ * `capture_path` - Path to the output Parquet file + //github.com/ * `flush_seconds` - How often to flush buffered data + //github.com/ * `compression_level` - Zstd compression level (1-22) + //github.com/ * `bloom_filter_config` - Configuration for bloom filters on label columns + //github.com/ * `shutdown` - Signal to gracefully shut down the capture manager + //github.com/ * `experiment_started` - Signal that the experiment has started + //github.com/ * `target_running` - Signal that the target is running + //github.com/ * `expiration` - Duration after which metrics expire + //github.com/ + //github.com/ # Errors + //github.com/ + //github.com/ Function will error if the underlying capture file cannot be opened or + //github.com/ if Parquet writer creation fails. + #[allow(clippy::too_many_arguments)] + pub async fn new_parquet_with_bloom_filter( + capture_path: PathBuf, + flush_seconds: u64, + compression_level: i32, + bloom_filter_config: parquet::BloomFilterConfig, + shutdown: lading_signal::Watcher, + experiment_started: lading_signal::Watcher, + target_running: lading_signal::Watcher, + expiration: Duration, ) -> Result { let fp = fs::File::create(&capture_path) .await .map_err(formats::Error::Io)?; let fp = fp.into_std().await; let writer = BufWriter::new(fp); - let format = parquet::Format::new(writer, compression_level)?; + let format = + parquet::Format::with_bloom_filter(writer, compression_level, bloom_filter_config)?; Ok(Self::new_with_format( format, @@ -539,6 +581,7 @@ impl CaptureManager>, RealCloc self.clock.mark_start(); let compression_level = self.format.compression_level(); + let bloom_filter_config = self.format.bloom_filter_config().clone(); // Run the event loop in a spawned task so we can return the sender immediately let expiration = self.expiration; @@ -567,6 +610,7 @@ impl CaptureManager>, RealCloc shutdown, rotation_rx, compression_level, + bloom_filter_config, ) .await { @@ -592,6 +636,7 @@ impl CaptureManager>, RealCloc shutdown: lading_signal::Watcher, mut rotation_rx: mpsc::Receiver, compression_level: i32, + bloom_filter_config: parquet::BloomFilterConfig, ) -> Result<(), Error> { let mut flush_interval = clock.interval(Duration::from_millis(TICK_DURATION_MS as u64)); let shutdown_wait = shutdown.recv(); @@ -624,6 +669,7 @@ impl CaptureManager>, RealCloc &mut state_machine, rotation_req.path, compression_level, + &bloom_filter_config, ).await; // Send result back to caller (ignore send error if receiver dropped) let _ = rotation_req.response.send(result); @@ -647,14 +693,19 @@ impl CaptureManager>, RealCloc >, new_path: PathBuf, compression_level: i32, + bloom_filter_config: &parquet::BloomFilterConfig, ) -> Result<(), formats::Error> { - // Create new file and format + // Create new file and format with same settings let fp = fs::File::create(&new_path) .await .map_err(formats::Error::Io)?; let fp = fp.into_std().await; let writer = BufWriter::new(fp); - let new_format = parquet::Format::new(writer, compression_level)?; + let new_format = parquet::Format::with_bloom_filter( + writer, + compression_level, + bloom_filter_config.clone(), + )?; // Swap formats - this flushes any buffered data let old_format = state_machine @@ -693,6 +744,40 @@ impl experiment_started: lading_signal::Watcher, target_running: lading_signal::Watcher, expiration: Duration, + ) -> Result { + Self::new_multi_with_bloom_filter( + base_path, + flush_seconds, + compression_level, + parquet::BloomFilterConfig::default(), + shutdown, + experiment_started, + target_running, + expiration, + ) + .await + } + + //github.com/ Create a new [`CaptureManager`] with file-based multi-format writer and bloom filter config + //github.com/ + //github.com/ Writes to both JSONL and Parquet formats simultaneously. The base path + //github.com/ is used to generate two output files: `{base_path}.jsonl` and + //github.com/ `{base_path}.parquet`. + //github.com/ + //github.com/ # Errors + //github.com/ + //github.com/ Function will error if either capture file cannot be opened or if + //github.com/ format creation fails. + #[allow(clippy::too_many_arguments)] + pub async fn new_multi_with_bloom_filter( + base_path: PathBuf, + flush_seconds: u64, + compression_level: i32, + bloom_filter_config: parquet::BloomFilterConfig, + shutdown: lading_signal::Watcher, + experiment_started: lading_signal::Watcher, + target_running: lading_signal::Watcher, + expiration: Duration, ) -> Result { let jsonl_path = base_path.with_extension("jsonl"); let parquet_path = base_path.with_extension("parquet"); @@ -709,7 +794,11 @@ impl .map_err(formats::Error::Io)?; let parquet_file = parquet_file.into_std().await; let parquet_writer = BufWriter::new(parquet_file); - let parquet_format = parquet::Format::new(parquet_writer, compression_level)?; + let parquet_format = parquet::Format::with_bloom_filter( + parquet_writer, + compression_level, + bloom_filter_config, + )?; let format = multi::Format::new(jsonl_format, parquet_format); pFad - Phonifier reborn

Pfad - The Proxy pFad © 2024 Your Company Name. All rights reserved.





Check this box to remove all script contents from the fetched content.



Check this box to remove all images from the fetched content.


Check this box to remove all CSS styles from the fetched content.


Check this box to keep images inefficiently compressed and original size.

Note: This service is not intended for secure transactions such as banking, social media, email, or purchasing. Use at your own risk. We assume no liability whatsoever for broken pages.


Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy