
Wednesday, March 14, 2012

Hadoop, HBase, and Xceivers

Some of the configuration properties found in Hadoop have a direct effect on clients, such as HBase. One of those properties is called "dfs.datanode.max.xcievers", and belongs to the HDFS subproject. It defines the number of server side threads and - to some extent - sockets used for data connections. Setting this number too low can cause problems as you grow or increase utilization of your cluster. This post will help you to understand what happens between the client and server, and how to determine a reasonable number for this property.

The Problem

Since HBase is storing everything it needs inside HDFS, the hard upper boundary imposed by the "dfs.datanode.max.xcievers" configuration property can result in too few resources being available to HBase, manifesting itself as IOExceptions on either side of the connection. Here is an example from the HBase mailing list [1], where the following messages were initially logged on the RegionServer side: 

2008-11-11 19:55:52,451 INFO org.apache.hadoop.dfs.DFSClient: Exception in createBlockOutputStream java.io.IOException: Could not read from stream
2008-11-11 19:55:52,451 INFO org.apache.hadoop.dfs.DFSClient: Abandoning block blk_-5467014108758633036_595771
2008-11-11 19:55:58,455 WARN org.apache.hadoop.dfs.DFSClient: DataStreamer Exception: java.io.IOException: Unable to create new block.
2008-11-11 19:55:58,455 WARN org.apache.hadoop.dfs.DFSClient: Error Recovery for block blk_-5467014108758633036_595771 bad datanode[0]
2008-11-11 19:55:58,482 FATAL org.apache.hadoop.hbase.regionserver.Flusher: Replay of hlog required. Forcing server shutdown

Correlating this with the Hadoop DataNode logs revealed the following entry:

ERROR org.apache.hadoop.dfs.DataNode: DatanodeRegistration(10.10.10.53:50010,storageID=DS-1570581820-10.10.10.53-50010-1224117842339,infoPort=50075, ipcPort=50020):DataXceiver: java.io.IOException: xceiverCount 258 exceeds the limit of concurrent xcievers 256 

In this example, the low value of "dfs.datanode.max.xcievers" for the DataNodes caused the entire RegionServer to shut down. This is a really bad situation. Unfortunately, there is no hard-and-fast rule that explains how to compute the required limit. It is commonly advised to raise the number from the default of 256 to something like 4096 (see [1], [2], [3], [4], and [5] for reference). This is done by adding this property to the hdfs-site.xml file of all DataNodes (note that it is misspelled):

  <property>
    <name>dfs.datanode.max.xcievers</name>
    <value>4096</value>
  </property>

Note: You will need to restart your DataNodes after making this change to the configuration file.
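One way to do this is with the stock hadoop-daemon.sh script, run on each DataNode:

Command Line:

$ bin/hadoop-daemon.sh stop datanode
$ bin/hadoop-daemon.sh start datanode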

This should help with the above problem, but you still might want to know more about how this all plays together, and what HBase is doing with these resources. We will discuss this in the remainder of this post. But before we do, we need to be clear about why you cannot simply set this number very high, say 64K, and be done with it. There is a reason for an upper boundary, and it is twofold: first, threads need their own stack, which means they occupy memory. For current servers this means 1 MB per thread [6] by default. In other words, if you use up all 4096 DataXceiver threads, you need around 4 GB of heap to accommodate them. This cuts into the space you have assigned for memstores and block caches, as well as all the other moving parts of the JVM. In a worst case scenario, you might run into an OutOfMemoryException, and the RegionServer process is toast. You want to set this property to a reasonably high number, but not too high either.

Second, with this many threads active, you will also see your CPU becoming increasingly loaded. There will be many context switches to handle all the concurrent work, which takes away resources from the real work. As with the concerns about memory, you do not want the number of threads to grow boundlessly, but to have a reasonable upper boundary - and that is what "dfs.datanode.max.xcievers" is for.

Hadoop File System Details

From the client side, the HDFS library provides the abstraction called Path. This class represents a file in a file system supported by Hadoop, which is represented by the FileSystem class. There are a few concrete implementations of the abstract FileSystem class, one of which is the DistributedFileSystem, representing HDFS. This class in turn wraps the actual DFSClient class that handles all interactions with the remote servers, i.e. the NameNode and the many DataNodes.

When a client, such as HBase, opens a file, it does so by, for example, calling the open() or create() methods of the FileSystem class - here in their most simplistic incarnations:

  public DFSInputStream open(String src) throws IOException
  public FSDataOutputStream create(Path f) throws IOException

The returned stream instance is what needs a server-side socket and thread, which are used to read and write blocks of data. They form part of the contract to exchange data between the client and server. Note that there are other, RPC-based protocols in use between the various machines, but for the purpose of this discussion they can be ignored.

The stream instance returned is a specialized DFSOutputStream or DFSInputStream class, which handles all of the interaction with the NameNode to figure out where the copies of the blocks reside, and the data communication per block per DataNode.
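
To see these pieces in action, here is a minimal - and purely illustrative - client sketch (the path is made up) that exercises both the write and the read path just described:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class StreamDemo {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Returns a DistributedFileSystem instance when the configuration
      // points to an HDFS cluster, wrapping the DFSClient internally.
      FileSystem fs = FileSystem.get(conf);
      Path file = new Path("/demo/testfile");

      // Writing: the returned stream wraps a DFSOutputStream, which occupies
      // a server-side thread on the DataNode receiving the current block.
      FSDataOutputStream out = fs.create(file);
      out.write("hello".getBytes());
      out.close(); // finalizes the block, releasing the server-side thread

      // Reading: the returned stream wraps a DFSInputStream, which occupies
      // a server-side thread while a block is being streamed to the client.
      FSDataInputStream in = fs.open(file);
      System.out.println(in.read());
      in.close(); // releases the server-side thread again
    }
  }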

On the server side, the DataNode wraps an instance of DataXceiverServer, which is the actual class that reads the above configuration key and also throws the above exception when the limit is exceeded.

When the DataNode starts, it creates a thread group and starts the mentioned DataXceiverServer instance like so:

  this.threadGroup = new ThreadGroup("dataXceiverServer");
  this.dataXceiverServer = new Daemon(threadGroup,
      new DataXceiverServer(ss, conf, this));
  this.threadGroup.setDaemon(true); // auto destroy when empty 

Note that the DataXceiverServer thread itself is already taking up one spot in the thread group. The DataNode also has this internal method to retrieve the number of currently active threads in this group:

  /** Number of concurrent xceivers per node. */
  int getXceiverCount() {
    return threadGroup == null ? 0 : threadGroup.activeCount();
  }

Reading and writing blocks, as initiated by the client, causes a connection to be made, which is wrapped by the DataXceiverServer thread into a DataXceiver instance. During this handoff, a thread is created and registered in the above thread group. So for every active read and write operation a new thread is tracked on the server side. If the count of threads in the group exceeds the configured maximum, then the said exception is thrown and recorded in the DataNode's logs:

  if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
    throw new IOException("xceiverCount " + curXceiverCount
                          + " exceeds the limit of concurrent xcievers "
                          + dataXceiverServer.maxXceiverCount);
  }

Implications for Clients

Now, the question is, how does the client's reading and writing relate to the server-side threads? Before we go into the details, though, let's use the debug information that the DataXceiver class logs when it is created and closed

  LOG.debug("Number of active connections is: " + datanode.getXceiverCount());
  ...
  LOG.debug(datanode.dnRegistration + ":Number of active connections is: " 
    + datanode.getXceiverCount());

and monitor what is logged on the DataNode while HBase starts. For simplicity's sake this is done on a pseudo-distributed setup with a single DataNode and RegionServer instance. The following shows the top of the RegionServer's status page.

The important part is in the "Metrics" section, where it says "storefiles=22". So, assuming that HBase has at least that many files to handle, plus some extra files for the write-ahead log, we should see the above log message state that we have at least 22 "active connections". Let's start HBase and check the DataNode and RegionServer log files:

Command Line:

$ bin/start-hbase.sh
...

DataNode Log:

2012-03-05 13:01:35,309 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 1
2012-03-05 13:01:35,315 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 2
12/03/05 13:01:35 INFO regionserver.MemStoreFlusher: globalMemStoreLimit=396.7m, globalMemStoreLimitLowMark=347.1m, maxHeap=991.7m
12/03/05 13:01:39 INFO http.HttpServer: Port returned by webServer.getConnectors()[0].getLocalPort() before open() is -1. Opening the listener on 60030
2012-03-05 13:01:40,003 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 1
12/03/05 13:01:40 INFO regionserver.HRegionServer: Received request to open region: -ROOT-,,0.70236052
2012-03-05 13:01:40,882 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 3
2012-03-05 13:01:40,884 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
2012-03-05 13:01:40,888 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 3
2012-03-05 13:01:40,902 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
2012-03-05 13:01:40,907 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 3
2012-03-05 13:01:40,909 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
2012-03-05 13:01:40,910 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 3
2012-03-05 13:01:40,911 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
2012-03-05 13:01:40,915 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 3
2012-03-05 13:01:40,917 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 4
2012-03-05 13:01:40,917 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
2012-03-05 13:01:40,919 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
12/03/05 13:01:40 INFO regionserver.HRegion: Onlined -ROOT-,,0.70236052; next sequenceid=63083
2012-03-05 13:01:40,982 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 3
2012-03-05 13:01:40,983 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
2012-03-05 13:01:40,985 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 3
2012-03-05 13:01:40,987 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
2012-03-05 13:01:40,987 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 4
2012-03-05 13:01:40,989 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: .META.,,1.1028785192
2012-03-05 13:01:41,026 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 3
2012-03-05 13:01:41,027 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
2012-03-05 13:01:41,028 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 3
2012-03-05 13:01:41,029 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
2012-03-05 13:01:41,035 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 3
2012-03-05 13:01:41,037 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
2012-03-05 13:01:41,038 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 3
2012-03-05 13:01:41,040 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
2012-03-05 13:01:41,044 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 3
2012-03-05 13:01:41,047 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
2012-03-05 13:01:41,048 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 3
2012-03-05 13:01:41,051 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined .META.,,1.1028785192; next sequenceid=63082
2012-03-05 13:01:41,109 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 3
2012-03-05 13:01:41,114 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 4
2012-03-05 13:01:41,117 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 5
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open 16 region(s)
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,,1330944810191.62a312d67981c86c42b6bc02e6ec7e3f.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user1120311784,1330944810191.90d287473fe223f0ddc137020efda25d.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user1240813553,1330944811370.12c95922805e2cb5274396a723a94fa8.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user1361265841,1330944811370.80663fcf291e3ce00080599964f406ba.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user1481880893,1330944827566.fb3cc692757825e24295042fd42ff07c.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user1602709751,1330944827566.dbd84a9c2a2e450799b10e7408e3e12e.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user1723694337,1330944838296.cb9e191f0d8c0d8b64c192a52e35a6f0.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user1844378668,1330944838296.577cc1efe165859be1341a9e1c566b12.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user1964968041,1330944848231.dd89596e9129e1caa7e07f8a491c9734.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user2084845901,1330944848231.24413155fef16ebb00213b8072bc266b.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user273134820,1330944848768.86d2a0254822edc967c0f27fe53e7734.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user394249140,1330944848768.df37ded27f9ba0f9ed76b2eb05db6c4e.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user515290649,1330944849739.d23924dc9e9d5891f332c337977af83d.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user63650623,1330944849739.a3b9f64e6abee00a2f39c33efa7ae8a2.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user757669512,1330944850808.cd0d6f16d8ae9cf0c9277f5d6c6c6b9f.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user878854551,1330944850808.9b8667b6d3c0baaaea491527cd781357.
2012-03-05 13:01:41,246 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,248 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 7
2012-03-05 13:01:41,249 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 8
2012-03-05 13:01:41,256 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 9
2012-03-05 13:01:41,257 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 10
2012-03-05 13:01:41,257 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 9
2012-03-05 13:01:41,257 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 8
2012-03-05 13:01:41,257 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 7
2012-03-05 13:01:41,259 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 8
2012-03-05 13:01:41,260 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 9
2012-03-05 13:01:41,264 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 8
2012-03-05 13:01:41,283 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user1120311784,1330944810191.90d287473fe223f0ddc137020efda25d.; next sequenceid=62917
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,,1330944810191.62a312d67981c86c42b6bc02e6ec7e3f.; next sequenceid=62916
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user1240813553,1330944811370.12c95922805e2cb5274396a723a94fa8.; next sequenceid=62918
2012-03-05 13:01:41,360 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,361 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 7
2012-03-05 13:01:41,366 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 8
2012-03-05 13:01:41,366 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 7
2012-03-05 13:01:41,366 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 8
2012-03-05 13:01:41,368 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 9
2012-03-05 13:01:41,369 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 8
2012-03-05 13:01:41,370 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 9
2012-03-05 13:01:41,370 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 8
2012-03-05 13:01:41,372 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 9
2012-03-05 13:01:41,375 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 8
2012-03-05 13:01:41,377 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user1481880893,1330944827566.fb3cc692757825e24295042fd42ff07c.; next sequenceid=62919
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user1602709751,1330944827566.dbd84a9c2a2e450799b10e7408e3e12e.; next sequenceid=62920
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user1361265841,1330944811370.80663fcf291e3ce00080599964f406ba.; next sequenceid=62919
2012-03-05 13:01:41,474 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,491 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 7
2012-03-05 13:01:41,495 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 8
2012-03-05 13:01:41,508 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 7
2012-03-05 13:01:41,512 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 8
2012-03-05 13:01:41,513 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
2012-03-05 13:01:41,514 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,521 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
2012-03-05 13:01:41,533 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user1844378668,1330944838296.577cc1efe165859be1341a9e1c566b12.; next sequenceid=62918
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user1723694337,1330944838296.cb9e191f0d8c0d8b64c192a52e35a6f0.; next sequenceid=62917
2012-03-05 13:01:41,540 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
2012-03-05 13:01:41,542 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,548 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user1964968041,1330944848231.dd89596e9129e1caa7e07f8a491c9734.; next sequenceid=62920
2012-03-05 13:01:41,618 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,621 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
2012-03-05 13:01:41,636 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,649 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 7
2012-03-05 13:01:41,650 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 8
2012-03-05 13:01:41,654 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
2012-03-05 13:01:41,662 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,665 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user2084845901,1330944848231.24413155fef16ebb00213b8072bc266b.; next sequenceid=62921
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user273134820,1330944848768.86d2a0254822edc967c0f27fe53e7734.; next sequenceid=62924
2012-03-05 13:01:41,673 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,677 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
2012-03-05 13:01:41,679 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,687 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user394249140,1330944848768.df37ded27f9ba0f9ed76b2eb05db6c4e.; next sequenceid=62925
2012-03-05 13:01:41,790 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,800 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
2012-03-05 13:01:41,806 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,815 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 7
2012-03-05 13:01:41,819 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 8
2012-03-05 13:01:41,821 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
2012-03-05 13:01:41,824 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,825 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 7
2012-03-05 13:01:41,827 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 8
2012-03-05 13:01:41,829 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user515290649,1330944849739.d23924dc9e9d5891f332c337977af83d.; next sequenceid=62926
2012-03-05 13:01:41,832 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,838 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user757669512,1330944850808.cd0d6f16d8ae9cf0c9277f5d6c6c6b9f.; next sequenceid=62929
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user63650623,1330944849739.a3b9f64e6abee00a2f39c33efa7ae8a2.; next sequenceid=62927
2012-03-05 13:01:41,891 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,893 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
2012-03-05 13:01:41,894 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,896 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user878854551,1330944850808.9b8667b6d3c0baaaea491527cd781357.; next sequenceid=62930
2012-03-05 14:01:39,514 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 5
2012-03-05 14:01:39,711 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 4
2012-03-05 22:48:41,945 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
12/03/05 22:48:41 INFO regionserver.HRegion: Onlined usertable,user757669512,1330944850808.cd0d6f16d8ae9cf0c9277f5d6c6c6b9f.; next sequenceid=62929
2012-03-05 22:48:41,963 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4

You can see how the regions are opened one after the other, but what you also might notice is that the number of active connections never climbs to 22 - it barely even reaches 10. Why is that? To understand this better, we have to see how files in HDFS map to the server-side DataXceiver instances - and the actual threads they represent.

Hadoop Deep Dive

The aforementioned DFSInputStream and DFSOutputStream are really facades around the usual stream concepts. They wrap the client-server communication into these standard Java interfaces, while internally routing the traffic to a selected DataNode - which is the one that holds a copy of the current block. The client has the liberty to open and close these connections as needed: as it reads a file in HDFS, the client library classes switch transparently from block to block, and therefore from DataNode to DataNode, opening and closing connections along the way.

The DFSInputStream has an instance of a DFSClient.BlockReader class that opens the connection to the DataNode. The stream instance calls blockSeekTo() for every call to read(), which takes care of opening the connection if there is none already. Once a block is completely read, the connection is closed. Closing the stream has the same effect, of course.

The DFSOutputStream has a similar helper class, the DataStreamer. It tracks the connection to the server, which is initiated by the nextBlockOutputStream() method. It has further internal classes that help with writing the block data out, which we omit here for the sake of brevity.

Writing and reading blocks both require a thread to hold the socket and intermediate data on the server side, wrapped in a DataXceiver instance. Depending on what your client is doing, you will see the number of connections fluctuate around the number of currently accessed files in HDFS.

Back to the HBase riddle above: the reason you do not see up to 22 (and more) connections during the start is that while the regions open, the only required data is the HFile's info block. This block is read to gain vital details about each file, and the file is then closed again. This means that the server-side resource is released in quick succession. The remaining four connections are harder to determine. You can use JStack to dump all threads on the DataNode, which in this example shows these entries:

"DataXceiver for client /127.0.0.1:64281 [sending block blk_5532741233443227208_4201]" daemon prio=5 tid=7fb96481d000 nid=0x1178b4000 runnable [1178b3000]
   java.lang.Thread.State: RUNNABLE
   ...

"DataXceiver for client /127.0.0.1:64172 [receiving block blk_-2005512129579433420_4199 client=DFSClient_hb_rs_10.0.0.29,60020,1330984111693_1330984118810]" daemon prio=5 tid=7fb966109000 nid=0x1169cb000 runnable [1169ca000]
   java.lang.Thread.State: RUNNABLE
   ...

These are the only DataXceiver entries (in this example), so the count in the thread group is a bit misleading. Recall that the DataXceiverServer daemon thread already accounts for one extra entry, which combined with the two above accounts for three active connections - which in fact means three active threads. The reason the log states four instead is that it logs the count from an active thread that is about to finish. So, shortly after the count of four is logged, it is actually one less, i.e. three, and hence matching our head count of active threads.

Also note that the internal helper classes, such as the PacketResponder, occupy another thread in the group while being active. The JStack output does indicate that fact, listing the thread as such:

"PacketResponder 0 for Block blk_-2005512129579433420_4199" daemon prio=5 tid=7fb96384d000 nid=0x116ace000 in Object.wait() [116acd000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
     at java.lang.Object.wait(Native Method)
     at org.apache.hadoop.hdfs.server.datanode.BlockReceiver$PacketResponder.lastDataNodeRun(BlockReceiver.java:779)
     - locked <7bc79c030> (a org.apache.hadoop.hdfs.server.datanode.BlockReceiver$PacketResponder)
     at org.apache.hadoop.hdfs.server.datanode.BlockReceiver$PacketResponder.run(BlockReceiver.java:870)
     at java.lang.Thread.run(Thread.java:680)

This thread is currently in the TIMED_WAITING state and is not considered active. That is why the count emitted by the DataXceiver log statements does not include these kinds of threads. If they become active due to the client sending data, the active thread count will go up again. Another thing to note is that this thread does not need a separate connection, or socket, between the client and the server. The PacketResponder is just a thread on the server side that receives block data and streams it to the next DataNode in the write pipeline.

The Hadoop fsck command also has an option to report what files are currently open for writing:

$ hadoop fsck /hbase -openforwrite
FSCK started by larsgeorge from /10.0.0.29 for path /hbase at Mon Mar 05 22:59:47 CET 2012
....../hbase/.logs/10.0.0.29,60020,1330984111693/10.0.0.29%3A60020.1330984118842 0 bytes, 1 block(s), OPENFORWRITE: ......................................Status: HEALTHY
 Total size:     2088783626 B
 Total dirs:     54
 Total files:    45
 ...

This does not immediately relate to an occupied server-side thread, as these are allocated by block ID. But you can glean from it that there is one open block for writing. The Hadoop command has additional options to print out the actual files and the block IDs they are comprised of:

$ hadoop fsck /hbase -files -blocks
FSCK started by larsgeorge from /10.0.0.29 for path /hbase at Tue Mar 06 10:39:50 CET 2012
...
/hbase/.META./1028785192/.tmp <dir>
/hbase/.META./1028785192/info <dir>
/hbase/.META./1028785192/info/4027596949915293355 36517 bytes, 1 block(s):  OK
0. blk_5532741233443227208_4201 len=36517 repl=1
...
Status: HEALTHY
 Total size:     2088788703 B
 Total dirs:     54
 Total files:     45 (Files currently being written: 1)
 Total blocks (validated):     64 (avg. block size 32637323 B) (Total open file blocks (not validated): 1)
 Minimally replicated blocks:     64 (100.0 %)
 ...

This gives you two things. First, the summary states that there is one open file block at the time the command ran - matching the count reported by the "-openforwrite" option above. Second, the list of blocks next to each file lets you match the thread name to the file that contains the block being accessed. In this example the block with the ID "blk_5532741233443227208_4201" is sent from the server to the client, here a RegionServer. This block belongs to the HBase .META. table, as shown by the output of the Hadoop fsck command. The combination of JStack and fsck can serve as a poor man's replacement for lsof (a tool on the Linux command line to "list open files").
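
For example, a quick way to get to the above threads (with <datanode-pid> standing in for the DataNode's process ID, which jps will tell you) is:

Command Line:

$ jstack <datanode-pid> | grep "DataXceiver for client"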

JStack also reports that there is a DataXceiver thread, with an accompanying PacketResponder, for block ID "blk_-2005512129579433420_4199", but this ID is missing from the list of blocks reported by fsck. This is because the block is not yet finished and therefore not available to readers. In other words, Hadoop fsck only reports on complete (or synced [7][8], for Hadoop versions that support this feature) blocks.


Practical Example

We can verify this using the HBase JRuby shell. For this exercise we should stop HBase, which will close out all open files, remove all active DataXceiver threads from the JStack output, and reduce the number of active connections as reported by the DataNode's debug logs to one - the server thread, as you know by now.
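
Stopping HBase is a single command, mirroring the start script used below:

Command Line:

$ bin/stop-hbase.sh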

Write Data

Let's start with the process of writing data. Open the HBase shell and in another terminal check the content of the file system:

$ hadoop dfs -ls /
Found 3 items
drwxr-xr-x   - larsgeorge supergroup          0 2012-03-04 14:13 /Users
drwxr-xr-x   - larsgeorge supergroup          0 2011-11-15 11:17 /Volumes
drwxr-xr-x   - larsgeorge supergroup          0 2012-03-05 11:41 /hbase

Now in the HBase shell, we enter these commands, while at the same time checking the output of the DataNode logs:

$ hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 0.90.4-cdh3u2, r, Thu Oct 13 20:32:26 PDT 2011

hbase(main):001:0> import org.apache.hadoop.hdfs.DFSClient
=> Java::OrgApacheHadoopHdfs::DFSClient
hbase(main):002:0> import org.apache.hadoop.conf.Configuration
=> Java::OrgApacheHadoopConf::Configuration
hbase(main):003:0> dfs = DFSClient.new(Configuration.new)
=> #<Java::OrgApacheHadoopHdfs::DFSClient:0x552c937c>
hbase(main):004:0> s = dfs.create('/testfile', false)
12/03/06 11:54:51 DEBUG hdfs.DFSClient: /testfile: masked=rwxr-xr-x
12/03/06 11:54:51 DEBUG hdfs.DFSClient: computePacketChunkSize: src=/testfile, chunkSize=516, chunksPerPacket=127, packetSize=65557
=> #<#<Class:0x1521bfefb>:0x3abb1bc4>

This means we have created an output stream to a file in HDFS. So far this file only appears in the output of the HDFS ls command, but not in the JStack threads, nor in the logs as an increased active connection count.

$ hadoop dfs -ls /
Found 4 items
drwxr-xr-x   - larsgeorge supergroup          0 2012-03-04 14:13 /Users
drwxr-xr-x   - larsgeorge supergroup          0 2011-11-15 11:17 /Volumes
drwxr-xr-x   - larsgeorge supergroup          0 2012-03-05 11:41 /hbase
-rw-r--r--   1 larsgeorge supergroup          0 2012-03-06 11:54 /testfile

The file size is zero bytes, as expected. The operation we have just performed is a pure metadata operation, involving only the NameNode. We have not written anything yet, so no DataNode is involved. We start to write into the stream like so:

hbase(main):005:0> s.write(1)
hbase(main):006:0> s.write(1)
hbase(main):007:0> s.write(1)

Again, nothing changes; no block is being generated, because the data is buffered on the client side. To set the wheels in motion, we next have the choice to close or sync the data to HDFS. Using close would quickly write the block and then close everything down - too quickly for us to observe the thread creation; only the logs would state the increase and decrease of the thread count. We rather use sync to flush out the few bytes we have written, but keep the block open for writing:

hbase(main):008:0> s.sync   
12/03/06 12:04:04 DEBUG hdfs.DFSClient: DFSClient writeChunk allocating new packet seqno=0, src=/testfile, packetSize=65557, chunksPerPacket=127, bytesCurBlock=0
12/03/06 12:04:04 DEBUG hdfs.DFSClient: DFSClient flush() : saveOffset 0 bytesCurBlock 3 lastFlushOffset 0
12/03/06 12:04:04 DEBUG hdfs.DFSClient: Allocating new block
12/03/06 12:04:04 DEBUG hdfs.DFSClient: pipeline = 127.0.0.1:50010
12/03/06 12:04:04 DEBUG hdfs.DFSClient: Connecting to 127.0.0.1:50010
12/03/06 12:04:04 DEBUG hdfs.DFSClient: Send buf size 131072
12/03/06 12:04:04 DEBUG hdfs.DFSClient: DataStreamer block blk_1029135823215149276_4209 wrote packet seqno:0 size:32 offsetInBlock:0 lastPacketInBlock:false
12/03/06 12:04:04 DEBUG hdfs.DFSClient: DFSClient Replies for seqno 0 are SUCCESS

Note: The logging level of HBase is set to DEBUG, which causes the above log statements to be printed on the console.

The logs show an increase by one of the number of active connections, and JStack lists the two threads involved in the writing process:

DataNode Log:

2012-03-06 12:04:04,457 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 1

JStack Output:

"PacketResponder 0 for Block blk_1029135823215149276_4209" daemon prio=5 tid=7fb96395e800 nid=0x116ace000 in Object.wait() [116acd000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
     at java.lang.Object.wait(Native Method)
     at org.apache.hadoop.hdfs.server.datanode.BlockReceiver$PacketResponder.lastDataNodeRun(BlockReceiver.java:779)
     - locked <7bcf01050> (a org.apache.hadoop.hdfs.server.datanode.BlockReceiver$PacketResponder)
     at org.apache.hadoop.hdfs.server.datanode.BlockReceiver$PacketResponder.run(BlockReceiver.java:870)
     at java.lang.Thread.run(Thread.java:680)

...

"DataXceiver for client /127.0.0.1:50556 [receiving block blk_1029135823215149276_4209 client=DFSClient_-1401479495]" daemon prio=5 tid=7fb9660c7000 nid=0x1169cb000 runnable [1169ca000]
   java.lang.Thread.State: RUNNABLE
     at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
     at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:136)
     ...

Closing the stream will finalize the block, terminate the server-side threads that were needed (two in this example), and log the decreased active connection count:

HBase Shell:

hbase(main):011:0> s.close
12/03/06 12:08:36 DEBUG hdfs.DFSClient: DFSClient writeChunk allocating new packet seqno=1, src=/testfile, packetSize=65557, chunksPerPacket=127, bytesCurBlock=0
12/03/06 12:08:36 DEBUG hdfs.DFSClient: DataStreamer block blk_1029135823215149276_4209 wrote packet seqno:1 size:32 offsetInBlock:0 lastPacketInBlock:true
12/03/06 12:08:36 DEBUG hdfs.DFSClient: DFSClient Replies for seqno 1 are SUCCESS
12/03/06 12:08:36 DEBUG hdfs.DFSClient: Closing old block blk_1029135823215149276_4209

DataNode Log:

2012-03-06 12:08:36,551 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 2

The interesting part about the log statements is that they are printed before the thread is started, and before it is ended, meaning the log will show one connection too few, and one too many, respectively. Also recall that while the block is being written to, it is accounted for in the output of Hadoop fsck's "-blocks" or "-openforwrite" options.

Read Data

Pretty much the same applies to reading data:

hbase(main):012:0> r = dfs.open('/testfile')        
=> #<Java::OrgApacheHadoopHdfs::DFSClient::DFSInputStream:0xde81d48>

When you create the input stream, nothing happens on the server side, as the client has yet to indicate what part of the file it wants to read. If you start to read, the client is routed to the proper server using the "seek to" call internally:

DataNode Log:

2012-03-06 12:18:02,055 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 1

HBase Shell:

hbase(main):013:0> r.read
12/03/06 12:18:02 DEBUG fs.FSInputChecker: DFSClient readChunk got seqno 0 offsetInBlock 0 lastPacketInBlock true packetLen 11
=> 1

DataNode Log:

2012-03-06 12:18:02,110 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 2

Since an entire buffer's worth of data - configured with "io.file.buffer.size", and set to 4096 bytes (4 KB) by default - is read, and our file was very small (3 bytes), it was read in one go and the server-side thread was released right away. If we were to read a larger file, a connection would remain open for reading chunk after chunk of the entire block. We can pick a large file from the HBase directory, open the input stream, and start reading byte by byte:

Command Line:

$ hadoop dfs -lsr /hbase
drwxr-xr-x   - larsgeorge supergroup          0 2012-03-03 12:33 /hbase/-ROOT-
...
-rw-r--r--   1 larsgeorge supergroup  111310855 2012-03-05 11:54 /hbase/usertable/12c95922805e2cb5274396a723a94fa8/family/1454340804524549239
...

HBase Shell:

hbase(main):015:0> r2 = dfs.open('/hbase/usertable/12c95922805e2cb5274396a723a94fa8/family/1454340804524549239')
=> #<Java::OrgApacheHadoopHdfs::DFSClient::DFSInputStream:0x92524b0>
hbase(main):016:0> r2.read                                                                                     
12/03/06 12:26:12 DEBUG fs.FSInputChecker: DFSClient readChunk got seqno 0 offsetInBlock 0 lastPacketInBlock false packetLen 66052
=> 68

DataNode Log:

2012-03-06 12:26:12,294 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 1

HBase Shell:

hbase(main):017:0> r2.read
=> 65
hbase(main):018:0> r2.read
=> 84

JStack Output:

"DataXceiver for client /127.0.0.1:52504 [sending block blk_7972699930188289769_4166]" daemon prio=5 tid=7fb965961800 nid=0x116670000 runnable [11666f000]
   java.lang.Thread.State: RUNNABLE
     at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
     ...

The JStack output shows how the thread is kept running on the server to serve more data as requested by the client. If you close the stream, the resource is subsequently freed, just as expected:

HBase Shell:

hbase(main):019:0> r2.close                                                                                     

DataNode Log:

2012-03-06 12:31:16,659 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 2

So after the thread really exits (which should be in the nano- or millisecond range), the count will go back to the minimum of one - which is the DataXceiverServer thread, yes you are correct. :)


Back to HBase

Opening all the regions does not need as many resources on the server as you would have expected. If you scan the entire HBase table though, you force HBase to read all of the blocks in all HFiles:

HBase Shell:

hbase(main):003:0> scan 'usertable'
...
1000000 row(s) in 1460.3120 seconds

DataNode Log:

2012-03-05 14:42:20,580 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 14:43:23,293 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 7
2012-03-05 14:43:23,299 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 8
2012-03-05 14:44:00,255 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 7
2012-03-05 14:44:53,566 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 8
2012-03-05 14:44:53,567 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 7
2012-03-05 14:45:33,562 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 8
2012-03-05 14:46:25,074 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 9
2012-03-05 14:46:25,075 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 10
2012-03-05 14:47:07,854 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 9
2012-03-05 14:47:58,244 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 10
2012-03-05 14:47:58,244 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 9
2012-03-05 14:48:30,010 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 10
2012-03-05 14:49:24,332 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 11
2012-03-05 14:49:24,332 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 10
2012-03-05 14:49:59,987 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 11
2012-03-05 14:51:12,603 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 12
2012-03-05 14:51:12,605 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 11
2012-03-05 14:51:46,473 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 12
2012-03-05 14:52:37,052 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 13
2012-03-05 14:52:37,053 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 12
2012-03-05 14:53:20,047 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 13
2012-03-05 14:54:11,859 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 14
2012-03-05 14:54:11,860 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 15
2012-03-05 14:54:43,615 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 14
2012-03-05 14:55:36,214 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 15
2012-03-05 14:55:36,215 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 14
2012-03-05 14:56:10,440 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 15
2012-03-05 14:56:59,419 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 16
2012-03-05 14:56:59,420 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 15
2012-03-05 14:57:31,722 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 16
2012-03-05 14:58:24,909 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 17
2012-03-05 14:58:24,910 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 16
2012-03-05 14:58:57,186 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 17
2012-03-05 14:59:47,294 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 18
2012-03-05 14:59:47,294 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 17
2012-03-05 15:00:23,101 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 18
2012-03-05 15:01:14,853 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 19
2012-03-05 15:01:14,854 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 18
2012-03-05 15:01:47,388 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 19
2012-03-05 15:02:39,900 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 20
2012-03-05 15:02:39,901 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 19
2012-03-05 15:03:18,794 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 20
2012-03-05 15:04:17,688 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 21
2012-03-05 15:04:17,689 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 22
2012-03-05 15:04:54,545 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 21
2012-03-05 15:05:55,901 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 22
2012-03-05 15:05:55,901 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 21

The number of active connections reaches the elusive 22 now. Note that this count already includes the server thread, so we are still a little short of what we could consider the theoretical maximum - based on the number of files HBase has to handle.

What does that all mean?

So, how many "xcievers (sic)" do you need? Given that you only use HBase, you could simply monitor the above "storefiles" metric (which you can also get through Ganglia or JMX) and add a few percent for intermediate and write-ahead log files. This should work for systems in motion. However, if you were to determine that number on an idle, fully compacted system and assume it is the maximum, you might find it too low once you start adding more store files during regular memstore flushes, i.e. as soon as you start adding data to the HBase tables. Or if you also use MapReduce on that same cluster, Flume log aggregation, and so on. You will need to account for those extra files, and, more importantly, for blocks that are open for reading and writing.

Note again that the examples in this post are using a single DataNode, something you will not have on a real cluster. To that end, you will have to divide the total number of store files (as per the HBase metric) by the number of DataNodes you have. If you have, for example, a store file count of 1000, and your cluster has 10 DataNodes, then you should be OK with the default of 256 xceiver threads per DataNode.

The worst case would be the number of all active readers and writers, i.e. those that are currently sending or receiving data. But since this is hard to determine ahead of time, you might want to consider building in a decent reserve. Also, since the writing process needs an extra - although shorter lived - thread (for the PacketResponder), you have to account for that as well. So a reasonable, but rather simplistic formula could be:
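
  xceivers = (2 * active writers + active readers) / number of DataNodes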

This formula takes into account that you need about two threads for an active writer and another for an active reader. This is then summed up and divided by the number of DataNodes, since you have to specify the "dfs.datanode.max.xcievers" per DataNode.

If you loop back to the HBase RegionServer screenshot above, you saw that there were 22 store files. These are immutable and will only be read, in other words they occupy one thread each. For all memstores that are flushed to disk you need two threads - but only until they are fully written. Then the files are finalized and closed for good, cleaning up any thread in the process. So these come and go based on your flush frequency. The same goes for compactions: they read N files and write them into a single new one, then finalize the new file. As for the write-ahead logs, these occupy a thread once you have started to add data to any table. Since there is one log file per server, and a writer occupies two threads, these account for at most twice as many active threads as you have RegionServers.

For a pure HBase setup (HBase plus its own HDFS, with no other user), we can estimate the number of needed DataXceivers with the following formula:
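
  xceivers = (store files being read + 2 * active flushes + 2 * active compactions + 2 * active WALs) / number of DataNodes

Here the files read by a compaction are counted among the store files being read, and every writer counts twice because of the extra PacketResponder thread.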

Since you will be hard pressed to determine the active number of store files, flushes, and so on, it might be better to estimate the theoretical maximum instead. This maximum value takes into account that you can only have a single flush and compaction active per region at any time. The maximum number of logs you can have active matches the number of RegionServers, leading us to this formula:
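
  xceivers = (storefiles + 2 * regions [flushes] + 2 * regions [compactions] + 2 * RegionServers [WALs]) / number of DataNodes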

Obviously, the number of store files will increase over time, and the number of regions typically as well. The same goes for the number of servers, so keep in mind to adjust this value over time. In practice, you can add a buffer of, for example, 20%, as shown in the formula below - in an attempt to not force you to change the value too often.

On the other hand, if you keep the number of regions fixed per server[9], and rather split them manually, while adding new servers as you grow, you should be able to keep this configuration property stable for each server.

Final Advice & TL;DR

Here is the final formula you want to use:
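
  dfs.datanode.max.xcievers = 1.2 * (storefiles + 2 * regions + 2 * regions + 2 * RegionServers) / number of DataNodes

To give a feel for the numbers, with - purely made-up - vitals of 1000 store files, 100 regions, 10 RegionServers, and 10 DataNodes, this works out to 1.2 * (1000 + 200 + 200 + 20) / 10, i.e. roughly 170 threads per DataNode.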

It computes the maximum number of threads needed, based on your current HBase vitals (number of store files, regions, and RegionServers). It also adds a fudge factor of 20% to give you room for growth. Keep an eye on the numbers on a regular basis and adjust the value as needed. You might want to use Nagios with appropriate checks to warn you when any of the vitals changes by more than a certain percentage.

Note: Please make sure you also adjust the number of file handles your process is allowed to use accordingly[10]. This affects the number of sockets you can use, and if that number is too low (default is often 1024), you will get connection issues first. 

Finally, the engineering devil on one of your shoulders should already have started to snicker about how horribly non-Erlang-y this is, and how you should use an event driven approach, possibly using Akka with Scala[11] - if you want to stay within the JVM world. Bear in mind though that the clever developers in the community share the same thoughts and have already started to discuss various approaches[12][13]. 

Links:

Friday, May 28, 2010

HBase File Locality in HDFS

One of the more ambiguous things in Hadoop is block replication: it happens automatically and you should not have to worry about it. HBase relies on it 100% to provide data safety as it stores its files in the distributed file system. While that works completely transparently, one of the more advanced questions asked is how this affects performance. This usually arises when the user starts writing MapReduce jobs against either HBase or Hadoop directly. Especially with larger data being stored in HBase, how does the system take care of placing the data close to where it is needed? This is referred to as data locality, and in the case of HBase using the Hadoop file system (HDFS) there may be doubts about how that works.

First let's see how Hadoop handles this. The MapReduce documentation advertises the fact that tasks run close to the data they process. This is achieved by breaking up large files in HDFS into smaller chunks, so-called blocks. That is also the reason why the block size in Hadoop is much larger than the ones you may know from operating systems and their file systems. The default setting is 64MB, but usually 128MB is chosen - or even larger, when you are sure all your files exceed a single block in size. Each block maps to a task run to process the contained data. That also means larger block sizes equal fewer map tasks, as the number of mappers is driven by the number of blocks that need processing: a 1GB file split into 128MB blocks, for example, yields eight blocks and hence eight map tasks. Hadoop knows where blocks are located and runs the map tasks directly on the node that hosts a copy (replication means it has a few hosts to choose from). This is how it guarantees data locality during MapReduce.

Back to HBase. When you have arrived at that point with Hadoop and you now understand that it can process data locally you start to question how this may work with HBase. If you have read my post on HBase's storage architecture you saw that HBase simply stores files in HDFS. It does so for the actual data files (HFile) as well as its log (WAL). And if you look into the code it simply uses FileSystem.create(Path path) to create these. When you then consider two access patterns, a) direct random access and b) MapReduce scanning of tables, you wonder if care was taken that the HDFS blocks are close to where they are read by HBase.

One thing upfront: if you do not co-locate HBase with Hadoop, but instead run a separate Hadoop cluster and a stand-alone HBase cluster, then there is no data locality - and there can't be. That is like running a separate MapReduce cluster, which would not be able to execute tasks directly on the DataNodes. It is imperative for data locality to have Hadoop (as in the HDFS), MapReduce, and HBase running on the same cluster. End of story.

OK, you have them all co-located on a single (hopefully larger) cluster? Then read on. How does Hadoop figure out where data is located as HBase accesses it? Remember the access patterns above: both go through a single piece of software called the RegionServer. Case a) uses random access patterns while b) scans all contiguous rows of a table, but does so through the same API. As explained in my referenced post and mentioned above, HBase simply stores files, and those get distributed as replicated blocks across all DataNodes of the HDFS. Now imagine you stop HBase after saving a lot of data and then restart it. The region servers are restarted and are assigned a seemingly random set of regions. At this very point there is no data locality guaranteed - how could there be?

The most important factor is that HBase is not restarted frequently and that it performs housekeeping on a regular basis. These so-called compactions rewrite files as new data is added over time. All files in HDFS, once written, are immutable (for all sorts of reasons). Because of that, data is written into new files, and as their number grows HBase compacts them into another set of new, consolidated files. And here is the kicker: HDFS is smart enough to put the data where it is needed! How does that work, you ask? We need to take a deep dive into Hadoop's source code and see how the above FileSystem.create(Path path) that HBase uses works. We are running on HDFS here, so we are actually using DistributedFileSystem.create(Path path), which looks like this:
public FSDataOutputStream create(Path f) throws IOException {
  return create(f, true);
}
It returns an FSDataOutputStream, and that is created like so:
public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
  return new FSDataOutputStream(dfs.create(getPathName(f), permission, overwrite, replication, blockSize, progress, bufferSize), statistics);
}
It uses a DFSClient instance that is the "umbilical" cord connecting the client with the NameNode:
this.dfs = new DFSClient(namenode, conf, statistics);
What is returned though is a DFSClient.DFSOutputStream instance. As you write data into the stream, the DFSClient aggregates it into "packets" which are then written as blocks to the DataNodes. This happens in DFSClient.DFSOutputStream.DataStreamer (please hang in there, we are close!), which runs as a daemon thread in the background. The magic unfolds now in a few hops on the stack: first, in the daemon's run() it gets the list of nodes to store the data on:
nodes = nextBlockOutputStream(src);
This in turn calls:
long startTime = System.currentTimeMillis();
lb = locateFollowingBlock(startTime);
block = lb.getBlock();
nodes = lb.getLocations();
We follow further down and see that locateFollowingBlock() calls:
return namenode.addBlock(src, clientName);
Here is where it all comes together. The name node is called to add a new block and the src parameter indicates for what file, while clientName is the name of the DFSClient instance. I skip one more small method in between and show you the next bigger step involved:
public LocatedBlock getAdditionalBlock(String src, String clientName) throws IOException {
  ...
  INodeFileUnderConstruction pendingFile  = checkLease(src, clientName);
  ...
  fileLength = pendingFile.computeContentSummary().getLength();
  blockSize = pendingFile.getPreferredBlockSize();
  clientNode = pendingFile.getClientNode();
  replication = (int)pendingFile.getReplication();

  // choose targets for the new block to be allocated.
  DatanodeDescriptor targets[] = replicator.chooseTarget(replication, clientNode, null, blockSize);
  ...
}
We are finally getting to the core of this code in the replicator.chooseTarget() call:
private DatanodeDescriptor chooseTarget(int numOfReplicas, DatanodeDescriptor writer, List<Node> excludedNodes, long blocksize, int maxNodesPerRack, List<DatanodeDescriptor> results) {
  
  if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
    return writer;
  }
  
  int numOfResults = results.size();
  boolean newBlock = (numOfResults==0);
  if (writer == null && !newBlock) {
    writer = (DatanodeDescriptor)results.get(0); 
  }
  
  try {
    switch(numOfResults) {
    case 0:
      writer = chooseLocalNode(writer, excludedNodes, blocksize, maxNodesPerRack, results);
      if (--numOfReplicas == 0) {
        break;
      }
    case 1:
      chooseRemoteRack(1, results.get(0), excludedNodes, blocksize, maxNodesPerRack, results);
      if (--numOfReplicas == 0) {
        break;
      }
    case 2:
      if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
        chooseRemoteRack(1, results.get(0), excludedNodes, blocksize, maxNodesPerRack, results);
      } else if (newBlock) {
        chooseLocalRack(results.get(1), excludedNodes, blocksize, maxNodesPerRack, results);
      } else {
        chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack, results);
      }
      if (--numOfReplicas == 0) {
        break;
      }
    default:
      chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize, maxNodesPerRack, results);
    }
  } catch (NotEnoughReplicasException e) {
    FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of " + numOfReplicas);
  }
  return writer;
}
Recall that we have started with the DFSClient and created a file, which was subsequently filled with data. As the blocks need writing out, the above code first checks if that can be done on the same host that the client is on, i.e. the "writer". That is "case 0". In "case 1" the code tries to find a remote rack to have a distant replica of the block. Lastly it fills the list of required replicas with machines on the local rack or on other racks.

So this means for HBase that, as long as the region server stays up long enough (which is the default), after a major compaction on all tables - which can be invoked manually or is triggered by a configuration setting - it has the files stored locally on the same host. The DataNode that shares the same physical host has a copy of all data the region server requires. If you are running a scan or get or any other use-case, you can be sure to get the best performance.
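
As a minimal sketch, assuming the client API of that era (and "mytable" as a made-up table name), triggering such a major compaction from code could look like this:

import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class MajorCompactExample {
  public static void main(String[] args) throws IOException {
    HBaseAdmin admin = new HBaseAdmin(new HBaseConfiguration());
    // Rewrite all store files of the table; the newly written blocks
    // land on the DataNode local to the hosting region server.
    admin.majorCompact("mytable");
  }
}

The HBase shell offers the same via its major_compact command.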

Finally, a good overview of the HDFS design and data replication can be found here. Also note that the HBase team is working on redesigning how the Master assigns the regions to servers. The plan is to improve it so that regions are deployed on the server where most of their blocks are. This will be particularly useful after a restart, because it would guarantee better data locality right off the bat. Stay tuned!

Saturday, May 1, 2010

3rd Munich OpenHUG Meeting

I am pleased to invite you to our third Munich Open Hadoop User Group Meeting!

As always, we are looking forward to seeing everyone again and welcome new attendees to join our group. We are enthusiastic about all things related to scalable, distributed storage systems. We are not limiting ourselves to a particular system but appreciate anyone who would like to share their experiences.

When: Thursday May 6th, 2010 at 6pm (open end)
Where: eCircle AG, Nymphenburger Straße 86, 80636 München ["Bruckmann" Building, "U1 Mailinger Str", map (in German) and look for the signs]

Thanks again to Bob Schulze from eCircle for providing the infrastructure.

We have a talk scheduled by Stefan Seelmann, who is a member of the project management committee for the Apache Directory project. This will be followed by an open discussion.

Please RSVP at Xing and Yahoo's Upcoming.

Looking forward to seeing you there!

Cheers,
Lars

Friday, February 12, 2010

FOSDEM 2010 NoSQL Talk

Let me take a minute to wrap up my FOSDEM 2010 experience. I was part of the NoSQL DevRoom organized by @stevenn from Outerthought, whom I had the pleasure of visiting a few months back as an HBase Ambassador.

First things first, the NoSQL DevRoom was an absolute success and I had a blast attending it. I made a point of not wandering off to talks outside the NoSQL track - even though there were many good ones - so that I could see the other projects and what they have to offer. I thought it was great; a good vibe was felt throughout the whole day as the audience got a whirlwind tour through the NoSQL landscape. The room was full to the brim for most presentations, and some folks had to miss out as we could not let more people in. This proved the great interest in this fairly new kind of technology. Exciting!

The focus of my talk was the history I have with HBase, starting in late 2007. At this point I would like to take the opportunity to thank Michael Stack, the lead of HBase, as he helped me many times back then to sort out the problems I ran into. I would also like to say that if you start with HBase today you will not have these problems, as HBase has matured tremendously since then. It is an overall stable product and can solve scalability issues you may face with regular RDBMSs today - and that with great ease.

The talk I gave did not really sell all the features, nor did it explain everything fully. I felt this could be left to the reader to look up on the project's website (or here on my blog), and hence I focused on my use case only. First up, here are the slides.


My Life with HBase - FOSDEM 2010 NoSQL -

After my talk and throughout the rest of the day I also had great conversations with attendees, who had many great questions.

Having listened to the other talks, though, I felt I probably could have done a better job selling HBase to the audience. I could have reported on use-cases at well-known companies, given better performance numbers, and so on. I have learned a lesson and will make sure to do a better job next time around. I guess this is another facet of what this is about, i.e. learning to achieve a higher level of professionalism.

But as I said above, my intent was to report on my life with HBase. I am grateful that it was accepted as such, and let me cite Todd Hoff (see Hot Scalability Links for February 12, 2010), who put it in such nice words:
"The hardscabble tale of HBase's growth from infancy to maturity. A very good introduction and overview of HBase."
Thank you!

Finally here is the video of the talk:



I am looking forward to more NoSQL events in Europe in the near future and will attempt to represent HBase once more (including the adjustments I mentioned above). My hope is that we as Europeans are able to adopt these new technologies and stay abreast of the rest of the world. We certainly have smart people to do so.

Friday, February 5, 2010

IvyDE and HBase 0.21 (trunk)

If you are staying on top of HBase development and frequently update to the HBase trunk (0.21.0-dev at the time of this post) you may have noticed that we now have support for Apache Ivy (see HBASE-1433). This is good because it allows better control over the dependencies on the required jar files. It does have a few drawbacks though. One issue is that you must be online to get your initial set of jars. You can also set up a local mirror, or reuse the one you need for Hadoop anyway, to share some of the jars.

Another issue is that it pulls in many more libraries as part of the dependency resolution process. This reminds me a bit of aptitude when you try to install Java, for example on Debian: it often wants to pull in a litany of "required" packages, but upon closer look many are only recommended and need not be installed.

Finally, you need to get the jar files into your development environment somehow. I am using Eclipse 3.5 on my Windows 7 PC as well as on my MacOS machines. If you have not yet run ant from the command line, you have no jars downloaded, and opening the project in Eclipse yields an endless number of errors. You have two choices: you can run ant to get all the jars and then add them to the project in Eclipse. But that is rather static and does not work well with future changes. It is also not the "Ivy" way of resolving the libraries automatically.

The other option you have is adding a plugin to Eclipse that can handle Ivy for you, right within the IDE. Luckily for Eclipse there is IvyDE. You install it according to its documentation and then add a "Classpath Container" as described here. That part works quite well and after a restart IvyDE is ready to go.

A few more steps have to be taken to get HBase working now - as in compiling without errors. The crucial one is editing the Ivy library settings to point to the HBase-specific Ivy files, in particular the "Ivy settings path" and the properties file. The latter specifies all the various version numbers that the ivy.xml uses. Without it, the Ivy resolve process will fail with many errors all over the place. Please note that the screenshot I added shows what this looks like on my Windows PC. The paths will be slightly different for your setup, and probably even in another format if you are on a Mac or Linux machine. As long as you specify both you should be fine.

The other important issue is that you have to repeat the step of adding the Classpath Container two more times: each of the two larger contrib packages, "contrib/stargate" and "contrib/transactional", has its own ivy.xml! For both, go into the respective directory, right-click the ivy.xml, and follow the steps described in the Ivy documentation. Enter the same information as mentioned above to make the resolve work, and leave everything else the way it is. You may notice that the contrib packages have a few more targets unticked. That is OK and can be used as-is.

As a temporary step you have to add two more static libraries that are in the $HBASE_HOME/lib directory: libthrift-0.2.0.jar and zookeeper-3.2.2.jar. Those will eventually be published in the Ivy repositories, and then this step will be obsolete (see INFRA-2461).

Eventually you end up with three containers, as shown in the second and third screenshot. The Eclipse toolbar now also has an Ivy "Resolve All Dependencies" button, which you can use to trigger the download process. Personally, I had to do this a few times, as the mirrors hosting the jars seem to be flaky at times. I ended up, for example, with "hadoop-mapred.jar" missing. Another resolve run fixed the problem.

The last screenshot shows the three Ivy-related containers once more in the tree view of the Package Explorer in the Java perspective. What you also see is the Ivy console, which is installed with the plugin as well. You have to open it as usual using the "Window - Show View - Console" menu (if you do not have the Console view open already) and then use the drop-down menu next to the "Open Console" button in that view to open the Ivy console. It gives you access to all the details when resolving the dependencies and can hint at what you have done wrong. Please note though that it also lists a lot of connection errors, one for every mirror or repository that does not respond or does not yet have the required package available. One of them should respond though, or as mentioned above you will have to try again later.

Eclipse automatically compiles the project and if everything worked out it does so now without a hitch. Good luck!

Update: Added info about the yet still static thrift and zookeeper jars. See Kay Kay's comment below.

Saturday, January 30, 2010

HBase Architecture 101 - Write-ahead-Log

What is the Write-ahead-Log you ask? In my previous post we had a look at the general storage architecture of HBase. One thing that was mentioned is the Write-ahead-Log, or WAL. This post explains how the log works in detail, but bear in mind that it describes the current version, which is 0.20.3. I will address the various plans to improve the log for 0.21 at the end of this article. For the term itself please read here.

Big Picture
The WAL is the lifeline that is needed when disaster strikes. Similar to the binlog in MySQL, it records all changes to the data. This is important in case something happens to the primary storage. So if the server crashes, it can effectively replay the log to get everything up to where the server should have been just before the crash. It also means that if writing the record to the WAL fails, the whole operation must be considered a failure.

Let"s look at the high level view of how this is done in HBase. First the client initiates an action that modifies data. This is currently a call to put(Put), delete(Delete) and incrementColumnValue() (abbreviated as "incr" here at times). Each of these modifications is wrapped into a KeyValue object instance and sent over the wire using RPC calls. The calls are (ideally batched) to the HRegionServer that serves the affected regions. Once it arrives the payload, the said KeyValue, is routed to the HRegion that is responsible for the affected row. The data is written to the WAL and then put into the MemStore of the actual Store that holds the record. And that also pretty much describes the write-path of HBase.

Eventually, when the MemStore gets to a certain size, or after a specific time, the data is asynchronously persisted to the file system. In between, the data is held in volatile memory only. And if the HRegionServer hosting that memory crashes, the data is lost... but for the existence of what is the topic of this post, the WAL!

Let's now have a look at the various classes or "wheels" working the magic of the WAL. First up is one of the main classes of this contraption.

HLog

The class which implements the WAL is called HLog. What you may have read in my previous post, and what is also illustrated above, is that there is only one instance of the HLog class per HRegionServer. When an HRegion is instantiated, that single HLog is passed as a parameter to the constructor of HRegion.

Central to HLog's functionality is the append() method, which internally eventually calls doWrite(). It is what is called when the above mentioned modification methods are invoked... or is it? One thing to note here is that for performance reasons there is an option for put(), delete(), and incrementColumnValue() to be called with an extra parameter set: setWriteToWAL(boolean). If you call setWriteToWAL(false) while setting up, for example, a Put instance, then the write to the WAL is skipped! That is also why the downward arrow in the big picture above is drawn with a dashed line, to indicate the optional step. By default you certainly want the WAL, no doubt about that. But say you run a large bulk import MapReduce job that you can rerun at any time. You gain extra performance but need to take extra care that no data is lost during the import. The choice is yours.
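
For illustration, here is a minimal sketch of what that looks like, assuming the client API of this version (0.20.3) and made-up table, family, and qualifier names:

import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class BulkPutExample {
  public static void main(String[] args) throws IOException {
    HBaseConfiguration conf = new HBaseConfiguration();
    HTable table = new HTable(conf, "bulkimport");
    Put put = new Put(Bytes.toBytes("row-1"));
    put.add(Bytes.toBytes("data"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
    // Skip the WAL for this edit: faster, but the edit is lost if the
    // region server crashes before the memstore is flushed to disk.
    put.setWriteToWAL(false);
    table.put(put);
    table.flushCommits();
  }
}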

Another important feature of the HLog is keeping track of the changes. This is done by using a "sequence number". It uses an AtomicLong internally to be thread-safe and starts out at zero - or at the last known number persisted to the file system. As the region is opening its storage files, it reads the highest sequence number, which is stored as a meta field in each HFile, and sets the HLog sequence number to that value if it is higher than what was recorded before. So at the end of opening all storage files, the HLog is initialized to reflect where persisting ended and where to continue. You will see in a minute where this is used.

The image to the right shows three different regions, each of them covering a different row key range. As mentioned above, each of these regions shares the same single instance of HLog. What that means in this context is that the data, as it arrives at each region, is written to the WAL in an unpredictable order. We will address this further below.

Finally the HLog has the facilities to recover and split a log left by a crashed HRegionServer. These are invoked by the HMaster before regions are deployed again.

HLogKey

Currently the WAL is using a Hadoop SequenceFile, which stores records as sets of key/values. For the WAL the value is simply the KeyValue sent from the client. The key is represented by an HLogKey instance. If you recall from my first post in this series, the KeyValue only represents the row, column family, qualifier, timestamp, and value, as well as the "Key Type". Last time I did not address that field since there was no context. Now we have one, because the Key Type is what identifies what the KeyValue represents, a "put" or a "delete" (where there are a few more variations of the latter, to express what is to be deleted: a value, a column family, or a specific column).

What we are missing though is where the KeyValue belongs, i.e. the region and the table name. Those are stored in the HLogKey. Also stored is the above sequence number. With each record that number is incremented to be able to keep the edits in sequential order. Finally, it records the "Write Time", a timestamp recording when the edit was written to the log.

LogFlusher

As mentioned above, as data arrives at an HRegionServer in the form of KeyValue instances, it is written (optionally) to the WAL. And as also mentioned, it is then written to a SequenceFile. While this seems trivial, it is not. One of the base classes in Java IO is the Stream. Especially streams writing to a file system are often buffered to improve performance, as the OS is much faster writing data in batches, or blocks. If records were written separately, IO throughput would be really bad. But in the context of the WAL this causes a gap where data is supposedly written to disk but in reality is in limbo. To mitigate the issue, the underlying stream needs to be flushed on a regular basis. This functionality is provided by the LogFlusher class and thread. It simply calls HLog.optionalSync(), which checks if hbase.regionserver.optionallogflushinterval, set to 10 seconds by default, has been exceeded, and if so invokes HLog.sync(). The other place invoking the sync method is HLog.doWrite(). Once it has written the current edit to the stream, it checks if the hbase.regionserver.flushlogentries parameter, set to 100 by default, has been exceeded, and calls sync as well.

Sync itself invokes HLog.Writer.sync(), which is implemented in SequenceFileLogWriter. For now we assume it flushes the stream to disk and all is well. That reality is a bit more complicated is discussed below.

LogRoller

Obviously it makes sense to have some size restrictions on the logs written. Also, we want to make sure a log is persisted on a regular basis. This is done by the LogRoller class and thread. It is controlled by the hbase.regionserver.logroll.period parameter in the $HBASE_HOME/conf/hbase-site.xml file. By default this is set to 1 hour, so every 60 minutes the log is closed and a new one started. Over time, we gather a bunch of log files that way which need to be maintained as well. The HLog.rollWriter() method, which is called by the LogRoller to do the above rolling of the current log file, takes care of that as well by subsequently calling HLog.cleanOldLogs(). It checks what the highest sequence number written to a storage file is, because up to that number all edits are persisted. It then checks if there are logs whose edits are all less than that number. If that is the case it deletes said logs and leaves just those that are still needed.

This is a good place to talk about the following obscure message you may see in your logs:

2009-12-15 01:45:48,427 INFO org.apache.hadoop.hbase.regionserver.HLog: Too
many hlogs: logs=130, maxlogs=96; forcing flush of region with oldest edits:
foobar,1b2dc5f3b5d4,1260083783909


It is printed because the number of log files that need to be kept - they still contain outstanding edits that have not yet been persisted - exceeds the configured maximum number of log files. The main reason I have seen this being the case is when you stress out the file system so much that it cannot keep up persisting the data at the rate new data is added; otherwise memstore flushes should take care of this. Note though that when this message is printed, the server goes into a special mode, forcing flushes of the regions with the oldest edits to reduce the number of logs that must be kept.

The other parameters controlling log rolling are hbase.regionserver.hlog.blocksize and hbase.regionserver.logroll.multiplier, which by default rotate logs when they reach 95% of the block size of the SequenceFile, typically 64M. So logs are switched out either when they are considered full or when a certain amount of time has passed - whichever comes first.
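
Pulled together, the knobs mentioned so far would look something like this in hbase-site.xml. The values shown are the defaults discussed above, with intervals and periods assumed to be in milliseconds; treat this as a sketch rather than a recommendation:

  <property>
    <name>hbase.regionserver.optionallogflushinterval</name>
    <value>10000</value> <!-- flush the WAL stream every 10 seconds -->
  </property>
  <property>
    <name>hbase.regionserver.flushlogentries</name>
    <value>100</value> <!-- sync after this many edits -->
  </property>
  <property>
    <name>hbase.regionserver.logroll.period</name>
    <value>3600000</value> <!-- roll to a new log file every hour -->
  </property>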

Replay

Once an HRegionServer starts and is opening the regions it hosts, it checks if there are some leftover log files and applies those, all the way down in Store.doReconstructionLog(). Replaying a log is simply done by reading the log and adding the contained edits to the current MemStore. At the end, an explicit flush of the MemStore (note, this is not the flush of the log!) helps write those changes out to disk.

The old logs usually come from a previous region server crash. When the HMaster is started, or detects that a region server has crashed, it splits the log files belonging to that server into separate files and stores those in the region directories on the file system they belong to. After that, the above mechanism takes care of replaying the logs. One thing to note is that regions from a crashed server can only be redeployed once the logs have been split and copied. Splitting itself is done in HLog.splitLog(). The old log is read into memory in the main thread (meaning single-threaded) and then written to all region directories using a pool of threads, one thread for each region.

Issues

As mentioned above, all edits are written to one HLog per HRegionServer. You may ask why that is the case. Why not write all edits for a specific region into its own log file? Let's quote the BigTable paper once more:

If we kept the commit log for each tablet in a separate log file, a very large number of files would be written concurrently in GFS. Depending on the underlying file system implementation on each GFS server, these writes could cause a large number of disk seeks to write to the different physical log files.

HBase follows that principle for pretty much the same reasons. As explained above, you end up with many files, since logs are rolled and kept until they are safe to be deleted. If you did this for every region separately it would not scale well - or at least be an itch that sooner or later causes pain.

So far that seems to be no issue. But again, it causes problems when things go wrong. As long as you have applied all edits in time and persisted the data safely, all is well. But if you have to split the log because of a server crash, then you need to divide it into suitable pieces, as described above in the "Replay" paragraph. And as you have also seen above, all edits are intermingled in the log and there is no index of what is stored where at all. For that reason, the HMaster cannot redeploy any region from a crashed server until it has split the logs for that very server. And that can be quite a lot of logs if the server was behind on applying the edits.

Another problem is data safety. You want to be able to rely on the system to save all your data, no matter what newfangled algorithms are employed behind the scenes. As far as HBase and the log are concerned, you can turn the log flush times down as low as you want - you are still dependent on the underlying file system as mentioned above; the stream used to store the data is flushed, but is it written to disk yet? We are talking about fsync-style issues. Now for HBase we are most likely talking about Hadoop's HDFS as the file system that is persisted to.

Up to this point it should be abundantly clear that the log is what keeps data safe. For that reason a log could be kept open for up to an hour (or more, if configured so). As data arrives, a new key/value pair is written to the SequenceFile and occasionally flushed to disk. But that is not how Hadoop was designed to work. It was meant to provide an API that allows you to open a file, write data into it (preferably a lot), and close it right away, leaving an immutable file for everyone else to read many times. Only after a file is closed is it visible and readable to others. If a process dies while writing the data, the file is pretty much considered lost. What is required is a feature that allows reading the log up to the point where the crashed server wrote it (or as close as possible).

Interlude: HDFS append, hflush, hsync, sync... wth?

It all started with HADOOP-1700, reported by HBase lead Michael Stack. It was committed in Hadoop 0.19.0 and was meant to solve the problem. But that was not the case. So the issue was tackled again in HADOOP-4379, aka HDFS-200, which implemented syncFs(), meant to make syncing changes to a file more reliable. For a while we had custom code (see HBASE-1470) that detected a patched Hadoop that exposed that API. But again this did not solve the issue entirely.

Then came HDFS-265, which revisits the append idea in general. It also introduces a Syncable interface that exposes hsync() and hflush().

Lastly, SequenceFile.Writer.sync() is not the same as the above; it simply writes a synchronization marker into the file that helps with reading it later - or recovering data if the file is broken.

While append for HDFS is useful in general, it is not used in HBase - but hflush() is. What it does is write everything out to disk as the log is written. In case of a server crash we can safely read that "dirty" file up to the last edits. The append in Hadoop 0.19.0 was so badly suited that a "hadoop fsck /" would report the DFS as corrupt because of the open log files HBase kept.

Bottom line is, without Hadoop 0.21.0 you can very well face data loss. With Hadoop 0.21.0 you have a state-of-the-art system.

Planned Improvements

For HBase 0.21.0 there are quite a few things lined up that affect the WAL architecture. Here are some of the noteworthy ones.

SequenceFile Replacement

One of the central building blocks around the WAL is the actual storage file format. The SequenceFile used has quite a few shortcomings that need to be addressed. One, for example, is suboptimal performance, as all writing in SequenceFile is synchronized, as documented in HBASE-2105.

As with HFile replacing MapFile in HBase 0.20.0, it makes sense to think about a complete replacement. A first step was done to make the HBase classes independent of the underlying file format: HBASE-2059 made the class implementing the log configurable.

Another idea is to change to a different serialization altogether. HBASE-2055 proposes such a format using Hadoop's Avro as the low level system. Avro is also slated to be the new RPC format for Hadoop, which does help as more people are familiar with it.

Append/Sync

Even with hflush() we have the problem that calling it too often may slow the system down. Previous tests using the older syncFs() call showed that calling it for every record slows the system down considerably. One step to help is to implement a "Group Commit", done in HBASE-1939: it flushes out records in batches. In addition, HBASE-1944 adds the notion of a "deferred log flush" as a parameter of a column family. If set to true, it leaves the syncing of changes to the log to the newly added LogSyncer class and thread. Finally, HBASE-2041 sets flushlogentries to 1 and optionallogflushinterval to 1000 msecs. The .META. table is always synced for every change; user tables can be configured as needed.

Distributed Log Splitting

As remarked, splitting the log is an issue when regions need to be redeployed. One idea is to keep a list of regions with edits in ZooKeeper. That way at least all "clean" regions can be deployed instantly. Only those with edits then need to wait until the logs are split.

What is left is to improve how the logs are split to make the process faster. Here is how BigTable addresses the issue:
One approach would be for each new tablet server to read this full commit log file and apply just the entries needed for the tablets it needs to recover. However, under such a scheme, if 100 machines were each assigned a single tablet from a failed tablet server, then the log file would be read 100 times (once by each server).
and further
We avoid duplicating log reads by first sorting the commit log entries in order of the keys (table, row name, log sequence number). In the sorted output, all mutations for a particular tablet are contiguous and can therefore be read efficiently with one disk seek followed by a sequential read. To parallelize the sorting, we partition the log file into 64 MB segments, and sort each segment in parallel on different tablet servers. This sorting process is coordinated by the master and is initiated when a tablet server indicates that it needs to recover mutations from some commit log file.
This is where it's at. As part of the HMaster rewrite (see HBASE-1816) the log splitting will be addressed as well. HBASE-1364 wraps the splitting of logs into one issue. But I am sure that will evolve into more subtasks as the details get discussed.