ClickHouse中的物化视图:增量处理的强大力量
1. MV在ClickHouse中的工作原理——它是INSERT触发器,而非SELECT缓存
在熟悉的数据库(PostgreSQL、Oracle)中,物化视图(MV)是存储在磁盘上并按计划(REFRESH)更新的SELECT结果。你决定何时重新计算它。
在ClickHouse中,情况完全不同。在这里,MV是一个INSERT触发器。你基于SELECT创建MV,当向源表插入数据时,ClickHouse会自动对插入的行执行该SELECT,并将结果插入目标表。
关键区别:
- 在PostgreSQL中:你手动或按计划更新MV,读取整个源表。
- 在ClickHouse中:MV在每次插入时触发,仅处理新行。
现实类比: 普通视图就像带放大镜的眼镜。你透过它查看源数据,每次重新计算结果。ClickHouse中的物化视图就像坐在旁边的秘书,每当有新文档到达,她就将其以所需格式复制到特殊文件柜中。你不需要每次都翻阅所有文档,只需去文件柜取即可。
为什么这很重要: ClickHouse没有传统意义上的触发器。MV是唯一内置的响应数据插入的机制。使用它,你可以:
- 实时聚合数据(每小时总计)。
- 反规范化数据(从字典中查找运动名称)。
- 将相同的原始数据分发到多个具有不同结构的表(聚合、去重、完整归档)。
2. 简单MV:按小时汇总投注
让我们从最常见的场景开始:你有一个包含投注的bets表,需要按运动和小时统计。
步骤1:创建目标表(数据将在此聚合)
CREATE TABLE hourly_sport_stats
(
hour DateTime, -- 小时开始时间 (2025-06-01 14:00:00)
sport_id UInt8, -- 运动类型
bets_count UInt64, -- 每小时投注数
total_amount Decimal(18,2) -- 投注总金额
)
ENGINE = SummingMergeTree() -- 将按小时+sport_id求和
ORDER BY (hour, sport_id);
步骤2:创建填充该表的MV
CREATE MATERIALIZED VIEW mv_hourly_stats TO hourly_sport_stats AS
SELECT
toStartOfHour(created_at) AS hour, -- 将时间舍入到小时开始
sport_id,
count() AS bets_count, -- 组内投注数
sum(amount) AS total_amount -- 组内投注总额
FROM bets
GROUP BY hour, sport_id;
逐行解释:
TO hourly_sport_stats— 结果插入的位置。如果MV结构与SELECT结构匹配,可以省略,但最好显式指定。AS SELECT ...— 对bets表中每个插入批次执行的查询。不是对整个表,仅对新行。GROUP BY hour, sport_id— 批次内的聚合。如果一次插入有1000行属于同一小时,MV会为该小时计算一个汇总行。
如何使用:
-- 插入100个14:00的投注和50个15:00的投注
INSERT INTO bets (created_at, sport_id, amount) VALUES
('2025-06-01 14:15:00', 1, 100),
('2025-06-01 14:30:00', 1, 200),
('2025-06-01 15:00:00', 2, 50),
... (另外147行) ...;
-- MV自动在hourly_sport_stats中添加或更新行
-- 对于14:00小时,sport_id=1:bets_count增加插入的投注数
-- 对于15:00小时,sport_id=2:类似
-- 现在立即读取聚合结果,无需GROUP BY!
SELECT * FROM hourly_sport_stats
WHERE hour = '2025-06-01 14:00:00';
重要限制: ClickHouse中的MV无法在重复插入同一小时时更新目标表中的现有行。但由于我们使用SummingMergeTree,在后台合并期间,具有相同ORDER BY(hour, sport_id)的行将被求和。因此,SELECT * FROM hourly_sport_stats可能返回每个组的多个行——始终使用SUM和GROUP BY包装,就像使用SummingMergeTree一样。
3. 使用AggregatingMergeTree的MV——用于复杂聚合
如果你不仅需要求和,还需要例如唯一用户(uniq)、平均投注(avg),则需要使用AggregatingMergeTree和*State / *Merge函数。
-- 包含AggregateFunction列的目标表
CREATE TABLE hourly_sport_advanced
(
hour DateTime,
sport_id UInt8,
bets_count AggregateFunction(count, UInt64), -- count的状态
total_amount AggregateFunction(sum, Decimal(18,2)), -- sum的状态
unique_users AggregateFunction(uniq, UInt64), -- uniq的状态
avg_amount AggregateFunction(avg, Decimal(18,2)) -- avg的状态
)
ENGINE = AggregatingMergeTree()
ORDER BY (hour, sport_id);
-- 使用*State函数的MV
CREATE MATERIALIZED VIEW mv_hourly_advanced TO hourly_sport_advanced AS
SELECT
toStartOfHour(created_at) AS hour,
sport_id,
countState(user_id) AS bets_count, -- 不仅仅是count,而是countState
sumState(amount) AS total_amount,
uniqState(user_id) AS unique_users, -- 近似唯一值
avgState(amount) AS avg_amount
FROM bets
GROUP BY hour, sport_id;
为什么这样: AggregatingMergeTree存储的不是最终值,而是中间状态。这允许正确合并来自不同插入的聚合(例如,同一小时的两个批次)。uniqState存储唯一值的哈希表,而不仅仅是一个数字。
读取数据:
SELECT
hour,
sport_id,
countMerge(bets_count) AS bets_count,
sumMerge(total_amount) AS total_amount,
uniqMerge(unique_users) AS unique_users,
avgMerge(avg_amount) AS avg_amount
FROM hourly_sport_advanced
GROUP BY hour, sport_id;
4. 使用SummingMergeTree的MV用于简单计数器
对于简单的求和和计数器,SummingMergeTree比AggregatingMergeTree更简单、更快。第2节的示例非常适合这种情况。
另一个示例——按用户统计:
-- 目标表:每个用户每天投注次数
CREATE TABLE user_daily_counts
(
user_id UInt64,
day Date,
bets_count UInt64, -- 将被求和
total_amount Decimal(18,2) -- 将被求和
)
ENGINE = SummingMergeTree()
ORDER BY (user_id, day);
-- MV
CREATE MATERIALIZED VIEW mv_user_daily TO user_daily_counts AS
SELECT
user_id,
toDate(created_at) AS day,
count() AS bets_count,
sum(amount) AS total_amount
FROM bets
GROUP BY user_id, day;
SummingMergeTree的优势: 简单,读取时无需*Merge函数。为了安全(如果数据尚未合并),使用简单的SUM和GROUP BY就足够了。
缺点: 仅支持求和和计数器。不支持唯一用户、最大值、平均值。
5. MV链:事件 → 分钟聚合 → 小时 → 天
这是一种强大的模式,用于逐步降低粒度。不是直接从原始数据聚合到日统计(这需要重新计算数十亿行),而是构建一个链。
架构:
- 原始投注(表
raw_bets)——存储7天。 - 分钟聚合(表
minute_stats)——存储30天。 - 小时聚合(表
hourly_stats)——存储365天。 - 天聚合(表
daily_stats)——存储5年。
-- 第1层:从原始数据到分钟聚合
CREATE TABLE minute_stats
(
minute DateTime, -- 舍入到分钟
sport_id UInt8,
bets_count UInt64,
total_amount Decimal(18,2)
)
ENGINE = SummingMergeTree()
ORDER BY (minute, sport_id);
CREATE MATERIALIZED VIEW mv_minute_from_raw TO minute_stats AS
SELECT
toStartOfMinute(created_at) AS minute,
sport_id,
count() AS bets_count,
sum(amount) AS total_amount
FROM raw_bets
GROUP BY minute, sport_id;
-- 第2层:从分钟到小时聚合
CREATE TABLE hourly_stats
(
hour DateTime,
sport_id UInt8,
bets_count UInt64,
total_amount Decimal(18,2)
)
ENGINE = SummingMergeTree()
ORDER BY (hour, sport_id);
CREATE MATERIALIZED VIEW mv_hourly_from_minute TO hourly_stats AS
SELECT
toStartOfHour(minute) AS hour, -- 将分钟舍入到小时
sport_id,
sum(bets_count) AS bets_count, -- 对分钟计数器求和
sum(total_amount) AS total_amount
FROM minute_stats
GROUP BY hour, sport_id;
-- 第3层:从小时到天聚合
CREATE TABLE daily_stats
(
day Date,
sport_id UInt8,
bets_count UInt64,
total_amount Decimal(18,2)
)
ENGINE = SummingMergeTree()
ORDER BY (day, sport_id);
CREATE MATERIALIZED VIEW mv_daily_from_hourly TO daily_stats AS
SELECT
toDate(hour) AS day,
sport_id,
sum(bets_count) AS bets_count,
sum(total_amount) AS total_amount
FROM hourly_stats
GROUP BY day, sport_id;
链的优势:
- 每一层处理的数据量更小。
- 你可以删除低层级的旧数据(TTL),并保留多年的聚合数据。
- 查询月度报告时,你读取
daily_stats(每年365行),而不是raw_bets(数十亿行)。
类比: 就像复述一本书的情节:首先你读完整本书(原始数据),然后写章节摘要(分钟),然后写部分摘要(小时),最后写注释(天)。要回忆主要思想,你不需要重读整本书。
6. Null + MV模式用于Kafka:将数据分发到多个表
这是高负载系统的专业模式。不是写入一个表,而是创建一个Null表(黑洞)和多个从它读取并写入不同目标表的MV。
-- 步骤1:接收表(Null,不存储任何内容)
CREATE TABLE raw_events_null
(
user_id UInt64,
sport_id UInt8,
amount Decimal(18,2),
created_at DateTime,
ip String
)
ENGINE = Null;
-- 步骤2:目标表1 — 所有原始事件(用于调查)
CREATE TABLE raw_events_archive
(
user_id UInt64,
sport_id UInt8,
amount Decimal(18,2),
created_at DateTime,
ip String
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(created_at)
ORDER BY (created_at, user_id);
-- 步骤3:目标表2 — 去重投注(不含IP,符合GDPR)
CREATE TABLE bets_dedup
(
user_id UInt64,
sport_id UInt8,
amount Decimal(18,2),
created_at DateTime,
bet_id String
)
ENGINE = ReplacingMergeTree(created_at)
ORDER BY (user_id, bet_id);
-- 步骤4:目标表3 — 小时聚合
CREATE TABLE hourly_agg
(
hour DateTime,
sport_id UInt8,
total_amount AggregateFunction(sum, Decimal(18,2))
)
ENGINE = AggregatingMergeTree()
ORDER BY (hour, sport_id);
-- 步骤5:MV — 原始归档(所有行)
CREATE MATERIALIZED VIEW mv_archive TO raw_events_archive AS
SELECT * FROM raw_events_null;
-- 步骤6:MV — 去重投注(动态生成bet_id)
CREATE MATERIALIZED VIEW mv_dedup TO bets_dedup AS
SELECT
user_id,
sport_id,
amount,
created_at,
concat(toString(user_id), '_', toString(sipHash64(created_at))) AS bet_id
FROM raw_events_null
WHERE amount != 0;
-- 步骤7:MV — 小时聚合
CREATE MATERIALIZED VIEW mv_hourly_agg TO hourly_agg AS
SELECT
toStartOfHour(created_at) AS hour,
sport_id,
sumState(amount) AS total_amount
FROM raw_events_null
GROUP BY hour, sport_id;
实际工作原理:
-- Kafka消费者插入到raw_events_null(快速,无磁盘操作)
INSERT INTO raw_events_null VALUES (123, 1, 100.00, now(), '192.168.1.1');
-- 三个MV并行接收此数据并写入各自的表:
-- 1. mv_archive → raw_events_archive(完整副本,包括IP)
-- 2. mv_dedup → bets_dedup(去重投注,不含IP)
-- 3. mv_hourly_agg → hourly_agg(更新聚合)
为什么这很出色:
- 一次插入,三种不同的转换。
- 源表不存储任何内容(Null),没有写入开销。
- 每个MV可以独立启用/禁用。
- 容易添加第四个MV(例如,用于导出为其他格式)。
7. 问题:MV仅处理创建后的新数据
一个关键限制:MV看不到创建时源表中已有的数据。MV仅从创建时刻开始工作,只处理新的INSERT。
-- 我们已经在bets中有十亿行
SELECT count() FROM bets; -- 1,000,000,000
-- 创建MV
CREATE MATERIALIZED VIEW mv_hourly TO hourly_stats AS ...;
-- 插入一个新投注
INSERT INTO bets VALUES (now(), 1, 100);
-- 只有这个新投注会出现在hourly_stats中。
-- 之前的十亿行仍未处理!
如何解决——回填历史数据:
-- 方法1:使用与MV相同的SELECT手动INSERT
INSERT INTO hourly_stats
SELECT
toStartOfHour(created_at) AS hour,
sport_id,
count() AS bets_count,
sum(amount) AS total_amount
FROM bets
WHERE created_at < '2025-06-01' -- 旧数据
GROUP BY hour, sport_id;
-- 现在hourly_stats包含新旧数据
方法2:使用POPULATE重新创建MV(危险!)
-- 危险:使用CREATE ... POPULATE,MV处理整个现有表
-- 在处理过程中,表可能被锁定,数据可能丢失
CREATE MATERIALIZED VIEW mv_hourly TO hourly_stats POPULATE AS
SELECT ... FROM bets ...;
为什么POPULATE在生产中危险:
- 在SELECT执行期间,源表可能被锁定。
- 如果在POPULATE期间向源表插入新数据,这些数据可能会丢失(既不被旧MV捕获,也不被新MV捕获)。
- 对于大表,POPULATE可能需要数小时。
正确方法: 创建MV时不使用POPULATE,然后通过INSERT手动回填历史数据,使用与MV相同的SELECT。这安全、可预测,且不阻塞插入。
8. 监控和调试MV
检查MV是否活跃
-- 列出所有MV及其目标表
SELECT
name,
engine,
total_rows,
formatReadableSize(total_bytes) AS size
FROM system.tables
WHERE engine = 'MaterializedView'
AND database = 'default';
检查MV是否在INSERT时触发
-- 启用详细日志(仅调试)
SET log_queries = 1;
-- 向源表插入一行
INSERT INTO bets (created_at, sport_id, amount) VALUES (now(), 1, 100);
-- 查找与MV相关的查询
SELECT
query,
query_duration_ms,
read_rows,
written_rows
FROM system.query_log
WHERE type = 'QueryFinish'
AND query LIKE '%MV%'
AND event_time > now() - INTERVAL 1 MINUTE
ORDER BY event_time DESC;
检查目标表中的数据
-- 比较源表和目标表中唯一组的数量
-- (回填后应匹配)
-- 源表:唯一 (hour, sport) 数量
SELECT count(*) FROM (
SELECT toStartOfHour(created_at) AS hour, sport_id
FROM bets
GROUP BY hour, sport_id
) AS src;
-- 目标表:
SELECT count(*) FROM hourly_stats;
识别类型问题
如果MV未触发,通常问题是类型不兼容:
-- 错误:源表中sport_id类型为UInt8,但MV目标中为Int32
-- 解决方案:在SELECT中显式转换类型
SELECT
toStartOfHour(created_at) AS hour,
toUInt8(sport_id) AS sport_id, -- 显式转换
count() AS bets_count
FROM bets
GROUP BY hour, sport_id;
9. 博彩平台的真实架构
让我们组装一个在线赌场的完整架构,满足多个需求:
- 详细事件用于调查(存储7天)。
- 去重投注用于分析(存储90天)。
- 小时聚合用于仪表板(存储2年)。
- 天聚合用于财务报告(存储10年)。
-- ===== 第0层:接收器(Null) =====
CREATE TABLE raw_stream
(
user_id UInt64,
sport_id UInt8,
bet_id String,
amount Decimal(18,2),
created_at DateTime,
ip String
)
ENGINE = Null;
-- ===== 目标表 =====
-- 1. 归档7天(MergeTree + TTL)
CREATE TABLE raw_archive
(
user_id UInt64, sport_id UInt8, bet_id String,
amount Decimal(18,2), created_at DateTime, ip String
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(created_at)
ORDER BY created_at
TTL created_at + INTERVAL 7 DAY DELETE;
-- 2. 去重投注(ReplacingMergeTree)
CREATE TABLE bets_dedup
(
user_id UInt64, sport_id UInt8, bet_id String,
amount Decimal(18,2), created_at DateTime
)
ENGINE = ReplacingMergeTree(created_at)
ORDER BY (user_id, bet_id)
TTL created_at + INTERVAL 90 DAY DELETE;
-- 3. 小时聚合(SummingMergeTree)
CREATE TABLE hourly_agg
(
hour DateTime, sport_id UInt8, total_amount Decimal(18,2), bet_count UInt64
)
ENGINE = SummingMergeTree()
ORDER BY (hour, sport_id)
TTL hour + INTERVAL 2 YEAR DELETE;
-- 4. 天聚合(AggregatingMergeTree用于uniq)
CREATE TABLE daily_agg
(
day Date, sport_id UInt8, total_amount AggregateFunction(sum, Decimal(18,2)),
unique_users AggregateFunction(uniq, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY (day, sport_id)
TTL day + INTERVAL 10 YEAR DELETE;
-- ===== 物化视图 =====
CREATE MATERIALIZED VIEW mv_raw_archive TO raw_archive AS
SELECT * FROM raw_stream;
CREATE MATERIALIZED VIEW mv_bets_dedup TO bets_dedup AS
SELECT user_id, sport_id, bet_id, amount, created_at
FROM raw_stream
WHERE bet_id != '';
CREATE MATERIALIZED VIEW mv_hourly_agg TO hourly_agg AS
SELECT
toStartOfHour(created_at) AS hour,
sport_id,
sum(amount) AS total_amount,
count() AS bet_count
FROM raw_stream
GROUP BY hour, sport_id;
CREATE MATERIALIZED VIEW mv_daily_agg TO daily_agg AS
SELECT
toDate(created_at) AS day,
sport_id,
sumState(amount) AS total_amount,
uniqState(user_id) AS unique_users
FROM raw_stream
GROUP BY day, sport_id;
数据流:
- 应用程序写入
raw_stream(Null)——毫秒级。 - 四个MV并行处理每个插入。
- 每个目标表以其聚合/转换后的形式接收数据。
- TTL自动删除每个层级的过期数据。
相对于单个MERGETREE的优势:
- 最大插入速度(Null + MV在内存中工作)。
- 不同的存储级别(7天、90天、2年、10年)。
- 针对不同任务使用不同引擎(MergeTree、ReplacingMergeTree、SummingMergeTree、AggregatingMergeTree)。
- 查询旧聚合时无性能损失。
下一步
现在你知道如何使用MV构建复杂的数据处理管道。接下来的主题:
- 使用外部源更新的MV——如何结合字典和MV进行数据丰富。
- 调试复杂的MV链——如何理解数据为何未到达目标表。
- 集群中的MV复制——MV在分布式表上的行为。
总结: ClickHouse中的物化视图不仅仅是缓存。它们是一种强大的增量数据处理机制,允许在数据库内部构建真正的事件溯源架构。规则很简单:如果需要在插入时聚合、转换或复制数据,请使用MV。并且永远不要忘记手动回填历史数据,而不是在生产中使用POPULATE。
← 上一篇: 特殊ClickHouse引擎:当MergeTree不适用时
→ 下一篇: ClickHouse中的二级(跳跃)索引:当ORDER BY索引不够用时
— Editorial Team
暂无评论。