Optimize Data Migration in MongoDB: Resharding Techniques for Speed and Scalability

Written by deadsix | Published 2023/06/07

TL;DR: A technique called “reshard-to-shard” uses resharding to spread data across the shards in your MongoDB cluster quickly, letting you shard a collection and distribute it across multiple shards in hours, spreading your workload without hassle.

Need to shard a 2TB collection and distribute the data across all of your shards in under 24 hours? Leverage resharding to accelerate your data migration!

Disclaimer: I am a MongoDB employee, but all opinions and views expressed are my own.

In this post, we’ll cover a technique called “reshard-to-shard” which uses resharding to spread data across the shards in your cluster quickly.

We will go over:

  1. Considerations before and during the migration.
  2. How to initiate, monitor, and abort the faster data migration.

If you are new to sharding or want a refresher on how MongoDB delivers horizontal scalability, please check out the MongoDB manual.

Table of Contents

  • Background
  • What are the benefits of reshard-to-shard?
  • When should I use reshard-to-shard?
  • When should I not use reshard-to-shard?
  • What are the reshard-to-shard prerequisites?
  • Reshard-to-shard technique overview
  • Reshard-to-shard example
  • FAQs

Background

When a collection is initially sharded on a multi-shard cluster, the balancer begins migrating data from the shard that holds the newly sharded collection to the other shards in the cluster, so that the collection is distributed equally across the shards. While the balancer is migrating data, a shard can participate in only one migration at a time, no matter how many collections need migrating. This means that in a 3-shard cluster, only two shards at a time can migrate data between them. Resharding does not share these limitations because its internal execution is different.

Because resharding is rewriting all the data, it is able to write data in parallel across all of the shards in the cluster, increasing the throughput and greatly reducing the time to migrate data across the shards relative to what the balancer can accomplish. Resharding builds a new collection with a new shard key in the background on each of the shards while keeping your existing collection available for your application to utilize. Once all of the documents are cloned into the new collection, the cutover occurs. The existing collection with the old shard key is dropped in favor of the new collection built by the resharding operation.

What are the benefits of reshard-to-shard?

First, it’s much faster! By leveraging resharding, a customer was able to shard and distribute their 3.5TB collection across 4 shards in 22.5 hours. The same process would’ve taken 30 days if left to the balancer’s default method of chunk migrations.

Customers who do not use the default ObjectId() to generate values for the mandatory _id field may not see as large a speed increase, due to differences in read patterns between custom values and values generated by ObjectId().

Second, it minimally impacts your workload! After the balancer migrates data, it has to conduct a clean-up operation called range deletion on the shard that donated the data, since only one shard can own each document. Range deletion is an I/O-intensive operation that can impact your cluster’s performance. Resharding does not need to conduct range deletion, since it drops the old collection after cutting over to the new collection with the new shard key.
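For context, you can observe this clean-up backlog yourself; a minimal sketch, assuming you connect directly to a shard’s primary (each shard tracks its own pending tasks in its config.rangeDeletions collection):

// Count the range deletion tasks still queued on this shard after balancer
// migrations; a non-zero count means clean-up I/O is still pending.
db.getSiblingDB("config").rangeDeletions.countDocuments({})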

Third, you automatically reclaim your disk space! By dropping the old collection, it frees up storage space to be used by any collection without having to execute an operation such as compact. This means you can more quickly and easily scale down storage after the operation if desired.

For example, a customer was consuming nearly 2.8TB on shard0 before the resharding operation of their largest collection was complete.

When resharding finished, 1.9TB of storage space was immediately returned! They went from consuming nearly 2.8TB of storage to 873GB.

When should I use reshard-to-shard?

When you’re initially sharding a collection of any size across any number of shards.

There are some scenarios where balancing may be faster (e.g., collections smaller than 100GB), but you still have to take into account range deletion and reclaiming the storage via compact or initial sync. Therefore, if you have the capacity, we recommend reshard-to-shard no matter how large the collection you want to shard is.

When should I not use reshard-to-shard?

You should not use the reshard-to-shard tactic when:

  • Your application cannot tolerate writes being blocked for two seconds to allow the cutover to the resharded collection.
    • By default, writes to the collection being resharded are blocked for up to two seconds during the cutover; a configurable server parameter can modify this duration (see the sketch after this list).

  • Your collection is a time-series collection
    • If you attempt to reshard a time-series collection you’ll receive an error message stating that resharding a time-series collection is not supported.
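If your application needs a different cutover window, the write-blockage duration can be tuned; a hedged sketch, assuming the reshardingCriticalSectionTimeoutMillis server parameter (verify the parameter name, default, and runtime-settability for your MongoDB version):

// Hypothetical tuning: allow the resharding critical section, during which
// writes to the collection are blocked, to last up to 5 seconds.
db.adminCommand({ setParameter: 1, reshardingCriticalSectionTimeoutMillis: 5000 })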

For the scenarios listed above, use the traditional method of sharding a collection and letting the balancer migrate data.

What are the reshard-to-shard prerequisites?

  1. A MongoDB cluster running MongoDB 5.0 or greater.

    • If running MongoDB 5.0 or 6.0, ensure your cluster is running a patch release greater than 5.0.14 or 6.0.3, respectively.

  2. You must select an appropriate shard key for your collection.

  3. Build the indexes needed to support both the temporary shard key and the desired shard key.

  4. Additionally, since you’ll be using resharding to accelerate data migration speeds, please familiarize yourself with the resharding requirements and limitations.

To successfully execute a resharding operation your cluster should have:

  • Available storage on each shard equal to at least 2.2x the collection’s size divided by the number of shards (see the sketch after this list).
    • For example, if you’re sharding a 1TB collection across 4 shards, each shard will hold 250GB once the collection is distributed, so you want a minimum of 550GB of storage available on each shard.
  • I/O capacity below 50%
  • CPU usage below 80%
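As a quick way to run the storage check, you can read the collection’s logical size from its stats; a minimal sketch where the collection name and shard count are placeholders:

// stats().size reports the logical (uncompressed) collection size in bytes.
const sizeBytes = db.flight_tracker.stats().size;
const numShards = 4;
const neededGBPerShard = (2.2 * sizeBytes / numShards) / 1e9;
print(`Aim for at least ${neededGBPerShard.toFixed(0)} GB free on each shard`);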

Customers using Atlas whose clusters do not meet the storage, I/O, and CPU requirements to execute resharding can easily scale their cluster and/or customize storage temporarily, increasing the cluster’s resources to allow for a successful resharding operation, and then return to normal once the operation has completed.

Reshard-to-shard technique overview

There are two very simple steps to execute a reshard-to-shard operation:

  1. Shard Into a Temporary Shard Key
  2. Reshard Into Your Desired Shard Key

Why would I first shard into a temporary shard key, and won’t that harm my application?

Let’s explain!

Step one: shard with a temporary shard key

Currently, resharding does not support resharding into the same shard key (it will succeed as a “no-op” because you’re already in the desired state). To get around this limitation, the reshard-to-shard technique intentionally shards into a temporary shard key that is different from the desired shard key. Because MongoDB supports both range sharding and hashed sharding, the temporary shard key can be a very slightly modified version of the desired shard key you selected for your collection.

The temporary shard key should change the partitioning strategy for only one of your shard key fields. Certain operations, such as updateOne(), updateMany(), and deleteOne(), require the query to include the shard key, which is why only the partitioning strategy changes rather than the fields themselves. MongoDB uses the partitioning strategy only to determine how to distribute your data across the shards in your cluster; it does not change the values in your documents. This means your application can use updateOne() or any other query that requires the shard key under both partitioning strategies.

For example, if the desired shard key you selected for your collection is:

{"_id": "hashed"}

The temporary shard key to initially use for your collection should be:

{"_id": 1}

For compound shard keys, you can use the prefix of the desired shard key for the temporary shard key. For example, if the desired shard key you selected for your collection is:

{ launch_vehicle: 1, payload: 1 }

Your temporary shard key should be:

{ launch_vehicle: 1 }
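Conveniently, because this temporary key is a prefix of the desired key, a single compound index can support both; a small sketch with a hypothetical collection name:

// One index backs sharding on { launch_vehicle: 1 } now and on
// { launch_vehicle: 1, payload: 1 } after resharding.
db.launches.createIndex({ launch_vehicle: 1, payload: 1 })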

The reshard-to-shard tactic calls for resharding into your long-term shard key almost immediately after the initial sharding of the collection with the temporary shard key completes. This keeps over 99% of the data on one shard while the resharding operation executes, significantly reducing the impact of broadcast queries.

Since you’ll have built indexes for both the temporary and the desired shard key, queries that use the desired shard key remain performant while the resharding operation runs: they can leverage the desired shard key’s index even though the collection is temporarily partitioned by the temporary shard key.

Step two: reshard into your desired shard key

The second step is executing a normal resharding operation except you’re leveraging a side effect of how resharding works to your benefit.

Resharding has four major phases:

  • Initialization - the collection undergoing resharding is sampled and a new distribution of data based on the new shard key is determined.

  • Index - the resharding operation creates a new, empty temporary sharded collection on all of the shards based on the new shard key and builds the indexes, including the non-shard-key indexes that support the existing collection.

  • Clone, Catch-Up, and Apply - documents are cloned to the shards according to the new shard key, and any modifications made to documents while the resharding operation executes are applied.

  • Commit - the temporary collection is renamed, takes the place of the collection being resharded, and the now-old collection is dropped.

After reviewing the phases above, you can see how you’ll gain the benefit of fast data movement, a sharded collection that is equally distributed across your shards once the operation is complete, and freed storage space all in one go.

Once the resharding operation is complete, you can conduct clean-up operations such as dropping the temporary shard key index and scaling down your cluster and/or your storage to fit your steady-state needs.

Reshard-to-shard example

Let’s say you’re working on an application that will track commercial aircraft so that customers can be notified if it’s likely that their flight will be delayed. You’ve studied your application’s query patterns and reviewed what attributes contribute to a good shard key.

The shard key you’ve selected for your collection is:

{ airline: 1, flight_number: 1, date: "hashed" }

With the shard key determined, you can start checking off the prerequisites to execute a reshard-to-shard operation. First, you generate your temporary shard key. As stated previously, you want your temporary shard key to be a very slightly modified version of the desired shard key.

So the temporary shard key you’ve selected is:

{ airline: 1, flight_number: 1 }

Next, you build the indexes to support both the temporary and final shard keys.

Indexes can be created using Mongo shell via db.collection.createIndex() or db.collection.createIndexes().

Since the temporary shard key { airline: 1, flight_number: 1 } is a prefix of the desired shard key, a single compound index supports both, so you only have to create one index via db.collection.createIndex():

db.flight_tracker.createIndex(
  { "airline": 1, "flight_number": 1, date: "hashed" }
)

The index builds can be monitored using mongo shell via the following command:

db.adminCommand({
  currentOp: true,
  $or: [
    { op: "command", "command.createIndexes": { $exists: true }  },
    { op: "none", "msg" : /^Index Build/ }
  ]
})

If your MongoDB cluster is deployed on Atlas, you can use the Atlas UI to review the relevant metrics and confirm that your cluster has enough free storage plus the CPU and I/O headroom to conduct a resharding operation.

If there isn’t enough storage space available or I/O headroom, you can scale the storage; for a lack of CPU headroom, you can scale the cluster. Both are easily done via the Atlas UI.

Accessing the cluster configuration is simple and can be done from your group’s overview screen which displays all of the deployed clusters for the group.

With all of the prerequisites met, you can start the first part of the reshard-to-shard process - sharding the collection with the temporary shard key.

You can then use Mongo shell and the sh.shardCollection() command to shard the collection:

sh.shardCollection("main.flight_tracker", { airline: 1, flight_number: 1 })

sh.shardCollection() will return a document when complete; the ok field will have a value of 1 if the operation was successful.

{
  collectionsharded: 'main.flight_tracker',
  ok: 1,
  '$clusterTime': {
    clusterTime: Timestamp({ t: 1684160896, i: 25 }),
    signature: {
      hash: Binary(Buffer.from("7cb424a56cacd56e47bf155bc036e4a4da4ad6b6", "hex"), 0),
      keyId: Long("7233411438331559942")
    }
  },
  operationTime: Timestamp({ t: 1684160896, i: 21 })
}

Once the collection is sharded, wait for one chunk to be migrated to every shard in the cluster. You can check if each shard has a chunk via sh.status() in mongo shell:

sh.status()
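For a more targeted check, you can count chunks per shard directly from the config database; a sketch assuming MongoDB 5.0+, where config.chunks references collections by UUID:

// Look up the collection's UUID, then group its chunks by owning shard;
// every shard should appear in the output before you start resharding.
const coll = db.getSiblingDB("config").collections.findOne({ _id: "main.flight_tracker" });
db.getSiblingDB("config").chunks.aggregate([
  { $match: { uuid: coll.uuid } },
  { $group: { _id: "$shard", chunks: { $sum: 1 } } }
])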

To reshard your collection into the desired shard key, use sh.reshardCollection() in mongo shell:

sh.reshardCollection("main.flight_tracker", { airline: 1, flight_number: 1, date: "hashed" })

If you run the sh.status() command in mongo shell, you’ll see a new collection in the output whose name has the format <db_name>.system.resharding.<UUID>. This is the temporary collection that resharding is building, with data distributed according to your desired shard key.

To monitor the status of the resharding operation for the flight_tracker collection, you can use the following command:

db.getSiblingDB("admin").aggregate([
  { $currentOp: { allUsers: true, localOps: false } },
  {
    $match: {
      type: "op",
      "originatingCommand.reshardCollection": "main.flight_tracker"
    }
  }
])

The output of the command will inform you which stage the resharding operation is currently executing and the estimated time to completion via the remainingOperationTimeEstimatedSecs field. Please note that the resharding time-to-completion estimate is pessimistic; resharding operations often take significantly less time than reported!

The resharding operation should be nearing completion when each shard’s data size has increased by the size of the collection being resharded divided by the number of shards in the cluster. For example, for a 1TB collection being resharded across 4 shards, the operation should be nearing completion when each shard has written 250GB (not accounting for other data being inserted, updated, or deleted on the shard).

If your cluster is deployed on Atlas you can also monitor the progress of the resharding operation via the Atlas UI using the Metrics tab of the cluster.

  • For Atlas clusters running MongoDB 6.0 and greater - you can use the shard data size display option and then select a collection with a syntax of <db_name>.system.resharding.<UUID>. This view isolates the temporary collection and will only display the data size growth of the new collection.

  • For Atlas clusters running MongoDB 5.0 - you can use the db logical data size display option. This view does not allow for collection-level isolation.

While the resharding is executing, the three metrics from the cluster you should be primarily monitoring are:

  • Available Storage - consuming all of the available storage can lead to cluster instability
  • CPU Utilization - high CPU utilization can lead to degraded cluster performance
  • I/O Usage - I/O usage above your capacity can lead to degraded cluster performance

If you’re ever concerned about resharding negatively affecting your cluster you can instantly abort the resharding operation before it reaches the commit portion of the process via the following command:

sh.abortReshardCollection("main.flight_tracker")

When it concludes, the resharding operation will return whether or not it was successful to the invoking client.

Since resharding is a long-running operation and you may have closed that Mongo shell session, you can check whether the resharding operation is still executing by using the resharding monitoring aggregation (if you want details) or sh.status() (to see if the temporary collection is still present in the output). If the resharding aggregation does not return anything, or if you no longer see a temporary collection in the output of sh.status(), the resharding operation has ended.

You can use db.collection.getShardDistribution() to ascertain whether the operation was successful:

db.flight_tracker.getShardDistribution()

If resharding successfully completed you should see an output where the distribution is equal across the shards.

  • For MongoDB 6.0 and greater, evenness is determined by data size per shard, so you should see a nearly equal amount of data on each of the shards in the output of db.collection.getShardDistribution().

  • For MongoDB 5.0, evenness is determined by the number of chunks per shard, so you should see an equal number of chunks on each of the shards in the output of db.collection.getShardDistribution().

If your cluster is deployed on Atlas you can use the Atlas UI via the Metrics tab to determine whether the resharding operation is successful.

  • For Atlas clusters running MongoDB 6.0 and greater - you can use the shard data size display option and then select your collection that underwent resharding. You should see an equal amount of data per shard displayed.

  • For Atlas clusters running MongoDB 5.0 - you can use the chunks display option and then select your collection that underwent resharding. You should see a nearly equal number of chunks displayed across all of the shards in your cluster.

For both shard data size and the number of chunks, the Atlas UI will display a sharp increase in the relevant metric, because resharding temporarily uses a collection with the name format <db_name>.system.resharding.<UUID> before renaming it and dropping the old collection with the old shard key.

If resharding is aborted, the output of db.collection.getShardDistribution() will likely show most of the data on the shard where the collection was initially sharded. Aborts during resharding are rare and most likely occur because the operation could not conduct the collection cutover in 2 seconds or less.

If that is the case, we recommend timing the start of resharding so that it tries to commit during a period of lower traffic for your cluster.

Once the resharding operation is complete you can conduct clean-up operations such as dropping the temporary shard key index and scaling down your cluster and/or your storage to fit your steady-state needs.
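If you created a standalone index for the temporary shard key, dropping it is a one-liner; a sketch (in this walkthrough the single compound index covers both keys, so there is no separate index to remove):

// Hypothetical clean-up: drop a dedicated temporary shard key index if one exists.
db.flight_tracker.dropIndex({ airline: 1, flight_number: 1 })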

FAQs

  1. How long does reshard-to-shard take?

    • It depends on your collection size, the number and size of indexes for your collection, and the number of shards in your cluster, but we’re confident that you can reshard-to-shard a 4TB collection with 10 indexes across 4 shards in 48 hours or less. Letting the balancer handle the migrations would take 30 days or more.

  2. Why is resharding faster than letting the balancer migrate data?

    • The internals of how the balancer and resharding migrate data are different. Resharding reads documents in a different order than chunk migrations and since resharding concludes with a drop of the old collection there is no waiting for range deletion to release the disk space.

  3. I want to use reshard-to-shard on a collection that has a uniqueness constraint, but hashed indexes don’t support enforcing uniqueness.

    • If your collection has a uniqueness constraint, you can still use reshard-to-shard, but you’ll have to take a different approach: instead of changing the partitioning strategy, add an additional field to your temporary shard key. This unlocks the ability to reshard into your desired shard key. For example, if your desired shard key is:

      { launch_vehicle: 1, payload: 1}
      

    • Your temporary shard key would be:

      { launch_vehicle: 1, payload: 1, launch_pad: 1}
      

    • Please be aware of the limitations for queries (e.g., updateOne(), updateMany(), deleteOne()) that require the query to include the full shard key. Until the resharding operation is complete, your application must include the temporary shard key in every scenario where a query requires the full shard key to execute successfully, as shown in the sketch below.
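    • For instance, until the resharding operation completes, a targeted update must supply all three temporary shard key fields; a minimal sketch with hypothetical collection name and values:

      // All three temporary shard key fields are included so the update can be
      // routed while { launch_vehicle: 1, payload: 1, launch_pad: 1 } is in effect.
      db.launches.updateOne(
        { launch_vehicle: "Falcon 9", payload: "Starlink-42", launch_pad: "LC-39A" },
        { $set: { status: "scrubbed" } }
      )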

  4. How do I monitor an ongoing resharding operation?

    • Run the following command:

      db.getSiblingDB("admin").aggregate([
        { $currentOp: { allUsers: true, localOps: false } },
        {
          $match: {
            type: "op",
            "originatingCommand.reshardCollection": "<database>.<collection>"
          }
        }
      ])
      

  5. How do I stop an ongoing resharding operation?

    • Run the following command, which instantly aborts the resharding operation:

      sh.abortReshardCollection("<database>.<collection>")
      

  6. I am concerned about resharding affecting my cluster performance.

    • If you meet the previously outlined resharding requirements, the operation should not impact your cluster performance. However, if your cluster is deployed on Atlas, you can scale your cluster up temporarily while executing a reshard-to-shard operation and scale the cluster back down after the operation is complete.

  7. What metrics of my cluster should I be monitoring while the resharding operation is executing?

    • Storage space available - if any of your shards have less than 100GB of storage available, you should abort resharding.

    • CPU utilization - if your cluster is consuming all of the available compute resources, you may cause resource contention and should abort resharding.

    • I/O consumption - if your cluster is consuming all of the available IOPS, you may cause resource contention and should abort resharding.

  8. The temporary collection is seemingly evenly distributed across all of my shards, why is resharding not complete?

    • Before resharding can cut over to the collection with your desired shard key, it must catch up with all of the writes that occurred since the resharding operation was initiated. If your write workload is heavy, it can take an extended amount of time for the catch-up phase to conclude.

Headline photo by panumas nikhomkhai found on Pexels.


