MongoDB scatter-gather queries: when mongos fans out to every shard

In a sharded MongoDB cluster, mongos caches the chunk-to-shard mapping (the routing table) it loads from the config servers and routes each query to the smallest set of shards that can satisfy it. When the predicate includes the shard key, mongos targets one shard or a bounded subset. When it does not, mongos broadcasts the query to every shard that owns chunks for the collection and merges the results. This is a scatter-gather query.

On a cluster with fifty shards, a scatter-gather query can consume up to fifty times the CPU, memory, and I/O of a query targeted at a single shard, and its latency is gated by the slowest shard to respond. Aggregations, reporting queries, and some multi-document operations legitimately need to touch all shards. The operational problem starts when scatter-gather queries dominate OLTP traffic: every shard then does work for every request, so adding shards adds per-query cost instead of capacity, and horizontal scale becomes a tightly coupled liability.

What it is and why it matters

A scatter-gather query, or broadcast operation, is any read or write where the query filter lacks the shard key and mongos cannot narrow execution to a specific shard. Mongos opens a cursor on every shard that owns chunks for the collection, collects results, and merges them.

For aggregations, stages that must see data from every shard, such as $group, $bucket, $bucketAuto, $count, and $sort, split the pipeline. Shards run the stages before the split point in parallel, and the merge phase runs on mongos or on a single shard.
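To see why the merge phase cannot simply concatenate shard outputs, here is a minimal Python sketch of a split $group that averages an amount per region. It assumes documents shaped like { region, amount }; shard_phase and merge_phase are hypothetical names, and the [sum, count] partials are an illustration, not MongoDB's internal format.

```python
from collections import defaultdict


def shard_phase(docs, key, field):
    """Shard half of a split $group: partial [sum, count] per group key."""
    partial = defaultdict(lambda: [0, 0])
    for doc in docs:
        acc = partial[doc[key]]
        acc[0] += doc[field]
        acc[1] += 1
    return dict(partial)


def merge_phase(partials):
    """Merger half: combine every shard's partials, then finish the average."""
    totals = defaultdict(lambda: [0, 0])
    for partial in partials:
        for group_key, (subtotal, count) in partial.items():
            totals[group_key][0] += subtotal
            totals[group_key][1] += count
    return {k: s / n for k, (s, n) in totals.items()}


# Hypothetical data held by two shards.
shard0 = [{"region": "eu", "amount": 10}, {"region": "eu", "amount": 20}]
shard1 = [{"region": "eu", "amount": 30}, {"region": "us", "amount": 4}]
result = merge_phase([shard_phase(shard0, "region", "amount"),
                      shard_phase(shard1, "region", "amount")])
```

Averaging each shard's own average would give 22.5 for "eu" here instead of 20.0, which is why the merger needs the partial sums and counts rather than finished values.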

This removes the isolation that sharding is meant to provide. Every shard pays the cost of parsing the query, acquiring WiredTiger tickets, scanning indexes, and returning results. Each shard has its own ticket pool, so a broadcast consumes tickets on every shard at once and can block targeted queries on shards that would otherwise be idle. Mongos backs its single client cursor with one remote cursor per shard; if one shard stalls, the whole operation blocks until that shard responds or the operation times out.

How it works

Mongos decides whether to target or broadcast using the shard key metadata it caches from the config servers. For a compound shard key such as { a: 1, b: 1, c: 1 }, a query with an equality match on a, or on a and b, routes to a subset of shards. A query on b alone broadcasts because it does not include the leading field a.
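The prefix rule can be modeled in a few lines of Python. This is a sketch under stated assumptions: the chunk table is hypothetical, -inf and inf stand in for MinKey and MaxKey, and only equality matches count toward the prefix. Real mongos also targets range predicates on the leading field, so treating everything else as a broadcast is conservative.

```python
from math import inf

SHARD_KEY = ("a", "b", "c")

# Hypothetical routing table: each chunk covers [min, max) on the compound
# key; -inf and inf stand in for MinKey and MaxKey.
CHUNKS = [
    ((-inf, -inf, -inf), (10, -inf, -inf), "shard0"),
    ((10, -inf, -inf), (20, 5, -inf), "shard1"),
    ((20, 5, -inf), (inf, inf, inf), "shard2"),
]


def is_equality(value):
    """Plain values are equality matches; operator documents are not."""
    return not (isinstance(value, dict) and any(k.startswith("$") for k in value))


def equality_prefix(shard_key, query):
    """Values for the longest leading run of shard key fields matched by equality."""
    prefix = []
    for field in shard_key:
        if field not in query or not is_equality(query[field]):
            break
        prefix.append(query[field])
    return tuple(prefix)


def target_shards(shard_key, chunks, query):
    prefix = equality_prefix(shard_key, query)
    if not prefix:
        # No usable prefix: contact every shard that owns a chunk.
        return sorted({shard for _, _, shard in chunks})
    pad = len(shard_key) - len(prefix)
    low = prefix + (-inf,) * pad   # smallest possible key with this prefix
    high = prefix + (inf,) * pad   # largest possible key with this prefix
    return sorted({shard for lo, hi, shard in chunks if lo <= high and hi > low})
```

Here { "a": 5 } reaches only shard0, { "a": 20 } reaches shard1 and shard2 because the chunk boundary at (20, 5) splits that prefix, and { "b": 7 } goes to all three shards.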

flowchart TD
    Client([Client query])
    Mongos[mongos router]
    Config[config servers]
    Decision{Shard key in predicate?}
    Target[Targeted shard subset]
    Broadcast[Every shard]
    Merge[Merge on mongos or single shard]
    Client -->|sends query| Mongos
    Mongos -->|reads metadata| Config
    Mongos -->|routes| Decision
    Decision -->|yes| Target
    Decision -->|no| Broadcast
    Target -->|returns| Client
    Broadcast -->|returns cursors| Merge
    Merge -->|returns| Client

Multi-document writes such as updateMany() and deleteMany() broadcast to all shards unless the filter specifies the full shard key. Single-document operations such as updateOne(), replaceOne(), and deleteOne() must include the shard key or _id in the filter. A filter on _id alone is accepted but still goes to every shard unless _id is the shard key. Before MongoDB 7.1, a single-document operation with neither returns an error rather than broadcasting.
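A small Python classifier captures these routing rules. It is a sketch, not mongos logic: classify_write is a hypothetical helper, the single-document branch follows the pre-7.1 behavior, and anything other than an equality match on every shard key field is treated as a broadcast, which is conservative.

```python
def is_equality(value):
    """Plain values are equality matches; operator documents are not."""
    return not (isinstance(value, dict) and any(k.startswith("$") for k in value))


def classify_write(op, shard_key, query):
    """Return "targeted", "broadcast", or "error" for a write on a sharded collection."""
    full_key = all(f in query and is_equality(query[f]) for f in shard_key)
    if op in ("updateMany", "deleteMany"):
        return "targeted" if full_key else "broadcast"
    if op in ("updateOne", "replaceOne", "deleteOne"):
        if full_key:
            return "targeted"
        if "_id" in query:
            return "broadcast"  # allowed, but _id alone does not pick a shard
        return "error"          # rejected in releases before MongoDB 7.1
    raise ValueError(f"unhandled operation: {op}")
```

For a collection sharded on customer_id, a deleteMany() filtered only on status fans out to every shard, while the same call with customer_id: 42 added reaches one shard.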

Warning: Untargeted updateMany() and deleteMany() can cause sudden load spikes and replication lag on all secondaries. Test the predicate on a staging cluster before running against production.

Untargeted count() and distinct() operations also broadcast to all shards.

Use explain("executionStats") on a suspect aggregation to see the routing decision. The output includes mergeType ("anyShard", "specificShard", or "router"), splitPipeline, and per-shard execution stats. A shards field that lists every shard in the cluster confirms a broadcast. When mergeType is "specificShard", the mergeShard field names the node that performs the final merge.
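The routing facts can be pulled out of the explain document programmatically. The sketch below assumes the sharded aggregate explain output carries top-level mergeType, splitPipeline, and a shards document keyed by shard name; summarize_agg_explain and the trimmed sample are hypothetical, and the shardsPart and mergerPart keys in the sample are illustrative.

```python
def summarize_agg_explain(explain, cluster_shards):
    """Condense sharded aggregate explain output into the routing facts."""
    used = set(explain.get("shards", {}))
    return {
        "mergeType": explain.get("mergeType"),
        "split": explain.get("splitPipeline") is not None,
        "shards_used": sorted(used),
        "broadcast": used == set(cluster_shards),
    }


# Hypothetical, heavily trimmed explain output for a three-shard cluster.
sample = {
    "mergeType": "anyShard",
    "splitPipeline": {"shardsPart": [], "mergerPart": []},
    "shards": {"shard0": {}, "shard1": {}, "shard2": {}},
}
summary = summarize_agg_explain(sample, ["shard0", "shard1", "shard2"])
```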

Write concern and read concern are enforced independently on each shard. A scatter-gather write with writeConcern: { w: "majority" } must be majority-committed on every shard before mongos acknowledges success. A read with readConcern: { level: "snapshot" } or { level: "majority" } waits for the slowest shard to satisfy the guarantee, which stretches the latency tail.
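The latency effect reduces to a maximum and a probability. Assuming shard response times are independent, this minimal Python sketch shows that a fanned-out operation finishes with its slowest shard, and that the chance of at least one of n shards landing in its own slowest 1% grows quickly with n. Both function names are hypothetical.

```python
def fanout_latency_ms(per_shard_ms):
    """A fanned-out write or read is acknowledged only when the slowest shard is."""
    return max(per_shard_ms)


def prob_any_shard_in_tail(n_shards, tail=0.01):
    """Chance that at least one of n independent shards lands in its slowest `tail` fraction."""
    return 1 - (1 - tail) ** n_shards
```

With 50 shards, prob_any_shard_in_tail(50) is about 0.39, so under the independence assumption roughly two in five broadcast operations wait on some shard's p99 latency.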

skip() adds merge work that cannot be pushed down to shards. Mongos applies skip() locally after retrieving results. When a query combines skip(n) with limit(m), mongos passes a limit of n+m to each shard to bound the merge volume, but the skip itself still happens on the router. Paginated APIs that rely on deep skip values therefore perform poorly on sharded collections: every shard returns up to n+m documents for each page, and mongos discards all but m of them.
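A minimal Python model of that pagination path, assuming integer sort keys stand in for sorted documents and paginate_sharded is a hypothetical helper: each shard returns its first skip + limit results, mongos merges the sorted streams with heapq.merge, and only then applies the skip.

```python
import heapq
from itertools import islice


def paginate_sharded(shard_results, skip, limit):
    """Model mongos pagination over a sort: each shard returns skip + limit
    sorted documents, mongos merges them and applies skip locally."""
    per_shard = [sorted(docs)[: skip + limit] for docs in shard_results]
    transferred = sum(len(docs) for docs in per_shard)
    page = list(islice(heapq.merge(*per_shard), skip, skip + limit))
    return page, transferred


# Three hypothetical shards holding 1,000 interleaved sort keys each.
shards = [list(range(i, 3000, 3)) for i in range(3)]
page, transferred = paginate_sharded(shards, skip=900, limit=10)
```

Fetching one page of 10 at skip 900 across three shards moves 2,730 documents to the router, and the cost grows with both the skip depth and the shard count.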

A pipeline that targets a single shard can still scatter on the joined collection. $lookup, $merge, or $graphLookup referencing a second collection may pin the merge stage to a single shard, such as the primary shard holding an unsharded foreign collection, serializing execution there. With allowDiskUse: true and a grouping stage, the merge runs on a chosen shard rather than on mongos, so it has local disk to spill to. The final phase of a split $sort is a streaming merge, but the final phase of $group is blocking, so the chosen merger shard can become a bottleneck.
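The streaming-versus-blocking difference can be measured by counting how many documents the merger pulls before it has an answer. In this Python sketch, CountingStream, first_sorted_result, and merge_group are hypothetical; each stream stands in for one shard's cursor, and the $group streams carry (key, partial count) pairs.

```python
import heapq
from collections import Counter


class CountingStream:
    """Wraps one shard's result stream and counts documents the merger pulls."""

    def __init__(self, docs):
        self._it = iter(docs)
        self.pulled = 0

    def __iter__(self):
        return self

    def __next__(self):
        doc = next(self._it)
        self.pulled += 1
        return doc


def first_sorted_result(streams):
    """Streaming merge for a split $sort: the first result needs one document per shard."""
    return next(heapq.merge(*streams))


def merge_group(streams):
    """Blocking merge for a split $group: nothing is final until every partial arrives."""
    totals = Counter()
    for stream in streams:
        for key, partial_count in stream:
            totals[key] += partial_count
    return totals
```

For the sort, the first result is ready after one document from each shard; for the group, every partial from every shard must arrive before any result is final.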

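When mongos fans out, it applies the configured read preference independently on each shard. The selected member depends on both the read preference and replication.localPingThresholdMs, and mongos re-evaluates the choice for each operation. Secondary reads in a scatter-gather query are exposed to replication lag on every shard: each shard may answer from a different point in time, and a read that must observe a given cluster time, such as one in a causally consistent session, waits on the most lagged secondary selected across all shards.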
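Per shard, the member choice can be pictured as a latency window. This simplified Python sketch assumes the members passed in already match the read preference mode and ignores tag sets and staleness limits; latency_window is a hypothetical helper, and its 15 ms default mirrors the default of replication.localPingThresholdMs.

```python
def latency_window(pings_ms, threshold_ms=15):
    """Members eligible for selection: within threshold_ms of the fastest ping."""
    fastest = min(pings_ms.values())
    return sorted(m for m, ping in pings_ms.items() if ping <= fastest + threshold_ms)


# Hypothetical ping times (ms) from mongos to each shard's members.
# A fan-out repeats the choice independently for every shard.
cluster = {
    "shard0": {"s0-a": 2, "s0-b": 9, "s0-c": 40},
    "shard1": {"s1-a": 30, "s1-b": 31, "s1-c": 70},
}
eligible = {shard: latency_window(members) for shard, members in cluster.items()}
```

Mongos then picks one member from each shard's eligible set, so a single broadcast read can land on members with very different lag.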

Where it shows up in production

Check the mongos slow query log for nShards equal to the total shard count. In the shell, explain("executionStats") on a find shows a SHARD_MERGE stage whose shards array includes every shard. In aggregation explain output, a mergeType field signals that the pipeline was split across shards; compare the shards it lists against the cluster's shard count.
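Scanning the mongos log can be scripted. This Python sketch assumes the structured JSON log format (MongoDB 4.4 and later) and that slow-query entries carry msg "Slow query" with attr.ns, attr.durationMillis, and attr.nShards; broadcast_slow_queries is a hypothetical helper, and the caller supplies the total shard count.

```python
import json


def broadcast_slow_queries(log_lines, total_shards):
    """Return (namespace, durationMillis) for slow queries that hit every shard."""
    hits = []
    for line in log_lines:
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip lines that are not structured JSON
        if entry.get("msg") != "Slow query":
            continue
        attr = entry.get("attr", {})
        if attr.get("nShards") == total_shards:
            hits.append((attr.get("ns"), attr.get("durationMillis")))
    return hits
```

Any iterable of lines works, including an open mongos log file.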

Uniform load across all shards for a query that should be isolated is another tell. If opcounters, ticket utilization, or opLatencies spike in lockstep across every replica set, a scatter-gather query is likely running. This is common with OLTP queries that filter on a non-shard-key field, such as looking up a user by email when the collection is sharded on customer_id. It also appears when collections are sharded by _id, where every query that does not know the exact document ID fans out. Background analytics jobs that query by created_at range without the shard key prefix will broadcast even for small time windows.
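A rough way to turn "spike in lockstep" into a check is to compare each shard's samples against its own median and flag intervals where every shard exceeds it by some factor. This Python sketch is hypothetical: the median baseline and the factor of 3 are assumptions to tune, and the input stands in for per-interval opcounter rates exported from any monitoring system.

```python
from statistics import median


def lockstep_spikes(series_by_shard, factor=3.0):
    """Indices of intervals where every shard exceeds factor x its own median."""
    lengths = {len(s) for s in series_by_shard.values()}
    if len(lengths) != 1:
        raise ValueError("all shards need the same number of samples")
    n = lengths.pop()
    baselines = {shard: median(s) for shard, s in series_by_shard.items()}
    return [
        i for i in range(n)
        if all(series[i] > factor * baselines[shard]
               for shard, series in series_by_shard.items())
    ]


# Hypothetical query rates per interval; shard2's spike at index 1 is local.
samples = {
    "shard0": [100, 100, 100, 900, 100],
    "shard1": [80, 90, 85, 700, 80],
    "shard2": [50, 400, 50, 600, 50],
}
```

Only interval 3 is flagged: shard2's jump at interval 1 is a single hot shard, not a broadcast.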

updateMany() and deleteMany() calls in application code are frequent culprits when they omit the shard key. Aggregation pipelines with $lookup to a sharded foreign collection scatter on the joined side whenever the join field is not the foreign collection's shard key. Queries that paginate with skip() and lack a shard-key prefix are another source.

Low-cardinality shard keys and monotonically increasing shard keys create related problems. A key on continent yields at most seven chunks regardless of cluster size, because a chunk cannot be split within a single shard key value; at most seven shards can ever hold data, so adding more provides no benefit. An ascending shard key such as a timestamp sends every new insert to the chunk bounded by MaxKey, creating a hot shard. Both issues call for revisiting the shard key design.
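The MaxKey hot spot follows directly from range routing. In this Python sketch the chunk bounds and owners are hypothetical, and bisect_right over chunk lower bounds stands in for the router's lookup: every timestamp larger than the last boundary resolves to the same chunk.

```python
from bisect import bisect_right
from collections import Counter

# Hypothetical chunk lower bounds on an ascending timestamp key; the last
# chunk runs from 3000 up to MaxKey.
CHUNK_MINS = [float("-inf"), 1000, 2000, 3000]
CHUNK_OWNERS = ["shard0", "shard1", "shard2", "shard0"]


def owner(ts):
    """Shard owning the chunk whose [min, next min) range contains ts."""
    return CHUNK_OWNERS[bisect_right(CHUNK_MINS, ts) - 1]


new_inserts = range(5000, 5100)  # timestamps only ever grow
load = Counter(owner(ts) for ts in new_inserts)
```

All 100 new inserts land on shard0, no matter how many other shards the cluster has.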

Tradeoffs and when to use it

Scatter-gather is correct behavior when the system cannot determine where data lives. Reporting queries, cross-shard aggregations, and administrative operations legitimately scan all shards. The concern is frequency.

When evaluating a shard key, remember that prefix matching matters. A compound key serves queries on its leading fields. If the application most often queries by email but the shard key is { tenant_id: 1, user_id: 1 }, email lookups broadcast unless they also include tenant_id.

The tradeoff is between write distribution and read targeting. A hashed shard key on _id spreads writes evenly but forces almost every read to broadcast unless the _id is known. A monotonically increasing key like a timestamp targets time-range queries but creates a hot insert shard. The ideal shard key has high cardinality, appears in the majority of query predicates, and distributes writes evenly.
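The other side of the tradeoff shows up when the same timestamps are hashed. This Python sketch uses MD5 truncated to 64 bits as a stand-in for MongoDB's hashed index function (it is not the real algorithm) and splits the hash space into four equal chunks; SHARDS and hashed_owner are hypothetical.

```python
import hashlib
from collections import Counter

SHARDS = ["shard0", "shard1", "shard2", "shard3"]


def hashed_owner(value):
    # Stand-in hash, not MongoDB's: take 64 bits of MD5 and use the top two
    # bits to pick one of four equal ranges of the hash space.
    h = int.from_bytes(hashlib.md5(str(value).encode()).digest()[:8], "big")
    return SHARDS[h >> 62]


insert_load = Counter(hashed_owner(ts) for ts in range(1000))
# Hashing destroys ordering, so mongos cannot map a range predicate to chunks
# and must broadcast; the matching values are in fact spread over every shard.
range_query_shards = {hashed_owner(ts) for ts in range(100, 201)}
```

Inserts spread across all four shards, and the documents matching a narrow time range are scattered across every shard as well.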

If you must run scatter-gather queries in production, set maxTimeMS to bound the impact of a slow shard.

Signals to watch in production

Signal | Why it matters | Warning sign
--- | --- | ---
Slow query log nShards | Confirms fan-out scope | nShards equals the total shard count on mongos
explain("executionStats") shards array | Reveals the routing decision | Array contains all shards for OLTP query patterns
db.serverStatus().shardingStatistics.numHostsTargeted on mongos | Breaks operations down by how many shards they targeted | allShards counts rising, especially for find, update, and delete
Per-shard opcounters and latency | A broadcast loads all shards uniformly | All shards spike together for one query pattern
metrics.document.returned vs opcounters.query | A high ratio means each query returns many documents | Combined with scatter-gather, indicates broad filters
Chunk distribution | Imbalance compounds routing inefficiency | More than 20% skew between the most and least loaded shard
db.currentOp() on mongos | Shows active scatter-gather operations | Long-running find or aggregate without shard-key filters
replSetGetStatus members[].optimeDate | Replication lag adds to broadcast latency | All secondaries lag together after a scatter-gather read phase
explain("executionStats") executionTimeMillis | Compares per-shard and total execution time | A large gap indicates merge or network overhead on mongos
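Two of these signals reduce to simple ratios. The Python sketch below assumes the mongos serverStatus output includes shardingStatistics.numHostsTargeted, with per-operation counters named allShards, manyShards, oneShard, and unsharded. It reads "skew" as (most - least) / most chunks per shard, which is one interpretation of the 20% threshold. Both function names and the sample numbers are hypothetical.

```python
def broadcast_share(num_hosts_targeted):
    """Fraction of counted operations that targeted all shards."""
    all_shards = total = 0
    for buckets in num_hosts_targeted.values():
        all_shards += buckets.get("allShards", 0)
        total += sum(buckets.values())
    return all_shards / total if total else 0.0


def chunk_skew(chunks_per_shard):
    """(most - least) / most across shards; 0.0 means perfectly even."""
    most = max(chunks_per_shard.values())
    least = min(chunks_per_shard.values())
    return (most - least) / most if most else 0.0


# Hypothetical counters, shaped like the assumed numHostsTargeted section.
counters = {
    "find": {"allShards": 30, "manyShards": 10, "oneShard": 60, "unsharded": 0},
    "update": {"allShards": 5, "manyShards": 0, "oneShard": 95, "unsharded": 0},
}
```

Tracked over time, a rising broadcast_share points at new untargeted query patterns even before per-shard load charts show it.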

How Netdata helps

Netdata monitors opcounters and opLatencies per shard, making uniform load spikes from broadcast queries visible in one view.

Correlating mongos query latency with per-shard CPU, disk I/O, and WiredTiger ticket utilization reveals whether a latency spike comes from one hot shard or from all shards responding to a scatter-gather query.

Tracking globalLock.currentQueue across all shards exposes when a broadcast query is queuing on multiple replica sets simultaneously.

Process-level CPU and memory metrics for mongos show when the router’s merge phase is doing heavy work, helping determine whether slowness is in shard execution or in the merge.