Sometimes an Apache Cassandra cluster can end up in an unbalanced state, where data is unevenly distributed across the cluster or across the locally configured data directories. There are a number of reasons this can happen. In this blog post, I will cover the two basic causes:
- Configuration
- Data Model and Data Distribution
Configuration
The main configuration option that affects data distribution is the num_tokens value set in cassandra.yaml. By default this is set to 256, but it can be changed prior to bootstrapping a node. The num_tokens setting is particularly useful when configuring a non-uniform Cassandra cluster, as it lets you distribute data according to the capacity of each node.
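For example, if one node has roughly twice the capacity of the others, you could give it proportionally more tokens. A minimal, illustrative cassandra.yaml fragment (the exact values here are hypothetical, not a recommendation):

# cassandra.yaml on a standard node
num_tokens: 256

# cassandra.yaml on a node with roughly twice the hardware capacity
num_tokens: 512

Nodes with more tokens own a larger share of the ring and therefore store proportionally more data.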
Data Model and Data Distribution
Data can also end up unevenly distributed if your data model is not designed for your data profile. You need a good understanding of your domain, and your data model must fit it. In Cassandra, the partition key determines how data is distributed across the cluster, so it must be chosen carefully.
Example
The best way to understand the above two points is via an example. In this example, we start off with a single-node cluster. The node is configured with a num_tokens value of 256 and has two data directories (/var/lib/cassandra/data and /var/lib/cassandra/data1) specified in the data_file_directories property.
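For reference, the relevant fragment of node1's cassandra.yaml would look roughly like this (a sketch of just the two settings discussed, not the full file):

num_tokens: 256

data_file_directories:
    - /var/lib/cassandra/data
    - /var/lib/cassandra/data1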
In this example, we will
- Bootstrap a single node.
- Add data to this node.
- Add a second node to the cluster and examine how this influences data distribution.
To carry out the test I have created the following schema.
CREATE KEYSPACE distribution_test
  WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };

CREATE TABLE dd_test (
  partition_key text,
  cluster_key text,
  test_data text,
  PRIMARY KEY (partition_key, cluster_key)
);
The distribution_test keyspace uses the SimpleStrategy replication strategy with a replication factor of 1. It contains a single table, dd_test, which has a compound primary key made up of a partition key and a clustering key, aptly named partition_key and cluster_key respectively.
I have added data to the dd_test table using the following Python script.
from cassandra.cluster import Cluster
from concurrent.futures import ThreadPoolExecutor
import random
import string

cluster = Cluster()
session = cluster.connect('distribution_test')

def insert(from_num, to_num):
    for num in range(from_num, to_num):
        # 50 random characters repeated 1000 times (~50 KB of highly compressible data)
        long_string = ''.join(random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(50)) * 1000
        # A new partition key on every insert spreads the rows across the token ring
        session.execute("INSERT INTO dd_test (partition_key, cluster_key, test_data) VALUES ('partition_key_{}', 'cluster_key_example_{}', '{}')".format(num, long_string, long_string))
        print "Executed {}".format(num)

futures = []
executor = ThreadPoolExecutor(max_workers=20)

for num in xrange(0, 100000, 10000):
    futures.append(executor.submit(insert, num, num + 4999))

for future in futures:
    future.result()
Notice the INSERT statement: the partition key changes on every insert. As we will see later in the post, this is key to ensuring even data distribution.
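A quick way to see this for yourself (an illustrative check, not part of the original test run) is to ask Cassandra which token each partition key hashes to, using the built-in token() function:

SELECT partition_key, token(partition_key) FROM dd_test LIMIT 5;

Because every row has a different partition key, the resulting Murmur3 tokens are scattered across the whole ring, which is what allows the data to be spread evenly across nodes and data directories.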
After inserting 100,000 records, I ran nodetool status and got the following values:
As expected, all the data is owned by the single node. I also checked how the data is distributed across the two data directories: it is evenly spread out. Check out the screenshot below.
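For anyone reproducing the test, the checks above boil down to commands along these lines (paths assume the data_file_directories configured earlier):

nodetool status distribution_test
du -sh /var/lib/cassandra/data /var/lib/cassandra/data1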
Bootstrapping A New Cassandra Node
Let's add a new node to this cluster. When adding a new node, ensure that the auto_bootstrap property is set to true (this is the default). It ensures that the new node automatically joins the cluster and streams the data it is now responsible for, without any manual intervention. On bootstrapping node2 I see the following log output:
INFO [main] 2017-06-01 07:19:45,199 StorageService.java:1435 - JOINING: waiting for schema information to complete
INFO [main] 2017-06-01 07:19:45,250 StorageService.java:1435 - JOINING: schema complete, ready to bootstrap
INFO [main] 2017-06-01 07:19:45,251 StorageService.java:1435 - JOINING: waiting for pending range calculation
INFO [main] 2017-06-01 07:19:45,251 StorageService.java:1435 - JOINING: calculation complete, ready to bootstrap
INFO [main] 2017-06-01 07:19:45,251 StorageService.java:1435 - JOINING: getting bootstrap token
INFO [main] 2017-06-01 07:19:45,341 StorageService.java:1435 - JOINING: sleeping 30000 ms for pending range setup
INFO [main] 2017-06-01 07:20:15,342 StorageService.java:1435 - JOINING: Starting to bootstrap...
INFO [main] 2017-06-01 07:20:15,562 StreamResultFuture.java:90 - [Stream #c219d430-469a-11e7-8af3-81773e2a69ae] Executing streaming plan for Bootstrap
INFO [StreamConnectionEstablisher:1] 2017-06-01 07:20:15,568 StreamSession.java:266 - [Stream #c219d430-469a-11e7-8af3-81773e2a69ae] Starting streaming to /172.20.0.3
INFO [StreamConnectionEstablisher:1] 2017-06-01 07:20:15,591 StreamCoordinator.java:264 - [Stream #c219d430-469a-11e7-8af3-81773e2a69ae, ID#0] Beginning stream session with /172.20.0.3
INFO [STREAM-IN-/172.20.0.3:7000] 2017-06-01 07:20:16,369 StreamResultFuture.java:173 - [Stream #c219d430-469a-11e7-8af3-81773e2a69ae ID#0] Prepare completed. Receiving 4 files(4.046MiB), sending 0 files(0.000KiB)
INFO [StreamReceiveTask:1] 2017-06-01 07:20:32,489 StreamResultFuture.java:187 - [Stream #c219d430-469a-11e7-8af3-81773e2a69ae] Session with /172.20.0.3 is complete
INFO [StreamReceiveTask:1] 2017-06-01 07:20:32,529 StreamResultFuture.java:219 - [Stream #c219d430-469a-11e7-8af3-81773e2a69ae] All sessions completed
INFO [StreamReceiveTask:1] 2017-06-01 07:20:32,535 StorageService.java:1491 - Bootstrap completed! for the tokens [2170857698622202367, 8517072504425343717, -3254771037524187900, ..., 2534623752979121051]
INFO [main] 2017-06-01 07:20:32,544 StorageService.java:1435 - JOINING: Finish joining ring
The auto bootstrap process:
- Added the node to the configured cluster.
- Calculated the new node's tokens and set up the new token ranges across the two nodes.
- Streamed data from node1 to node2, that is, the data that used to belong to node1 but now belongs to node2.
After bootstrapping the node I ran nodetool status and got the following output:
Note that node1 (172.20.0.3) still has 64.18 MB of data while node2 (172.20.0.4) has 31 MB. This is a bit odd, as each node should now hold roughly half of the cluster's data. The reason for the imbalance is that node1 not only contains the data it is responsible for, but also still holds data that now belongs to node2. Running nodetool cleanup fixes this, as shown in the screenshot below.
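Cleanup has to be run on the node that gave up the token ranges, in this case node1. The invocation is along these lines:

# on node1: discard data that now belongs to node2
nodetool cleanup distribution_test

Cleanup rewrites the node's SSTables and keeps only the data in the token ranges the node still owns, which is why node1's load drops to roughly half after running it.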
How has the second node affected the spread of data across the two data directories on node1? As expected, the data in the two directories is still evenly spread, but each directory now holds half of what it used to.
All in all, this is a happy situation as our Apache Cassandra cluster is in a balanced state.
Let's bootstrap node2 again, but this time with num_tokens set to 10.
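Concretely, before re-bootstrapping, node2's cassandra.yaml would carry something like the following, while node1 keeps its 256 tokens:

# cassandra.yaml on node2 only
num_tokens: 10

Running nodetool status after node2 rejoins gives the following: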
Notice that node1 holds over 95% of the data. This is expected given our num_tokens configuration: node1 owns 256 token ranges while node2 owns only 10, so node1 is responsible for the vast majority of the ring.
Data Modelling Errors
This is the most frequent reason for an unbalanced cluster. We are going to insert data into the dd_test table again, but this time all of it goes into a single partition. The code used to insert this data is below; the only difference from the previous script is the INSERT statement, where the partition key is now a fixed literal.
from cassandra.cluster import Cluster
from concurrent.futures import ThreadPoolExecutor
import random
import string

cluster = Cluster()
session = cluster.connect('distribution_test')

def insert(from_num, to_num):
    for num in range(from_num, to_num):
        long_string = ''.join(random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(50)) * 1000
        # The partition key is now a fixed literal, so every row lands in the same partition
        session.execute("INSERT INTO dd_test (partition_key, cluster_key, test_data) VALUES ('partition_key', 'cluster_key_example_{}', '{}')".format(long_string, long_string))
        print "Executed {}".format(num)

futures = []
executor = ThreadPoolExecutor(max_workers=20)

for num in xrange(0, 100000, 10000):
    futures.append(executor.submit(insert, num, num + 4999))

for future in futures:
    future.result()
The above program ends up inserting all of its data into a single partition. On running nodetool status we see the following:
We have inserted 485.63 MB of data into node1. Let's examine how this data is distributed across our two data directories.
Note that the data1 directory holds the vast majority of the data. This is because data is distributed across the configured data directories by splitting the node's token ranges evenly between them. Since all our data has been inserted into a single partition, which hashes to a single token, it all ends up in one data directory.
Let's add a new node to the cluster and see how this influences data distribution.
Note that adding a new node has not helped: the data distribution is still highly imbalanced. Because all the data was inserted into a single partition, it is owned by a single token range and cannot be split across nodes, so we end up with one heavily loaded node.
I hope this has helped you understand why we might end up with unbalanced Cassandra nodes.
Moral of the story: make sure you put effort into designing your schema correctly.
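If a workload really does concentrate writes on one logical partition, a common remedy is to split that partition into buckets by adding a bucketing column to the partition key. A sketch of what that could look like for our table (the bucket_id column and the choice of 16 buckets are illustrative, not from the original post):

CREATE TABLE dd_test_bucketed (
    partition_key text,
    bucket_id int,           -- e.g. chosen per write as hash(cluster_key) % 16
    cluster_key text,
    test_data text,
    PRIMARY KEY ((partition_key, bucket_id), cluster_key)
);

Each write picks a bucket_id, so what used to be one hot partition becomes 16 smaller partitions that hash to different tokens and can therefore be spread across nodes and data directories. The trade-off is that reads for a logical key now have to query all of its buckets.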