0
0

对hbase coprocessor使用方法不当导致的一个程序bug

宝牛 发表于 2014年03月21日 17:42 | Hits: 2127
Tag: 分布式技术 | hbase | NoSQL

在某系统中对一张表数据写入量很大,频繁的compaction导致效率很低。这张表已经presharding过了,有几百个region,由于某些原因,短期内不太允许增大region数。当时采用的方法是每小时生成一张表,每小时的数据只写对应的表。后来发现这24张表对后面的业务处理带来很大的麻烦。需要把这24张表合为一张表,于是写了个DisableRegionCompaction,想对指定时间前的数据禁用compaction。

看了hbase coprocessor的官网介绍(https://blogs.apache.org/hbase/entry/coprocessor_introduction)。hbase的coprocessor分为observer和endpoint两种,coprocessor类似于传统数据库的触发器,endpoint则类似于存储过程。observer又分为三种:RegionObserver,WALObserver和MasterObserver。

RegionObserver: Provides hooks for data manipulation events, Get, Put, Delete, Scan, and so on. There is an instance of a RegionObserver coprocessor for every table region and the scope of the observations they can make is constrained to that region.

WALObserver: Provides hooks for write-ahead log (WAL) related operations. This is a way to observe or intercept WAL writing and reconstruction events. A WALObserver runs in the context of WAL processing. There is one such context per region server.

MasterObserver: Provides hooks for DDL-type operation, i.e., create, delete, modify table, etc. The MasterObserver runs within the context of the HBase master.

如果要控制hbase表的compaction行为,理论上只要写一个针对region的RegionObserver coprocessor就能可以。于是写了个DisableRegionCompaction类,它实现了RegionObserver接口类,重写了preCompactSelection这一个接口,其他的接口都用的是eclipse自动生成的代码。

public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store, List<StoreFile> candidates) {

    // candidates中保存的是所有要进行compaction的候选的StoreFile

    // 程序里面主要干的活是:对一个小时之前的StoreFile从candidates中剔除(remove)掉不参与compaction
}

测试的时候发现有数据丢失的情况。下图中数据是四条记录,hfile有四个文件:
hfile-log

图中这张表有4个hfile,本意是让其中18:33分的两个hfile不参与compaction,剩余的两个合并。

现象是major_compact后,凡是preCompactSelection代码中remove掉的region数据(18:33分的两个hfile)都存在,剩余参与compaction的StoreFile中数据(18:34和18:35分的两个)都丢失了!

查看region server上的log:

发现确实有2个StoreFile参与了compaction,但是结果数据为null。

查看hbase 0.94.1代码,发现是org/apache/hadoop/hbase/regionserver/Store.java的compactStore()返回的结果为空

compactStore() 代码中发现最可能是这几行有问题:

        /* include deletes, unless we are doing a major compaction */
        scanner = new StoreScanner(this, scan, scanners,
            majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
            smallestReadPoint, earliestPutTs);
        if (region.getCoprocessorHost() != null) {
          InternalScanner cpScanner = region.getCoprocessorHost().preCompact(
              this, scanner);
          // NULL scanner returned from coprocessor hooks means skip normal processing
          if (cpScanner == null) {
            return null;
          }

          scanner = cpScanner;
        }

联想到preCompact也是有coprocessor接口的,于是看我自己写的DisableRegionCompaction代码(eclipse自动生成的)发现是这样写的:

public InternalScanner preCompact(

           ObserverContext<RegionCoprocessorEnvironment> c, Store store,

           InternalScanner scanner) {

       // TODO Auto-generated method stub

       return null;

    }

就是这个地方的问题了,返回了一个null的scanner,改为返回传入的scanner就可以了,因为这里并不需要重写preCompact接口。

其实在RegionObserver接口中对preCompact接口的定义:

  /**
   * Called prior to writing the {@link StoreFile}s selected for compaction into
   * a new {@code StoreFile}.  To override or modify the compaction process,
   * implementing classes have two options:
   *
<ul>
   *
	<li>Wrap the provided {@link InternalScanner} with a custom
   *   implementation that is returned from this method.  The custom scanner
   *   can then inspect {@link KeyValue}s from the wrapped scanner, applying
   *   its own policy to what gets written.</li>
*
	<li>Call {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()}
   *   and provide a custom implementation for writing of new
   *   {@link StoreFile}s.  <strong>Note: any implementations bypassing
   *   core compaction using this approach must write out new store files
   *   themselves or the existing data will no longer be available after
   *   compaction.</strong></li>
*</ul>
* @param c the environment provided by the region server
   * @param store the store being compacted
   * @param scanner the scanner over existing data used in the store file
   * rewriting
   * @return the scanner to use during compaction.  Should not be {@code null}
   * unless the implementation is writing new store files on its own.
   * @throws IOException if an error occurred on the coprocessor
   */
  InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
      final Store store, final InternalScanner scanner) throws IOException;

对返回值有个说明“@return the scanner to use during compaction. Should not be {@code null}unless the implementation is writing new store files on its own.”

再仔细看了下hbase的代码,发现hbase里面已经有个实现了RegionObserver接口的BaseRegionObserver的抽象类了,它里面的实现就是:

  @Override
  public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
      final Store store, final InternalScanner scanner) throws IOException {
    return scanner;
  }

所以代码里面直接继承BaseRegionObserver这个抽象类就可以了。

在hbase官方文档(https://blogs.apache.org/hbase/entry/coprocessor_introduction)上对BaseRegionObserver类的说明是:

We provide a convenient abstract class BaseRegionObserver, which implements all RegionObserver methods with default behaviors, so you can focus on what events you have interest in, without having to be concerned about process upcalls for all of them.

看起来是对接口使用不当的低级错误。大家引己为戒,多读读hbase官方文档吧。

正如某大牛所说:

一个设计良好的系统,对于包含很多接口的接口类,一般都提供了抽象类供使用。

原文链接: http://www.searchtb.com/2014/03/hbase-coprocessor-bug.html

0     0

我要给这篇文章打分:

可以不填写评论, 而只是打分. 如果发表评论, 你可以给的分值是-5到+5, 否则, 你只能评-1, +1两种分数. 你的评论可能需要审核.

评价列表(0)