URL: http://github.com/DataDog/lading/pull/1708.diff
for (k, v) in &line.labels {
- self.label_keys.push(k.clone());
- self.label_values.push(v.clone());
+ self.unique_label_keys.insert(k.clone());
+ row_map.insert(k.clone(), v.clone());
}
- #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
- self.label_offsets.push(self.label_keys.len() as i32);
+ self.row_labels.push(row_map);
self.values_histogram.push(line.value_histogram.clone());
}
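
In isolation, the buffering change above swaps the old parallel key/value vectors (which fed a `MapArray`) for one label map per row plus a running set of every key seen. A minimal standalone sketch, with a hypothetical `Buffers` type standing in for the PR's `ColumnBuffers`:

    use std::collections::{BTreeMap, BTreeSet};

    #[derive(Default)]
    struct Buffers {
        unique_label_keys: BTreeSet<String>,
        row_labels: Vec<BTreeMap<String, String>>,
    }

    impl Buffers {
        fn push_labels(&mut self, labels: &BTreeMap<String, String>) {
            let mut row_map = BTreeMap::new();
            for (k, v) in labels {
                // Track the key globally so the schema can later grow one
                // nullable l_ column per distinct key...
                self.unique_label_keys.insert(k.clone());
                // ...and keep the per-row value for column building.
                row_map.insert(k.clone(), v.clone());
            }
            self.row_labels.push(row_map);
        }
    }
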
@@ -145,18 +204,31 @@ impl ColumnBuffers {
///
/// Buffers metrics in memory. Calling `flush()` writes accumulated metrics as a
/// Parquet row group.
+///
+/// The schema is determined dynamically at first flush based on label keys
+/// discovered in the buffered data. Label columns use the `l_` prefix
+/// (e.g., `l_container_id`).
#[derive(Debug)]
pub struct Format<W: Write + Seek> {
//github.com/ Reusable column buffers for building Arrow arrays
buffers: ColumnBuffers,
- /// Parquet writer
- writer: ArrowWriter<W>,
- /// Pre-computed Arrow schema
- schema: Arc<Schema>,
+ /// Parquet writer - created lazily on first flush when schema is known
+ writer: Option<ArrowWriter<W>>,
+ /// The underlying writer, stored until `ArrowWriter` is created
+ raw_writer: Option<W>,
+ /// Arrow schema - created on first flush based on discovered label keys
+ schema: Option<Arc<Schema>>,
+ /// Ordered list of label keys in schema (for consistent column ordering)
+ schema_label_keys: Vec<String>,
/// Compression level for Zstd (stored for rotation)
compression_level: i32,
+ /// Bloom filter configuration (stored for rotation)
+ bloom_filter_config: BloomFilterConfig,
}
+/// Label column prefix for flattened labels
+const LABEL_COLUMN_PREFIX: &str = "l_";
+
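
`BloomFilterConfig` is used throughout this diff but defined elsewhere; a plausible shape, inferred from the `columns`, `label_name`, and `ndv` accesses in this file (a hypothetical reconstruction, not the PR's actual definition):

    /// Hypothetical shape, inferred from usage in this diff.
    #[derive(Debug, Clone, Default)]
    pub struct BloomFilterConfig {
        /// Label columns that should carry bloom filters.
        pub columns: Vec<BloomColumn>,
    }

    /// Hypothetical per-column setting.
    #[derive(Debug, Clone)]
    pub struct BloomColumn {
        /// Label key, without the l_ column prefix.
        pub label_name: String,
        /// Expected number of distinct values; used to size the filter.
        pub ndv: u64,
    }
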
impl<W: Write + Seek> Format<W> {
/// Create a new Parquet format writer
///
@@ -167,10 +239,82 @@ impl Format {
///
/// # Errors
///
- /// Returns error if Arrow writer creation fails
+ /// Returns error if compression level is invalid
pub fn new(writer: W, compression_level: i32) -> Result<Self, Error> {
- let schema = Arc::new(capture_schema());
+ Self::with_bloom_filter(writer, compression_level, BloomFilterConfig::default())
+ }
+ /// Create a new Parquet format writer with bloom filter configuration
+ ///
+ /// # Arguments
+ ///
+ /// * `writer` - Writer implementing Write + Seek for Parquet output
+ /// * `compression_level` - Zstd compression level (1-22)
+ /// * `bloom_filter_config` - Configuration for bloom filters on label columns
+ ///
+ /// # Errors
+ ///
+ /// Returns error if compression level is invalid
+ pub fn with_bloom_filter(
+ writer: W,
+ compression_level: i32,
+ bloom_filter_config: BloomFilterConfig,
+ ) -> Result<Self, Error> {
+ // Validate compression level early
+ let _ = ZstdLevel::try_new(compression_level)?;
+
+ Ok(Self {
+ buffers: ColumnBuffers::new(),
+ writer: None,
+ raw_writer: Some(writer),
+ schema: None,
+ schema_label_keys: Vec::new(),
+ compression_level,
+ bloom_filter_config,
+ })
+ }
+
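
A usage sketch for the new constructor, assuming the hypothetical `BloomColumn` shape above and an in-memory `Cursor` in place of a real output file (inside a function that can propagate `Error` with `?`):

    use std::io::Cursor;

    let config = BloomFilterConfig {
        columns: vec![BloomColumn {
            label_name: "container_id".to_string(),
            ndv: 10_000,
        }],
    };
    // Zstd level 3; the bloom filter lands on the l_container_id column.
    let format = Format::with_bloom_filter(Cursor::new(Vec::new()), 3, config)?;
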
+ /// Generate schema based on discovered label keys
+ ///
+ /// Creates base columns plus `l_` columns for each unique label key.
+ /// Label columns are nullable Utf8 strings, sorted alphabetically for
+ /// consistent ordering.
+ fn generate_schema(label_keys: &BTreeSet<String>) -> (Arc<Schema>, Vec<String>) {
+ let mut fields = vec![
+ Field::new("run_id", DataType::Utf8, false),
+ Field::new(
+ "time",
+ DataType::Timestamp(TimeUnit::Millisecond, None),
+ false,
+ ),
+ Field::new("fetch_index", DataType::UInt64, false),
+ Field::new("metric_name", DataType::Utf8, false),
+ Field::new("metric_kind", DataType::Utf8, false),
+ Field::new("value_int", DataType::UInt64, true),
+ Field::new("value_float", DataType::Float64, true),
+ ];
+
+ // Add l_ columns for each label key (sorted by BTreeSet)
+ let ordered_keys: Vec<String> = label_keys.iter().cloned().collect();
+ for key in &ordered_keys {
+ fields.push(Field::new(
+ format!("{LABEL_COLUMN_PREFIX}{key}"),
+ DataType::Utf8,
+ true, // nullable - not all rows have all labels
+ ));
+ }
+
+ fields.push(Field::new("value_histogram", DataType::Binary, true));
+
+ (Arc::new(Schema::new(fields)), ordered_keys)
+ }
+
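
A unit-test style sketch of the resulting column layout, assuming module-private access to `generate_schema`; the label keys are example values:

    let mut keys = BTreeSet::new();
    keys.insert("container_id".to_string());
    keys.insert("namespace".to_string());
    let (schema, ordered) = Format::<std::io::Cursor<Vec<u8>>>::generate_schema(&keys);

    // Seven base columns, then one l_ column per key (alphabetical,
    // courtesy of BTreeSet), then value_histogram last.
    assert_eq!(ordered, vec!["container_id".to_string(), "namespace".to_string()]);
    assert_eq!(schema.field(7).name(), "l_container_id");
    assert_eq!(schema.field(8).name(), "l_namespace");
    assert_eq!(schema.field(9).name(), "value_histogram");
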
+ /// Create writer properties with dictionary encoding for appropriate columns
+ fn create_writer_properties(
+ compression_level: i32,
+ label_keys: &[String],
+ bloom_filter_config: &BloomFilterConfig,
+ ) -> Result<WriterProperties, Error> {
// Use Parquet v2 format for better encodings and compression:
//
// - DELTA_BINARY_PACKED encoding for integers (timestamps, fetch_index)
@@ -179,23 +323,60 @@ impl Format {
//
// Dictionary encoding for low-cardinality columns:
//
- // - metric_kind: only 2 values ("counter", "gauge")
+ // - metric_kind: only 3 values ("counter", "gauge", "histogram")
// - run_id: one UUID per run
- let props = WriterProperties::builder()
+ // - label columns: often low cardinality (container_id, namespace, etc.)
+ let mut builder = WriterProperties::builder()
.set_writer_version(WriterVersion::PARQUET_2_0)
.set_compression(Compression::ZSTD(ZstdLevel::try_new(compression_level)?))
- .set_column_dictionary_enabled(ColumnPath::from(columns::METRIC_KIND), true)
- .set_column_dictionary_enabled(ColumnPath::from(columns::RUN_ID), true)
- .build();
+ .set_column_dictionary_enabled(ColumnPath::from("metric_kind"), true)
+ .set_column_dictionary_enabled(ColumnPath::from("run_id"), true);
+
+ // Enable dictionary encoding for all label columns
+ for key in label_keys {
+ builder = builder.set_column_dictionary_enabled(
+ ColumnPath::from(format!("{LABEL_COLUMN_PREFIX}{key}")),
+ true,
+ );
+ }
- let arrow_writer = ArrowWriter::try_new(writer, schema.clone(), Some(props))?;
+ // Enable bloom filters for configured label columns
+ // Bloom filters allow readers to skip row groups that definitely don't
+ // contain the target value, improving query performance significantly.
+ for bloom_col in &bloom_filter_config.columns {
+ let column_name = format!("{LABEL_COLUMN_PREFIX}{}", bloom_col.label_name);
+ let column_path = ColumnPath::from(column_name);
- Ok(Self {
- buffers: ColumnBuffers::new(),
- writer: arrow_writer,
- schema,
- compression_level,
- })
+ builder = builder.set_column_bloom_filter_enabled(column_path.clone(), true);
+ builder = builder.set_column_bloom_filter_ndv(column_path, bloom_col.ndv);
+ }
+
+ Ok(builder.build())
+ }
+
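
For context, the reader-side payoff looks roughly like this, assuming the `parquet` crate's bloom filter read API in recent versions (`get_column_bloom_filter` on `RowGroupReader`, `Sbbf::check`); the file name and column ordinal are hypothetical:

    use parquet::file::reader::{FileReader, SerializedFileReader};

    let reader = SerializedFileReader::new(std::fs::File::open("metrics.parquet")?)?;
    let target = "abc123".to_string();
    for i in 0..reader.metadata().num_row_groups() {
        let row_group = reader.get_row_group(i)?;
        // Ordinal of l_container_id within the row group (hypothetical).
        if let Some(filter) = row_group.get_column_bloom_filter(7) {
            if !filter.check(&target) {
                continue; // value definitely absent: skip the row group
            }
        }
        // ... decode and scan this row group ...
    }
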
+ /// Initialize the writer with schema based on discovered label keys
+ ///
+ /// Called on first flush when we know what label keys exist.
+ fn initialize_writer(&mut self) -> Result<(), Error> {
+ let raw_writer = self
+ .raw_writer
+ .take()
+ .expect("raw_writer should be present before initialization");
+
+ let (schema, ordered_keys) = Self::generate_schema(&self.buffers.unique_label_keys);
+ let props = Self::create_writer_properties(
+ self.compression_level,
+ &ordered_keys,
+ &self.bloom_filter_config,
+ )?;
+
+ let arrow_writer = ArrowWriter::try_new(raw_writer, schema.clone(), Some(props))?;
+
+ self.schema = Some(schema);
+ self.schema_label_keys = ordered_keys;
+ self.writer = Some(arrow_writer);
+
+ Ok(())
}
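
The PR's `flush` itself falls outside these hunks; presumably it ties the pieces together along these lines (a sketch; `buffers.clear()` is a hypothetical reset):

    pub fn flush(&mut self) -> Result<(), Error> {
        // First flush: freeze the schema from the label keys seen so far.
        if self.writer.is_none() {
            self.initialize_writer()?;
        }
        let batch = self.buffers_to_record_batch()?;
        self.writer
            .as_mut()
            .expect("writer initialized above")
            .write(&batch)?;
        self.buffers.clear(); // hypothetical reset of the column buffers
        Ok(())
    }

A consequence of the lazy design: the first flush freezes the schema, so label keys that only appear later cannot add columns, which is presumably why the compression level and bloom filter config are kept "for rotation".
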
/// Convert buffered data to Arrow `RecordBatch`
@@ -204,43 +385,19 @@ impl Format {
///
/// Returns error if `RecordBatch` construction fails
fn buffers_to_record_batch(&self) -> Result<RecordBatch, Error> {
+ let schema = self
+ .schema
+ .as_ref()
+ .expect("schema should be initialized before creating record batch");
+
if self.buffers.is_empty() {
- return Ok(RecordBatch::new_empty(self.schema.clone()));
+ return Ok(RecordBatch::new_empty(schema.clone()));
}
- // Prepare label offsets with initial 0
- let mut label_offsets = Vec::with_capacity(self.buffers.label_offsets.len() + 1);
- label_offsets.push(0i32);
- label_offsets.extend_from_slice(&self.buffers.label_offsets);
-
- // Build the labels map array using pre-allocated buffers
- let keys_array = Arc::new(StringArray::from(self.buffers.label_keys.clone()));
- let values_array = Arc::new(StringArray::from(self.buffers.label_values.clone()));
- let struct_array = StructArray::from(vec![
- (
- Arc::new(Field::new(columns::LABEL_KEY, DataType::Utf8, false)),
- keys_array as ArrayRef,
- ),
- (
- Arc::new(Field::new(columns::LABEL_VALUE, DataType::Utf8, false)),
- values_array as ArrayRef,
- ),
- ]);
-
- let field = Arc::new(Field::new(
- columns::LABEL_ENTRIES,
- DataType::Struct(Fields::from(vec![
- Field::new(columns::LABEL_KEY, DataType::Utf8, false),
- Field::new(columns::LABEL_VALUE, DataType::Utf8, false),
- ])),
- false,
- ));
-
- let offsets = OffsetBuffer::new(label_offsets.into());
- let labels_map = MapArray::new(field, offsets, struct_array, None, false);
+ let num_rows = self.buffers.run_ids.len();
- // Build arrays directly from pre-allocated buffers
- let arrays: Vec<ArrayRef> = vec![
+ // Build base column arrays
+ let mut arrays: Vec<ArrayRef> = vec![
Arc::new(StringArray::from(self.buffers.run_ids.clone())),
Arc::new(TimestampMillisecondArray::from(self.buffers.times.clone())),
Arc::new(UInt64Array::from(self.buffers.fetch_indices.clone())),
@@ -248,23 +405,47 @@ impl Format {
Arc::new(StringArray::from(self.buffers.metric_kinds.clone())),
Arc::new(UInt64Array::from(self.buffers.values_int.clone())),
Arc::new(Float64Array::from(self.buffers.values_float.clone())),
- Arc::new(labels_map),
- Arc::new(BinaryArray::from_opt_vec(
- self.buffers
- .values_histogram
- .iter()
- .map(|v| {
- if v.is_empty() {
- None
- } else {
- Some(v.as_slice())
- }
- })
- .collect(),
- )),
];
- Ok(RecordBatch::try_new(self.schema.clone(), arrays)?)
+ // Build l_ columns for each label key in schema order
+ for key in &self.schema_label_keys {
+ let values: Vec