ClickHouse distributed query amplification: one coordinator, many shard subqueries
A SELECT against a distributed table registers one entry in system.processes on the coordinator. That query fans out subqueries to every addressed shard, waits for intermediate results, then merges them locally. True CPU, memory, and network load is multiplied by the shard count, and the slowest participating shard dictates latency.
This amplification is invisible to cluster-average metrics. A dashboard averaging CPU or query latency across all nodes can look healthy while one shard is saturated, because the coordinator and healthy shards mask the straggler. Operators often misdiagnose the slowdown as a generalized cluster problem or a coordinator bottleneck, when the root cause is usually a single overloaded shard, a missing sharding key filter, or an expensive GLOBAL operator that broadcasts data across the network.
The coordinator is a hidden bottleneck. It must hold and merge intermediate results from every shard simultaneously, and can exhaust memory or network capacity while individual shards appear healthy. Standard cluster-wide monitoring actively misleads: percentiles averaged across all nodes hide the slowest-shard effect. You need per-shard latency and coordinator-specific memory tracking to see the problem before the coordinator fails.
What it is and why it matters
Distributed query execution has two stages. The coordinator parses the SQL, rewrites it into shard-local subqueries, and sends them to each relevant shard over the cluster inter-server network. Each shard executes against its local tables. The coordinator then pulls intermediate results back and performs final aggregation, sorting, or JOIN processing in its own memory.
One client query becomes N shard subqueries. ClickHouse prunes the shard list only when the query filters on the sharding key and optimize_skip_unused_shards is enabled (it is off by default). In that case, subqueries go only to nodes that can hold matching data. Without that pruning, every shard executes the subquery against its full local dataset, narrowed only by its own primary key and partition pruning. With GLOBAL IN or GLOBAL JOIN, the coordinator first executes an inner subquery, materializes the result set, and broadcasts it to all shards. This adds a second fan-out phase before the main subquery starts.
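The following is a minimal Python sketch of the fan-out decision described above. It assumes a hypothetical sharding expression of the form hash(key) % N. It uses zlib.crc32 only as a stand-in for whatever hash the real sharding key uses, ignores shard weights, and models optimize_skip_unused_shards as a boolean flag. The helper names (shard_for, target_shards, fan_out) are illustrative and are not ClickHouse APIs.

```python
import zlib


def shard_for(key: str, num_shards: int) -> int:
    # Hypothetical sharding expression: hash(key) % num_shards.
    # crc32 stands in for the real hash; shard weights are ignored.
    return zlib.crc32(key.encode("utf-8")) % num_shards


def target_shards(num_shards, key_values=None, pruning_enabled=True):
    """Return the shard indexes that receive a subquery."""
    if key_values is not None and pruning_enabled:
        # A predicate such as `shard_key IN (...)` lets the coordinator
        # skip every shard that cannot hold a matching row.
        return sorted({shard_for(k, num_shards) for k in key_values})
    # No usable sharding-key predicate (or pruning disabled): all shards.
    return list(range(num_shards))


def fan_out(num_shards, key_values=None, pruning_enabled=True, global_in=False):
    shards = target_shards(num_shards, key_values, pruning_enabled)
    return {
        "subqueries": len(shards),
        # GLOBAL IN adds a first phase: the materialized inner result
        # is sent to every targeted shard before the main scan starts.
        "broadcasts": len(shards) if global_in else 0,
    }


if __name__ == "__main__":
    print(fan_out(12))                          # {'subqueries': 12, 'broadcasts': 0}
    print(fan_out(12, key_values=["user-42"]))  # {'subqueries': 1, 'broadcasts': 0}
    print(fan_out(12, key_values=["user-42"], pruning_enabled=False))
    print(fan_out(12, global_in=True))          # {'subqueries': 12, 'broadcasts': 12}
```

With twelve shards and no usable filter, one client query becomes twelve subqueries. A single sharding-key value with pruning enabled reduces that to one. GLOBAL IN adds one broadcast per targeted shard on top of the subqueries.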
The coordinator's entry in system.processes shows only the total elapsed time. From the coordinator alone, you cannot tell whether a query is slow because of local merging or because of a single slow shard. The coordinator's query_duration_ms in system.query_log covers three things: the wall-clock time spent waiting for the shard subqueries (which run in parallel, so the slowest shard dominates), network transfer, and final aggregation. If one shard is slow due to disk I/O saturation, a merge backlog, or CPU pressure, every distributed query that touches it waits at the coordinator. Cluster-average latency metrics smooth over this spike and hide a localized saturation event.
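A rough latency model makes the slowest-shard effect concrete. It assumes that shards run fully in parallel. It also treats network transfer and the final merge as a fixed tail after the last shard finishes. This overstates the tail, because results actually stream in while slow shards are still working. The per-shard durations are hypothetical.

```python
from statistics import mean


def coordinator_duration_ms(shard_ms, transfer_ms, merge_ms):
    """Rough model: subqueries run in parallel, so the coordinator waits
    for the slowest shard, then pays transfer and final-merge cost."""
    return max(shard_ms) + transfer_ms + merge_ms


if __name__ == "__main__":
    # Hypothetical durations; the fifth shard has a merge backlog.
    shard_ms = [120, 115, 130, 118, 2400, 125]
    print(coordinator_duration_ms(shard_ms, transfer_ms=20, merge_ms=40))  # 2460
    print(round(mean(shard_ms)))                 # 501: the cross-shard average
    print(sorted(shard_ms)[len(shard_ms) // 2])  # 125: a typical healthy shard
```

In this example, the client waits about 2.5 s, the average suggests about half a second, and a typical shard finishes in 125 ms. None of these cluster-wide numbers points directly at the fifth shard.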
How it works
```mermaid
flowchart TD
Client -->|1 query| Coordinator
Coordinator -->|subquery| Shard1[Shard 1]
Coordinator -->|subquery| Shard2[Shard 2]
Coordinator -->|subquery| Shard3[Shard 3]
Shard1 -->|intermediate result| Coordinator
Shard2 -->|intermediate result| Coordinator
Shard3 -->|intermediate result| Coordinator
Coordinator -->|aggregated result| Client
```

The coordinator rewrites the query for each shard, substituting the distributed table name with the corresponding local table name. With optimize_skip_unused_shards enabled, a WHERE clause that constrains the sharding key lets the coordinator prune the shard list. Otherwise, the subquery goes to every shard.
Each shard executes independently using its own CPU, memory, and disk I/O. Results are serialized in the native format and streamed back to the coordinator over the inter-server network. The coordinator merges these streams as they arrive. For aggregation, however, it must keep merged state covering every shard (for example, hash tables of partial aggregates) until the last shard returns. For high-cardinality GROUP BY or large ORDER BY result sets, this final step can consume substantially more memory than any single shard used locally. The coordinator also pays CPU cost to merge sorted streams or combine partial aggregation states.
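The merge step can be illustrated with a toy GROUP BY in Python. The sketch uses collections.Counter as a stand-in for partial aggregation states. Real ClickHouse states are binary, and their memory accounting differs. The point is only that the coordinator's state covers the union of keys from every shard. The data is hypothetical and chosen so that keys do not overlap across shards, which is the worst case for the coordinator.

```python
from collections import Counter


def shard_partial_counts(rows):
    # Each shard runs GROUP BY locally and returns partial states (counts).
    return Counter(rows)


def coordinator_merge(partials):
    # Partial states are combined key by key. The merged table must hold
    # every distinct key from every shard until the last shard returns.
    merged = Counter()
    for partial in partials:
        merged.update(partial)  # adds counts for keys already present
    return merged


if __name__ == "__main__":
    # Hypothetical: 10 shards, 1,000 distinct keys each, no overlap.
    shards = [[f"user-{s}-{i}" for i in range(1000)] for s in range(10)]
    partials = [shard_partial_counts(rows) for rows in shards]
    merged = coordinator_merge(partials)
    print(max(len(p) for p in partials))  # 1000 keys on the largest shard
    print(len(merged))                    # 10000 keys on the coordinator
```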
With GLOBAL IN, the coordinator executes the inner query, collects the result set into memory, and broadcasts it to every shard as a temporary table. Shards build a hash set from the broadcast data and apply it during their local scan. The network and memory cost is proportional to the inner result set size multiplied by the number of shards. GLOBAL JOIN follows the same pattern: the coordinator evaluates the right-hand side once and ships the result to every shard. The coordinator becomes a broadcast hub, and a simple-looking query generates a traffic burst across the cluster backplane.
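The following Python sketch gives back-of-the-envelope arithmetic for the GLOBAL IN broadcast. It makes three assumptions:

- The inner result is a single fixed-width column (8 bytes per row, as for UInt64 ids).
- One full copy goes to each shard.
- Compression, protocol overhead, and hash-table overhead are ignored, so the per-shard figure is a lower bound.

The row counts and shard count are hypothetical.

```python
def global_in_cost(inner_rows: int, bytes_per_row: int, num_shards: int) -> dict:
    payload = inner_rows * bytes_per_row  # one materialized copy of the inner result
    return {
        # The coordinator sends one copy to every shard.
        "network_bytes": payload * num_shards,
        # Each shard builds a set from its copy; lower bound, no hash overhead.
        "per_shard_set_bytes": payload,
    }


if __name__ == "__main__":
    for rows in (5_000, 5_000_000):
        cost = global_in_cost(rows, bytes_per_row=8, num_shards=20)
        print(rows, cost["network_bytes"] / 1e6, "MB sent")
```

Under these assumptions, five thousand ids cost under a megabyte of cluster traffic per query. Five million ids cost 800 MB before any shard starts its main scan.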
Where it shows up in production
Missing sharding key filters. If a query omits the sharding key from the WHERE clause, ClickHouse cannot prune shards, even with optimize_skip_unused_shards enabled. Every shard executes the subquery. One modest coordinator query becomes a scan on every node, limited only by what each shard's primary key and partition pruning can skip. Check system.query_log for distributed table queries that lack a sharding key predicate.
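One way to act on that check is a crude text filter over the query column of system.query_log. This Python sketch is a naive heuristic, not a SQL parser. It only looks for the sharding-key column compared with = or IN after WHERE, so aliases, expressions such as cityHash64(user_id), and subqueries can fool it. The table name events_dist and the column user_id are hypothetical.

```python
import re


def lacks_shard_key_predicate(query: str, shard_key: str) -> bool:
    """Naive heuristic: True when nothing after WHERE compares the
    sharding-key column with `=` or `IN`."""
    where = re.search(r"\bWHERE\b(.*)", query, flags=re.IGNORECASE | re.DOTALL)
    if where is None:
        return True  # no WHERE clause at all: every shard scans
    # Column name as a whole word, followed by `=` or the keyword IN.
    pattern = rf"\b{re.escape(shard_key)}\s*(?:=|\bIN\b)"
    return re.search(pattern, where.group(1), flags=re.IGNORECASE) is None


if __name__ == "__main__":
    queries = [
        "SELECT count() FROM events_dist WHERE event_date = today()",
        "SELECT * FROM events_dist WHERE user_id = 42",
        "SELECT * FROM events_dist WHERE user_id IN (1, 2, 3)",
    ]
    for q in queries:
        print(lacks_shard_key_predicate(q, "user_id"), q)
```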
GLOBAL IN with large subquery results. A pattern like SELECT ... FROM distributed_table WHERE shard_key GLOBAL IN (SELECT id FROM huge_table) forces the coordinator to execute the inner SELECT, collect potentially millions of IDs, and ship that list to every shard. Outbound traffic from the coordinator to the shards spikes, and each shard spends CPU and memory building a hash set for the IN filter. If the inner result set is large and not aggressively limited, the broadcast cost dwarfs the actual table scan.
JOIN against a non-distributed table. A plain JOIN resolves the right-hand table on each shard, so a table that exists only on the coordinator has to be joined with GLOBAL JOIN. GLOBAL JOIN ships the table's full contents to every shard. The local table becomes a broadcast payload on every query, and that cost is paid even when the sharding key is present.
One slow shard stalls everything. The coordinator waits for all shards to return before it can complete. If one shard is rebuilding a RAID array, suffering a merge backlog, or competing with a heavy batch job, its subqueries run at a fraction of normal speed. The coordinator query duration rises to match the slowest shard. Cluster-average P99 latency may tick up only slightly, but the per-shard P99 shows a clear outlier.
Coordinator memory spikes. A distributed GROUP BY or ORDER BY with high cardinality forces the coordinator to hold large hash tables or sorted blocks from every shard. If ten shards each return a million distinct GROUP BY keys, the coordinator's final hash table can reach ten million entries. The coordinator's RSS climbs while shard memory stays flat. The server may hit MEMORY_LIMIT_EXCEEDED or be killed by the OOM killer even though no individual shard is under memory pressure.
Tradeoffs and common misuses
Distributed tables hide data location, but they charge a network and memory tax proportional to shard count and intermediate result size.
Broadcasting data with GLOBAL operators. GLOBAL IN and GLOBAL JOIN handle data that is not co-located by sharding key, but they trade engineering convenience for network saturation and memory pressure. If the subquery result is small (thousands of rows), the cost is usually acceptable. If it is large (millions of rows), the cost can be catastrophic. Prefer co-located data, with both tables sharded on the same key so that a plain IN or JOIN against each shard's local table stays shard-local. Failing that, filter the subquery aggressively before it is broadcast.
Querying distributed tables for point lookups. ClickHouse is optimized for analytical scans, not point lookups. A distributed point lookup still fans out to all shards unless the query filters on the exact sharding key and optimize_skip_unused_shards is enabled. If your workload is dominated by narrow lookups, reconsider the table design. Alternatively, compute the target shard from the sharding key and query that shard's local table directly to avoid the fan-out entirely.
Relying on cluster averages. Averaging latency, CPU, or memory across all shards is the most common monitoring mistake in distributed ClickHouse deployments. It hides stragglers, masks network saturation between specific node pairs, and makes coordinator bottlenecks look like normal variance. You need per-shard latency distributions and coordinator-specific memory tracking to detect amplification before it causes an outage.
Signals to watch in production
| Signal | Why it matters | Warning sign |
|---|---|---|
| Coordinator MemoryTracking vs shard MemoryTracking | The coordinator aggregates intermediate results from all shards | Coordinator memory exceeds any single shard by >2x during distributed queries |
| Network throughput on inter-server interfaces | Subqueries and intermediate results move over the inter-server network | Sustained outbound spike on coordinator during query execution |
| Per-shard query latency from system.query_log | One slow shard dictates distributed query latency | P99 on one shard is 2x or more above the others |
| elapsed in system.processes on the coordinator | Wall-clock time includes waiting for the slowest shard | High elapsed with low coordinator CPU indicates shard wait |
| GLOBAL IN / GLOBAL JOIN in query patterns | These broadcast data across the network | Frequent use with large intermediate results |
| Inter-server connection count | Distributed queries consume inter-server TCP connections | Sustained elevation near configured limits |
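The two numeric warning signs in the table can be turned into simple checks. This Python sketch assumes you already collect per-shard latency samples (for example, query_duration_ms of secondary queries on each host) and per-node memory readings. It uses a nearest-rank P99 and compares each shard against the median P99 of the other shards. The 2x ratios come from the table. The sample data is hypothetical.

```python
from statistics import median


def p99(samples):
    # Nearest-rank 99th percentile, computed with integers to avoid float rounding.
    ordered = sorted(samples)
    rank = (99 * len(ordered) + 99) // 100  # ceil(0.99 * n)
    return ordered[rank - 1]


def straggler_shards(latency_by_shard, ratio=2.0):
    """Shards whose P99 is at least `ratio` times the median P99 of the others."""
    p99s = {shard: p99(samples) for shard, samples in latency_by_shard.items()}
    if len(p99s) < 2:
        return []  # nothing to compare against
    flagged = []
    for shard, value in p99s.items():
        baseline = median(v for s, v in p99s.items() if s != shard)
        if value >= ratio * baseline:
            flagged.append(shard)
    return flagged


def coordinator_memory_alert(coordinator_bytes, shard_bytes, ratio=2.0):
    # Warning sign from the table: coordinator memory above 2x any single shard.
    return coordinator_bytes > ratio * max(shard_bytes)


if __name__ == "__main__":
    latency = {  # hypothetical query_duration_ms samples per shard
        "shard-1": [100] * 99 + [150],
        "shard-2": [110] * 99 + [160],
        "shard-3": [100] * 95 + [900] * 5,
    }
    print(straggler_shards(latency))  # ['shard-3']
    GiB = 1024 ** 3
    print(coordinator_memory_alert(12 * GiB, [4 * GiB, 5 * GiB, 3 * GiB]))  # True
```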
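Correlate high coordinator elapsed with shard-level query_duration_ms from system.query_log. On each shard, the subqueries appear as rows with is_initial_query = 0, and their initial_query_id equals the coordinator's query_id. When the coordinator waits but its local CPU is idle, inspect the slow shard for disk I/O saturation or active merges in system.merges.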
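The correlation can be sketched in Python. The sketch assumes shard-side rows shaped as (initial_query_id, host, query_duration_ms). The SQL string shows one possible way to collect them with the clusterAllReplicas table function, where my_cluster is a placeholder for your cluster name. The function reports the slowest host for each distributed query alongside the fastest, so a single straggler stands out.

```python
from collections import defaultdict

# One possible source for the rows; 'my_cluster' is a placeholder.
SHARD_SUBQUERIES_SQL = """
SELECT initial_query_id, hostName() AS host, query_duration_ms
FROM clusterAllReplicas('my_cluster', system.query_log)
WHERE is_initial_query = 0
  AND type = 'QueryFinish'
  AND event_time > now() - INTERVAL 1 HOUR
"""


def slowest_shard_per_query(rows):
    """rows: iterable of (initial_query_id, host, query_duration_ms).
    Returns {initial_query_id: (slowest_host, slowest_ms, fastest_ms)}."""
    by_query = defaultdict(list)
    for initial_query_id, host, duration_ms in rows:
        by_query[initial_query_id].append((duration_ms, host))
    result = {}
    for query_id, entries in by_query.items():
        slowest_ms, slowest_host = max(entries)
        fastest_ms = min(d for d, _ in entries)
        result[query_id] = (slowest_host, slowest_ms, fastest_ms)
    return result


if __name__ == "__main__":
    rows = [  # hypothetical shard-side rows for one distributed query
        ("q-1", "ch-shard-1", 120),
        ("q-1", "ch-shard-2", 2300),
        ("q-1", "ch-shard-3", 130),
    ]
    print(slowest_shard_per_query(rows))  # {'q-1': ('ch-shard-2', 2300, 120)}
```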
How Netdata helps
Netdata queries system.metrics, system.events, and system.query_log directly. Correlate these signals to spot amplification:
- Coordinator memory vs shard memory: Compare the coordinator’s resident memory against individual shards. A gap that opens during query peaks indicates intermediate result accumulation at the coordinator.
- Per-shard query latency: Netdata surfaces `query_duration_ms` percentiles per node. A shard whose P99 diverges from the cluster median is the straggler stalling distributed queries.
- Network throughput between nodes: Inter-server traffic spikes during distributed query execution reveal broadcast patterns like `GLOBAL IN` or large intermediate result transfers.
- Long-running query detection: High `elapsed` values on the coordinator, combined with low local CPU utilization, point to shard-bound waits rather than local processing bottlenecks.
- Query log correlation: Filter `system.query_log` for distributed query patterns and correlate spikes in coordinator memory with specific query hashes or user patterns.
Related guides
- ClickHouse active part count growing: reading MaxPartCountForPartition before it pages
- ClickHouse ALTER UPDATE/DELETE overuse: why mutations are not row updates
- ClickHouse async inserts: when async_insert fixes too-many-parts and when it hides it
- ClickHouse DelayedInserts climbing: the warning before too-many-parts
- ClickHouse detached parts piling up: reading system.detached_parts and reclaiming space
- ClickHouse disk space collapse: why merges need free space and how the spiral starts
- ClickHouse disk space monitoring: free_space, unreserved_space, and the 80% target
- ClickHouse distributed DDL stuck: ON CLUSTER queries that never finish
- ClickHouse full table scan: partition pruning failures and the primary key
- ClickHouse insert latency rising: the leading indicator of write-pipeline trouble
- ClickHouse cannot connect to ZooKeeper/Keeper: diagnosing the coordination layer
- ClickHouse Keeper latency high: the early warning before sessions expire