[feat](dynamic table) support stream part 1: stream meta data & basic DDL #61382
TsukiokaKogane wants to merge 20 commits into apache:master
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
8800590 to 02dbb12
run buildall
/review
run buildall
Cloud UT Coverage Report: Increment line coverage; Increment coverage report
FE UT Coverage Report: Increment line coverage
run buildall
Cloud UT Coverage Report: Increment line coverage; Increment coverage report
FE UT Coverage Report: Increment line coverage
TPC-H: Total hot run time: 27041 ms
TPC-DS: Total hot run time: 168631 ms
BE UT Coverage Report: Increment line coverage; Increment coverage report
BE Regression && UT Coverage Report: Increment line coverage; Increment coverage report
908e093 to bf84473
run buildall
Cloud UT Coverage Report: Increment line coverage; Increment coverage report
FE UT Coverage Report: Increment line coverage
TPC-H: Total hot run time: 26802 ms
TPC-DS: Total hot run time: 168467 ms
BE UT Coverage Report: Increment line coverage; Increment coverage report
BE Regression && UT Coverage Report: Increment line coverage; Increment coverage report
public void dropTable(String catalogName, String dbName, String tableName, boolean isView, boolean isMtmv,
boolean ifExists, boolean mustTemporary, boolean force) throws DdlException { // before
boolean isStream, boolean ifExists, boolean mustTemporary, boolean force) // after
Put isStream as the last argument, and add another dropTable overload like:
public void dropTable(String catalogName, String dbName, String tableName, boolean isView, boolean isMtmv,
boolean ifExists, boolean mustTemporary, boolean force) throws DdlException {
This overload would just call dropTable with isStream set to false. That would reduce the number of changes.
Since dropTable is part of the CatalogIf interface, we would have to do about the same amount of work anyway, so I think the current modification is better.
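The reviewer's suggestion amounts to a delegating overload. Below is a minimal sketch of that idea, assuming a hypothetical, trimmed-down DropCapableCatalog interface in place of CatalogIf (DdlException is omitted so the example is self-contained). Declaring the old signature as a Java default method keeps existing call sites compiling unchanged; each implementation still has to move its override to the new signature, which is the cost the author points out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for CatalogIf: only the drop-related methods are modeled.
interface DropCapableCatalog {
    // Full signature, with isStream placed last as the reviewer proposed.
    void dropTable(String catalogName, String dbName, String tableName, boolean isView,
            boolean isMtmv, boolean ifExists, boolean mustTemporary, boolean force,
            boolean isStream);

    // Old signature kept as a default method: existing callers compile unchanged
    // and implicitly pass isStream = false.
    default void dropTable(String catalogName, String dbName, String tableName, boolean isView,
            boolean isMtmv, boolean ifExists, boolean mustTemporary, boolean force) {
        dropTable(catalogName, dbName, tableName, isView, isMtmv, ifExists, mustTemporary, force,
                false);
    }
}

// Toy implementation that records which kind of drop reached it.
class RecordingCatalog implements DropCapableCatalog {
    final List<String> calls = new ArrayList<>();

    @Override
    public void dropTable(String catalogName, String dbName, String tableName, boolean isView,
            boolean isMtmv, boolean ifExists, boolean mustTemporary, boolean force,
            boolean isStream) {
        calls.add(dbName + "." + tableName + (isStream ? " [stream]" : " [table]"));
    }
}
```

A call using the old eight-parameter form resolves to the default overload and reaches the implementation with isStream = false.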
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java (resolved)
0951cf3 to a2dd793
run buildall
Cloud UT Coverage Report: Increment line coverage; Increment coverage report
FE UT Coverage Report: Increment line coverage
TPC-H: Total hot run time: 27228 ms
TPC-DS: Total hot run time: 169014 ms
BE UT Coverage Report: Increment line coverage; Increment coverage report
BE Regression && UT Coverage Report: Increment line coverage; Increment coverage report
/review
Code Review Summary
This PR introduces the experimental "Table Stream" feature with DDL operations (CREATE/DROP/SHOW STREAM), information_schema tables, and persistence. The overall structure follows existing patterns (MTMV, View), but there are several bugs and issues that need to be addressed.
Critical Checkpoints Conclusions
1. Goal and Correctness: The PR accomplishes the stated goal of adding stream metadata and basic DDL. However, there is a critical data-corruption bug in MetadataGenerator.java that will cause the stream_consumption information_schema table to malfunction at runtime.
2. Modification minimality: The change is reasonably focused on stream DDL/metadata. The dropTable signature change across CatalogIf/ExternalCatalog/InternalCatalog adds an isStream boolean, following the existing isView/isMtmv pattern.
3. Concurrency: StreamManager uses a MonitoredReentrantReadWriteLock, which is appropriate. However, the lock will be null after GSON deserialization (FE restart from image), causing NPE. This is a critical bug.
4. Lifecycle management: The rwLock in StreamManager is not initialized after deserialization. The class should implement GsonPostProcessable to reinitialize transient fields.
5. Configuration items: enable_table_stream is correctly marked as mutable = false and EXPERIMENTAL.
6. Incompatible changes: The dropTable method signature change across CatalogIf is an interface-breaking change but is handled by updating all implementations.
7. Parallel code paths: The removeStream() call is correctly placed in both dropTableInternal() and unprotectDropTable() code paths.
8. Test coverage: Unit tests cover basic create/drop streams. Regression tests cover information_schema queries. Missing: tests for persistence/restart, tests for dropping base table when streams exist, SHOW STREAMS tests with actual data.
9. Observability: LOG.warn is incorrectly guarded by LOG.isDebugEnabled() in StreamManager (2 occurrences). These warnings will never be logged in production.
10. Transaction/persistence: Stream creation is persisted via createTableWithLock -> EditLog. Replay path correctly calls StreamManager.addStream(). StreamManager itself is persisted via the image checkpoint mechanism.
11. Performance: No concerns for the DDL metadata operations introduced here.
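Checkpoints 3, 4, 7 and 10 all concern StreamManager's db-to-stream bookkeeping. A minimal sketch of that pattern follows, assuming java.util.concurrent.locks.ReentrantReadWriteLock as a stand-in for Doris's MonitoredReentrantReadWriteLock; the class name and method signatures are hypothetical, and postDeserialize() only plays the role gsonPostProcess() would play in the real class.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of a db id -> stream ids registry guarded by a read-write lock.
class StreamRegistrySketch {
    // Persisted state (the real class would annotate it with @SerializedName).
    private Map<Long, Set<Long>> dbStreamMap = new HashMap<>();
    // Runtime-only state: a deserializer that bypasses constructors leaves it null.
    private transient ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);

    // Counterpart of gsonPostProcess(): restore transient fields after loading.
    void postDeserialize() {
        rwLock = new ReentrantReadWriteLock(true);
        if (dbStreamMap == null) {
            dbStreamMap = new HashMap<>();
        }
    }

    void addStream(long dbId, long streamId) {
        rwLock.writeLock().lock();
        try {
            dbStreamMap.computeIfAbsent(dbId, k -> new HashSet<>()).add(streamId);
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    // Must be called from every drop path so no dangling ids remain.
    void removeStream(long dbId, long streamId) {
        rwLock.writeLock().lock();
        try {
            Set<Long> ids = dbStreamMap.get(dbId);
            if (ids != null) {
                ids.remove(streamId);
                if (ids.isEmpty()) {
                    dbStreamMap.remove(dbId);
                }
            }
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    // Returns a copy so callers can iterate without holding the lock.
    Set<Long> getStreamIds(long dbId) {
        rwLock.readLock().lock();
        try {
            Set<Long> ids = dbStreamMap.get(dbId);
            return ids == null ? Collections.<Long>emptySet() : new HashSet<Long>(ids);
        } finally {
            rwLock.readLock().unlock();
        }
    }
}
```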
Issues Found
| Severity | File | Issue |
|---|---|---|
| Critical | MetadataGenerator.java | Bug: streamsBuilder is used instead of streamConsumptionBuilder when building STREAM_CONSUMPTION_COLUMN_TO_INDEX, resulting in (a) an empty column index map and (b) a duplicate-key failure if streamsBuilder is built again after the stray puts |
| Critical | StreamManager.java | rwLock field is not annotated with @SerializedName and is not transient -- after GSON deserialization via StreamManager.read(), rwLock will be null, causing NPE on any subsequent operation. Class should implement GsonPostProcessable to reinitialize the lock. |
| Medium | StreamManager.java | LOG.warn() inside if (LOG.isDebugEnabled()) guard -- warnings about invalid stream IDs will never be logged (2 occurrences) |
| Low | BaseTableInfo.java | equals() overridden without hashCode(), violating the Java contract |
| Low | BaseStream.java | @SerializedName("streamType") on field streamConsumeType is misleading; the serialized name "streamType" stores a consume type, not a stream type |
| Minor | BE .cpp/.h files | Missing newline at end of file (4 new files) |
ImmutableMap.Builder<String, Integer> streamsBuilder = new ImmutableMap.Builder();
List<Column> streamsBuilderColList = SchemaTable.TABLE_MAP.get("streams")
.getFullSchema();
Bug (Critical): This line uses streamsBuilder instead of streamConsumptionBuilder, so streamConsumptionBuilder is never populated and STREAM_CONSUMPTION_COLUMN_TO_INDEX ends up as an empty map. The stray entries land in streamsBuilder, which was already .build()-ed on line 265. Guava's ImmutableMap.Builder may be reused after build(), so the extra put() calls do not throw by themselves, but any later build() of streamsBuilder would fail with IllegalArgumentException if a column name appears in both tables.
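The root cause is two near-identical loops that each fill their own builder, which makes this copy-paste slip easy. A minimal sketch of one alternative, assuming a hypothetical ColumnIndexes helper that takes plain column names instead of Doris Column objects: each call uses its own local map, so a loop can no longer fill the wrong builder.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper: lower-cased column name -> column position for one schema table.
final class ColumnIndexes {
    private ColumnIndexes() {
    }

    static Map<String, Integer> of(List<String> columnNames) {
        Map<String, Integer> index = new LinkedHashMap<>();
        for (int i = 0; i < columnNames.size(); i++) {
            String key = columnNames.get(i).toLowerCase(Locale.ROOT);
            // Like ImmutableMap.Builder.build(), reject duplicate keys.
            if (index.putIfAbsent(key, i) != null) {
                throw new IllegalArgumentException("duplicate column: " + key);
            }
        }
        return Collections.unmodifiableMap(index);
    }
}
```

With such a helper, each constant becomes a single call, e.g. STREAM_CONSUMPTION_COLUMN_TO_INDEX = ColumnIndexes.of(consumptionColumnNames), where consumptionColumnNames is a hypothetical list of names taken from the schema.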
Fix:
streamConsumptionBuilder.put(streamConsumptionBuilderColList.get(i).getName().toLowerCase(), i);

@SerializedName(value = "dbStreamMap")
private Map<Long, Set<Long>> dbStreamMap;
protected MonitoredReentrantReadWriteLock rwLock;
Bug (Critical): rwLock is (correctly) not a @SerializedName field, but nothing reinitializes it after GSON deserialization. When StreamManager.read() calls GsonUtils.GSON.fromJson() and GSON cannot use a no-argument constructor, it falls back to Unsafe.allocateInstance(), which bypasses constructors and field initializers, so rwLock will be null. Any subsequent call to addStream(), removeStream(), getStreamIds(), or fillStreamValues*() will then throw an NPE.
Fix: StreamManager should implement GsonPostProcessable and reinitialize rwLock in gsonPostProcess():
public class StreamManager implements Writable, GsonPostProcessable {
    ...
    @Override
    public void gsonPostProcess() throws IOException {
        // rwLock is runtime-only state; recreate it after the image is deserialized.
        this.rwLock = new MonitoredReentrantReadWriteLock(true);
    }
}

if (!table.isPresent()) {
    if (LOG.isDebugEnabled()) {
        LOG.warn("invalid stream id: {}, db: {}", tableId, db.get().getFullName());
    }
Bug (Medium): LOG.warn() is guarded by if (LOG.isDebugEnabled()). This means the warning about an invalid stream ID will never be logged unless debug logging is enabled, which defeats the purpose of a WARN-level message. Either:
- remove the isDebugEnabled() guard, or
- change LOG.warn to LOG.debug.
The same issue appears at the second occurrence in fillStreamConsumptionValuesMetadataResult().
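The failure mode is easy to reproduce in isolation. This sketch uses java.util.logging as a stand-in for Log4j (isLoggable(Level.FINE) plays the role of LOG.isDebugEnabled(), warning() the role of LOG.warn()); the class and method names are hypothetical. With the logger at INFO, the guarded warning produces no record at all, while the unguarded warning is emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

final class InvalidStreamLogging {
    private InvalidStreamLogging() {
    }

    // The flagged pattern: a WARN-level message behind a DEBUG-level guard.
    static void guardedWarn(Logger log, long tableId, String dbName) {
        if (log.isLoggable(Level.FINE)) {
            log.warning("invalid stream id: " + tableId + ", db: " + dbName);
        }
    }

    // Fix option 1: no guard, so the warning is always emitted.
    static void plainWarn(Logger log, long tableId, String dbName) {
        log.warning("invalid stream id: " + tableId + ", db: " + dbName);
    }

    // Fix option 2: keep the guard but log at the matching debug level.
    static void debugOnly(Logger log, long tableId, String dbName) {
        if (log.isLoggable(Level.FINE)) {
            log.fine("invalid stream id: " + tableId + ", db: " + dbName);
        }
    }
}

// Collects published records so the behavior can be checked.
final class CollectingHandler extends Handler {
    final List<LogRecord> records = new ArrayList<>();

    @Override
    public void publish(LogRecord record) {
        records.add(record);
    }

    @Override
    public void flush() {
    }

    @Override
    public void close() {
    }
}
```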
@Override
public boolean equals(Object o) {
    if (this == o) {
Issue (Low): equals() is overridden without hashCode(), which violates the Java equals/hashCode contract. If BaseTableInfo objects are ever placed in a HashSet or used as HashMap keys, they will behave incorrectly. Consider adding a hashCode() implementation consistent with the equals() logic.
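A minimal sketch of a consistent pair, assuming a hypothetical TableRef class whose identity is (catalog id, db id, table id); the real BaseTableInfo fields are not shown in this excerpt. hashCode() uses exactly the fields compared in equals(), so equal objects always hash equally and a HashSet deduplicates them.

```java
import java.util.Objects;

// Hypothetical stand-in for BaseTableInfo with an assumed id-based identity.
final class TableRef {
    private final long ctlId;
    private final long dbId;
    private final long tableId;

    TableRef(long ctlId, long dbId, long tableId) {
        this.ctlId = ctlId;
        this.dbId = dbId;
        this.tableId = tableId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        TableRef that = (TableRef) o;
        return ctlId == that.ctlId && dbId == that.dbId && tableId == that.tableId;
    }

    // Same fields as equals(), which keeps the equals/hashCode contract.
    @Override
    public int hashCode() {
        return Objects.hash(ctlId, dbId, tableId);
    }
}
```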
private static ImmutableList<TableType> supportedTableTypeList = ImmutableList.of(TableType.OLAP);

@SerializedName("streamType")
protected StreamConsumeType streamConsumeType = StreamConsumeType.DEFAULT;
Naming concern (Low): The serialized name "streamType" stores StreamConsumeType, not the stream type (which would be e.g. OLAP_TABLE_STREAM). This is confusing and could cause compatibility issues if a true "stream type" field is added later. Consider renaming to "streamConsumeType" or "consumeType" before this becomes a persisted format commitment.
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/BaseStream.java (outdated, resolved)
return supportedTableTypeList.contains(tableIf.getType());
}

public void getProperties(StringBuilder sb) {
Not sure what this means; I changed the class names from stream to tableStream.
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/BaseStream.java (outdated, resolved)
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/BaseStream.java (outdated, resolved)
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/BaseTableInfo.java (outdated, resolved)
run buildall
schema.add(changeTypeColumn);
schema.add(sequenceColumn);
switch (params.baseTable.getType()) {
    case OLAP:
Is an MTMV base table not supported?
public static final String VERSION_COL = "__DORIS_VERSION_COL__";
public static final String SKIP_BITMAP_COL = "__DORIS_SKIP_BITMAP_COL__";
// table stream columns
public static final String STREAM_CHANGE_TYPE_COL = "__DORIS_STREAM_CHANGE_TYPE__";
Maybe use _COL__ as the suffix instead, since the other newly added columns use it too.
Cloud UT Coverage Report: Increment line coverage; Increment coverage report
TPC-H: Total hot run time: 27123 ms
TPC-DS: Total hot run time: 168606 ms
BE UT Coverage Report: Increment line coverage; Increment coverage report
BE Regression && UT Coverage Report: Increment line coverage; Increment coverage report
What problem does this PR solve?
Issue Number: close #xxx
Related PR: #57921
Problem Summary:
Streams as basic building blocks for dynamic computing.
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)