Есть сценарий, когда в HBase требуется пейджинговый запрос, а фильтрация выполняется по значению определенного столбца.
В отличие от СУБД, которая естественным образом поддерживает запросы на подкачку, HBase должна сама реализовывать подкачку. Насколько мне известно, в настоящее время существует два решения. Одно из них — использовать цикл PageFilter plus для динамической установки startRow, как указано в «Авторитетном руководстве по HBase».здесь. Однако этот метод относительно неэффективен и имеет избыточные запросы. Поэтому компания JD.com разработала метод, использующий дополнительную таблицу для хранения номеров строк.строить планы. Такая схема более эффективна, но сложнее в реализации и требует ведения дополнительной таблицы.
Независимо от плана или человека, нет лучшего, есть только самое подходящее. В нашем сценарии использования требования к производительности невысоки, поэтому принимается первое решение. Первоначально использовалась красота, но однажды вам нужно отфильтровать по значению определенного столбца при поиске постраничного запроса. Для фильтрации по значению столбца естественно использовать SingleColumnValueFilter (далее SCVFilter). Код примерно такой, указана только логика, относящаяся к теме этой статьи,
Scan scan = initScan(xxx);
FilterList filterList=new FilterList();
scan.setFilter(filterList);
filterList.addFilter(new PageFilter(1));
filterList.addFilter(new SingleColumnValueFilter(FAMILY,ISDELETED, CompareFilter.CompareOp.EQUAL, Bytes.toBytes(false)));
Данные следующие
row1 column=f:content, timestamp=1513953705613, value=content1
row1 column=f:isDel, timestamp=1513953705613, value=1
row1 column=f:name, timestamp=1513953725029, value=name1
row2 column=f:content, timestamp=1513953705613, value=content2
row2 column=f:isDel, timestamp=1513953744613, value=0
row2 column=f:name, timestamp=1513953730348, value=name2
row3 column=f:content, timestamp=1513953705613, value=content3
row3 column=f:isDel, timestamp=1513953751332, value=0
row3 column=f:name, timestamp=1513953734698, value=name3
в коде выше. Для сканирования добавляются два фильтра: сначала добавляется PageFilter, чтобы ограничить количество запросов на этот раз до 1, а затем добавляется SCVFilter, чтобы ограничить только возвратisDeleted=false
линия.
Приведенный выше код выглядит безупречно, но во время выполнения данные не запрашиваются!
Недавно я просматривал код HBase и локально отлаживал процесс запроса, связанный с фильтром сервера HBase.
Процесс фильтрации
Сначала посмотрите на процесс HBase Filter, как показано на рисунке: [Ошибка загрузки изображения...(image-a30991-1514982234552)]
Затем посмотрите на логику реализации PageFilter.
public class PageFilter extends FilterBase {
private long pageSize = Long.MAX_VALUE;
private int rowsAccepted = 0;
/**
* Constructor that takes a maximum page size.
*
* @param pageSize Maximum result size.
*/
public PageFilter(final long pageSize) {
Preconditions.checkArgument(pageSize >= 0, "must be positive %s", pageSize);
this.pageSize = pageSize;
}
public long getPageSize() {
return pageSize;
}
@Override
public ReturnCode filterKeyValue(Cell ignored) throws IOException {
return ReturnCode.INCLUDE;
}
public boolean filterAllRemaining() {
return this.rowsAccepted >= this.pageSize;
}
public boolean filterRow() {
this.rowsAccepted++;
return this.rowsAccepted > this.pageSize;
}
}
На самом деле это очень просто, внутри есть счетчик, при каждом вызове filterRow счетчик будет увеличиваться на 1. Если значение счетчика больше, чем pageSize, filterrow вернет true, а затем будут отфильтрованы последующие строки. .
Посмотрите на логику реализации SCVFilter.
public class SingleColumnValueFilter extends FilterBase {
private static final Log LOG = LogFactory.getLog(SingleColumnValueFilter.class);
protected byte [] columnFamily;
protected byte [] columnQualifier;
protected CompareOp compareOp;
protected ByteArrayComparable comparator;
protected boolean foundColumn = false;
protected boolean matchedColumn = false;
protected boolean filterIfMissing = false;
protected boolean latestVersionOnly = true;
/**
* Constructor for binary compare of the value of a single column. If the
* column is found and the condition passes, all columns of the row will be
* emitted. If the condition fails, the row will not be emitted.
* <p>
* Use the filterIfColumnMissing flag to set whether the rest of the columns
* in a row will be emitted if the specified column to check is not found in
* the row.
*
* @param family name of column family
* @param qualifier name of column qualifier
* @param compareOp operator
* @param comparator Comparator to use.
*/
public SingleColumnValueFilter(final byte [] family, final byte [] qualifier,
final CompareOp compareOp, final ByteArrayComparable comparator) {
this.columnFamily = family;
this.columnQualifier = qualifier;
this.compareOp = compareOp;
this.comparator = comparator;
}
@Override
public ReturnCode filterKeyValue(Cell c) {
if (this.matchedColumn) {
// We already found and matched the single column, all keys now pass
return ReturnCode.INCLUDE;
} else if (this.latestVersionOnly && this.foundColumn) {
// We found but did not match the single column, skip to next row
return ReturnCode.NEXT_ROW;
}
if (!CellUtil.matchingColumn(c, this.columnFamily, this.columnQualifier)) {
return ReturnCode.INCLUDE;
}
foundColumn = true;
if (filterColumnValue(c.getValueArray(), c.getValueOffset(), c.getValueLength())) {
return this.latestVersionOnly? ReturnCode.NEXT_ROW: ReturnCode.INCLUDE;
}
this.matchedColumn = true;
return ReturnCode.INCLUDE;
}
private boolean filterColumnValue(final byte [] data, final int offset,
final int length) {
int compareResult = this.comparator.compareTo(data, offset, length);
switch (this.compareOp) {
case LESS:
return compareResult <= 0;
case LESS_OR_EQUAL:
return compareResult < 0;
case EQUAL:
return compareResult != 0;
case NOT_EQUAL:
return compareResult == 0;
case GREATER_OR_EQUAL:
return compareResult > 0;
case GREATER:
return compareResult >= 0;
default:
throw new RuntimeException("Unknown Compare op " + compareOp.name());
}
}
public boolean filterRow() {
// If column was found, return false if it was matched, true if it was not
// If column not found, return true if we filter if missing, false if not
return this.foundColumn? !this.matchedColumn: this.filterIfMissing;
}
}
В HBase для каждого столбца каждой строки вызывается filterKeyValue, логика обработки этого метода SCVFilter следующая:
1. 如果已经匹配过对应的列并且对应列的值符合要求,则直接返回INCLUE,表示这一行的这一列要被加入到结果集
2. 否则如latestVersionOnly为true(latestVersionOnly代表是否只查询最新的数据,一般为true),并且已经匹配过对应的列(但是对应的列的值不满足要求),则返回EXCLUDE,代表丢弃该行
3. 如果当前列不是要匹配的列。则返回INCLUDE,否则将matchedColumn置为true,代表以及找到了目标列
4. 如果当前列的值不满足要求,在latestVersionOnly为true时,返回NEXT_ROW,代表忽略当前行还剩下的列,直接跳到下一行
5. 如果当前列的值满足要求,将matchedColumn置为true,代表已经找到了对应的列,并且对应的列值满足要求。这样,该行下一列再进入这个方法时,到第1步就会直接返回,提高匹配效率
Посмотрите еще раз на метод filterRow, метод вызывается только один раз для каждой строки после filterKeyValue. Логика этого метода в SCVFilter очень проста:
1. 如果找到了对应的列,如其值满足要求,则返回false,代表将该行加入到结果集,如其值不满足要求,则返回true,代表过滤该行
2. 如果没找到对应的列,返回filterIfMissing的值。
предполагать:
Это из-за добавления PageFilter в начало SCVFilter, при оценке первой строки вызывается filterRow PageFilter, что приводит к тому, что счетчик PageFilter равен +1, но когда достигается filterRow SCVFilter, строка снова отфильтровывается. В следующей строке, поскольку счетчик PageFilter достиг установленного нами pageSize, следующая строка будет отфильтрована, а в возвращаемом результате нет данных.
проверять:
В FilterList сначала добавьте SCVFilter, затем добавьте PageFilter.
Scan scan = initScan(xxx);
FilterList filterList=new FilterList();
scan.setFilter(filterList);
filterList.addFilter(new SingleColumnValueFilter(FAMILY,ISDELETED, CompareFilter.CompareOp.EQUAL, Bytes.toBytes(false)));
filterList.addFilter(new PageFilter(1));
Результатом является значение, которое мы ожидаем для строки 2.
В заключение
При использовании PageFilter и других фильтров лучше всего добавлять PageFilter в конец FilterList, иначе количество результатов может оказаться меньше ожидаемого. (На самом деле количество результатов, возвращаемых PageFilter, может быть больше установленного значения при нормальных обстоятельствах, поскольку PageFilter кластера серверов изолирован.)
пасхальные яйца
На самом деле в процессе устранения проблемы все было не так гладко, потому что проблема была онлайн, поэтому я создал некоторые тестовые данные, когда проверял проблему локально, Удивительно, но даже если я сначала добавил SCVFilter, а потом добавил PageFilter, Возвращаемый результат также соответствует ожиданиям. Данные теста следующие:
row1 column=f:isDel, timestamp=1513953705613, value=1
row1 column=f:name, timestamp=1513953725029, value=name1
row2 column=f:isDel, timestamp=1513953744613, value=0
row2 column=f:name, timestamp=1513953730348, value=name2
row3 column=f:isDel, timestamp=1513953751332, value=0
row3 column=f:name, timestamp=1513953734698, value=name3
В то время проблему нельзя было воспроизвести локально. Очень огорчился и, наконец, обнаружил, что результаты запроса с использованием SCVFilter также связаны с порядком столбцов данных.
На стороне сервера HBase инкапсулирует фильтр, переданный клиентом, в FilterWrapper.
class RegionScannerImpl implements RegionScanner {
RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
throws IOException {
this.region = region;
this.maxResultSize = scan.getMaxResultSize();
if (scan.hasFilter()) {
this.filter = new FilterWrapper(scan.getFilter());
} else {
this.filter = null;
}
}
....
}
При запросе данных в методе nextInternal класса HRegion будет вызываться метод filterRowCellsWithRet класса FilterWrapper.
Соответствующий код FilterWrapper выглядит следующим образом:
/**
* This is a Filter wrapper class which is used in the server side. Some filter
* related hooks can be defined in this wrapper. The only way to create a
* FilterWrapper instance is passing a client side Filter instance through
* {@link org.apache.hadoop.hbase.client.Scan#getFilter()}.
*
*/
final public class FilterWrapper extends Filter {
Filter filter = null;
public FilterWrapper( Filter filter ) {
if (null == filter) {
// ensure the filter instance is not null
throw new NullPointerException("Cannot create FilterWrapper with null Filter");
}
this.filter = filter;
}
public enum FilterRowRetCode {
NOT_CALLED,
INCLUDE, // corresponds to filter.filterRow() returning false
EXCLUDE // corresponds to filter.filterRow() returning true
}
public FilterRowRetCode filterRowCellsWithRet(List<Cell> kvs) throws IOException {
this.filter.filterRowCells(kvs);
if (!kvs.isEmpty()) {
if (this.filter.filterRow()) {
kvs.clear();
return FilterRowRetCode.EXCLUDE;
}
return FilterRowRetCode.INCLUDE;
}
return FilterRowRetCode.NOT_CALLED;
}
}
Здесь kvs — это столбец, который не фильтруется после прохождения строки данных через filterKeyValue.
Видно, что когда kvs не пуст, метод filterRowCellsWithRet будет вызывать метод filterRow указанного фильтра, как было сказано выше, счетчик PageFilter увеличивается в его методе filterRow.
А когда kvs пуст, счетчик PageFilter не увеличится. Посмотрите еще раз на наши тестовые данные, потому что первый столбец строки является целевым столбцом isDeleted SCVFilter. Оглядываясь назад на объяснение SCVFilter выше, мы знаем, что когда значение целевого столбца строки не соответствует требованиям, оставшиеся столбцы строки будут напрямую отфильтрованы!
Для первой строки тестовых данных kvs пуст при переходе к filterRowCellsWithRet. Счетчик PageFilter не имеет +1. Он также продолжит обход оставшихся строк. Это заставляет возвращаемые результаты выглядеть нормально.
Для рассматриваемых данных, поскольку есть содержимое столбца перед столбцом isDeleted, когда isDeleted строки не соответствует требованиям, kvs не будет пустым. Потому что значение содержимого столбца было добавлено в kvs (время вызова этих данных в filterrow SCVFilter будет отфильтровано).
впечатление
С точки зрения реализации реализация фильтра HBase все еще относительно грубая. Эффективность также впечатляет, независимо от сетевой передачи и потребления памяти на стороне клиента, что в основном совпадает с фильтрацией на стороне клиента.