Компиляция и использование официального барной пакета искры на HBASE

Spark

предисловие

После некоторых поисков в Spark появился специальный пакет зависимостей Maven для чтения и записи HBase.HBase предоставляет проект HBase Spark Connector.На официальном сайте hbase упоминается, что этот проект можно скомпилировать из исходного кода. Таким образом, существуют зависимости hbase-spark, похожие на spark-kafka и spark-hive. Библиотека mvn теперь предоставляет версию 1.0 https://mvnrepository.com/artifact/org.apache.hbase.connectors.spark/hbase-spark/1.0.0 со Spark 2.4.0, Scala 2.11.12, другие компилироваться самостоятельно.

Официальный сайт 1.0jar maven зависимости:

<!-- https://mvnrepository.com/artifact/org.apache.hbase.connectors.spark/hbase-spark -->
<dependency>
    <groupId>org.apache.hbase.connectors.spark</groupId>
    <artifactId>hbase-spark</artifactId>
    <version>1.0.0</version>
</dependency>

С этим проектом очень легко запутаться: https://mvnrepository.com/artifact/org.apache.hbase/hbase-spark, это не проект HBase Connector Spark!

Скомпилируйте исходный код hbase-spark

Адрес источника

Apache Hbase поддерживает проект, отныне, пакет сжатия исходного кода:Исходный код Hbase Connectors-Spark

Соединитель Apache HBase™ Spark

  • Scala and Spark Versions

To generate an artifact for a different spark version and/or scala version, pass command-line options as follows (changing version numbers appropriately):

$ mvn -Dspark.version=2.3.1 -Dscala.version=2.11.8 -Dscala.binary.version=2.11 clean install

Подготовьтесь к компиляции зависимостей hbase-spark для Spark2.3.1 и Scala2.11.8:

unzip hbase-connectors-master.zip
cd hbase-connectors-master/
mvn -Dspark.version=2.3.1 -Dscala.version=2.11.8 -Dscala.binary.version=2.11 clean install

ошибка компиляции

  1. Версия maven слишком низкая, установите maven3.5.4 и настройте переменные среды.
  2. Ошибка TestJavaHBaseContext, добавьте -DskipTests при компиляции
    Окончательный оператор компиляции:

mvn -Dspark.version=2.3.1 -Dscala.version=2.11.8 -Dscala.binary.version=2.11 -DskipTests clean install

Скомпилировано успешно

Расположение: ~/hbase-connectors-master/spark/hbase-spark/target

этоhbase-spark-1.0.1-SNAPSHOT.jar, который теперь доступен в проекте.

Используйте hbase-spark-1.0.1-SNAPSHOT.jar

Запись данных с использованием HBaseContext

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}

//从编译的hbase-spark-1.0.1-SNAPSHOT.jar中引入
import org.apache.hadoop.hbase.spark.HBaseContext

import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import java.util.UUID
/**
  * 引入了从Hbase官网编译的hbase-spark jar,调用HBaseContext
  * Spark批量写数据到HBase
  */
object SparkWithHBase {
  def main(args: Array[String]): Unit = {
    //Spark统一入口
    val spark = SparkSession.builder()
      .appName("Spark JDBC Test")
      .master("local")
      .getOrCreate()
    //列族名称
    val SRC_FAMILYCOLUMN = "info"
    //Hbase配置
    val config = HBaseConfiguration.create()
    config.set("hbase.zookeeper.quorum", "manager.bigdata")
    config.set("hbase.zookeeper.property.clientPort", "2181")
    //Hbase上下文,是API的核心
    val hbaseContext = new HBaseContext(spark.sparkContext, config)
    //读取数据源,封装成<RowKey,Values>这种格式
    val rdd: RDD[(String, Array[(String, String)])] = spark.read.csv("hdfs://manager.bigdata:8020/traffic.txt")
      .rdd
      .map(r => {
        (UUID.randomUUID().toString, Array((r.getString(0), "c1"), (r.getString(1), "c2"), (r.getString(2), "c3")))
      })
    //使用批量put方法写入数据
    hbaseContext.bulkPut[(String, Array[(String, String)])](rdd,
      TableName.valueOf("spark_hbase_bulk_put"),
      row => {
        val put = new Put(Bytes.toBytes(row._1))
        row._2.foreach(putValue => put.addColumn(
          Bytes.toBytes(SRC_FAMILYCOLUMN),
          Bytes.toBytes(putValue._2),
          Bytes.toBytes(putValue._1)))
        put
      })

  }
}

По запросу данные успешно записываются в HBase.