Spark2.3 чтение и запись последней практики API Hbase2.0 (CURD)

Большие данные

предисловие

Последний код практики HBase2.x не был найден в блогах некоторых больших парней, а код, вставленный из книги, не может быть выполнен под новой версией, поэтому я написал эту практику и обновился со старой версии, такой как HBase 1.4. .2. Приходите, вы можете узнать из этой статьи, хотите ли вы использовать Spark для чтения и записи HBase2.0 API. ps: Пример, связанный с официальным сайтом, также сообщает об ошибке! (поскольку зависимость o(╯□╰)o не найдена)

среда кода

  • Spark 2.3.1 (все серии 2.2, 2.3.x должны быть доступны)
  • HBase 2.0.0 (несовместим с серией Hbase 1.x)
  • Сообщество IDEA 2019.1

Готов к работе

Таблица создания оболочки HBase

# hbase shell
> list //查看表
> create 'spark_hbase_src', 'info' //创建一张数据源表

> create 'spark_hbase_res', 'info' //创建一张结果表,用来写入计算结果

Вышеупомянутые две таблицы созданы, просто.

Подготовьте образцы данных

Модель данных: имитируйте записи о прохождении транспортных средств на дороге, которые представляют собой текстовые файлы в формате csv (txt).

  • 5 полей: номерной знак, цвет номерного знака, номер камеры, направление движения, время записи
  • Соответствует английскому языку: «число», «цвет», «устройство», «направление», «фото_время».
  • Пример данных: смоделированные данные приведены только для справки.
номерной знак цвет номерного знака идентификатор устройства направление движения время записи
Ю А12345 синий D12C01 север и юг 2019/10/16 12:00:00
Ю Б12121 желтый D13C06 север и юг 2019/10/10 12:11:00
Ю C66666 синий D15C08 Запад Восток 2019/10/29 12:09:00
Ю Д11111 синий D18C07 Север Юг 2019/10/18 12:15:00

Смоделируйте и сгенерируйте некоторые текстовые данные самостоятельно, загрузите их в hdfs или локально.

Зависимости Maven

HBase Server API

        <!-- Hbase server库 提供Hbase读写API-->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>

Раньше мне нужна была только эта банка HBase, но на практике сообщалось об ошибке:

ошибка 1

  • Ошибка 1: не удалось импортировать org.apache.hadoop.hbase.mapreduce.TableInputFormat

Решение

Импортируйте этот пакет:

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>${hbase.version}</version>
        </dependency>

Ошибка 2

  • Ошибка 2: класс org.apache.htrace.SamplerBuilder не найден
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/htrace/SamplerBuilder
Caused by: java.lang.ClassNotFoundException: org.apache.htrace.SamplerBuilder
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 17 more

Решение

Импортируйте этот пакет:

        <!-- https://mvnrepository.com/artifact/org.apache.htrace/htrace-core -->
        <dependency>
            <groupId>org.apache.htrace</groupId>
            <artifactId>htrace-core</artifactId>
            <version>3.1.0-incubating</version>
        </dependency>

Искра и другие зависимости

Другие зависимости, такие как spark-core, добавляются сами по себе:

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        
        <!-- Spark核心库 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!--Spark sql库 提供DF类API -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

Spark пишет в HBase

Кодекс Практика

Ctrl+c самозагрузка:

import java.util.UUID

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession

import scala.util.Try

object SparkWriteHBase {
  val hbaseConfig = HBaseConfiguration.create()
  hbaseConfig.set("hbase.zookeeper.quorum", "zk地址1,zk地址2,zk地址3")
  hbaseConfig.set("hbase.zookeeper.property.clientPort", "2181")
  //根据自己集群设置如下一行配置值
  config.set("zookeeper.znode.parent","/hbase-unsecure")
  //在IDE中设置此项为true,避免出现"hbase-default.xml"版本不匹配的运行时异常
  hbaseConfig.set("hbase.defaults.for.version.skip", "true")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Spark-HBase")
      .master("local[2]")
      .getOrCreate()
    //读取的示例数据
    val data = spark.read.csv("hdfs://your-hdfs-host:8020/traffic.txt")
      .toDF("number", "color", "device", "direction", "photo_time")

    println("数据条数是:" + data.count())

    val SRC_FAMILYCOLUMN = "info"

    data.foreachPartition(p => {
      //获取HBase连接
      val hbaseConn = ConnectionFactory.createConnection(hbaseConfig)
      val resultTable = TableName.valueOf("spark_hbase_src")
      //获取表连接
      val table = hbaseConn.getTable(resultTable)
      p.foreach(r => {
        val put = new Put(Bytes.toBytes(UUID.randomUUID().toString))
        put.addColumn(Bytes.toBytes(SRC_FAMILYCOLUMN), Bytes.toBytes("number"), Bytes.toBytes(r.getString(0)))
        put.addColumn(Bytes.toBytes(SRC_FAMILYCOLUMN), Bytes.toBytes("color"), Bytes.toBytes(r.getString(1)))
        put.addColumn(Bytes.toBytes(SRC_FAMILYCOLUMN), Bytes.toBytes("device"), Bytes.toBytes(r.getString(2)))
        put.addColumn(Bytes.toBytes(SRC_FAMILYCOLUMN), Bytes.toBytes("direction"), Bytes.toBytes(r.getString(3)))
        put.addColumn(Bytes.toBytes(SRC_FAMILYCOLUMN), Bytes.toBytes("photo_time"), Bytes.toBytes(r.getString(4)))

        Try(table.put(put)).getOrElse(table.close()) //将数据写入HBase,若出错关闭table
      })
      table.close()
      hbaseConn.close()
    })
  }
}

Посмотреть результат операции записи

Сравнение объема данных до и после записи: 0 -> 1199:

Искра читает HBase

Кодекс Практика

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession

object SparkReadHbase {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("Spark-HBase")
      .master("local")
      .getOrCreate()

    val hbaseConfig = HBaseConfiguration.create()
    hbaseConfig.set("hbase.zookeeper.quorum", "zk地址1,zk地址2,zk地址3")
    hbaseConfig.set("hbase.zookeeper.property.clientPort", "2181")
    //在IDE中设置此项为true,避免出现"hbase-default.xml"版本不匹配的运行时异常
    hbaseConfig.set("hbase.defaults.for.version.skip", "true")
    hbaseConfig.set(TableInputFormat.INPUT_TABLE, "spark_hbase_src")

    val SRC_FAMILYCOLUMN = "info"

    //从hbase中读取RDD
    val hbaseRDD = spark.sparkContext.newAPIHadoopRDD(hbaseConfig,
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    import spark.implicits._

    hbaseRDD.map({ case (_, result) =>
      //      val key = Bytes.toString(result.getRow)
      val number = Bytes.toString(result.getValue(SRC_FAMILYCOLUMN.getBytes, "number".getBytes))
      val color = Bytes.toString(result.getValue(SRC_FAMILYCOLUMN.getBytes, "color".getBytes))
      val device = Bytes.toString(result.getValue(SRC_FAMILYCOLUMN.getBytes, "device".getBytes))
      val direction = Bytes.toString(result.getValue(SRC_FAMILYCOLUMN.getBytes, "direction".getBytes))
      val photo_time = Bytes.toString(result.getValue(SRC_FAMILYCOLUMN.getBytes, "photo_time".getBytes))
      (number, color, device, direction, photo_time)
    }).toDF("number", "color", "device", "direction", "photo_time").show(false)
  }
}

результат операции

Распечатать скриншот show() ~ успешно прочитать данные в HBase:


Пример официального сайта, наступившего на яму

Официальный сайтExample 36. HBaseContext Usage Exampleследующее:

val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()
...
val hbaseContext = new HBaseContext(sc, config)
  • понятия не имеюHBaseContextКакой пакет jar был введен, и какие зависимости Maven не указаны на официальном сайте! (Узнайте позже, что его можно скомпилировать из проекта, и mvn также предоставляет пакет jar версии 1.0)
  • new SparkContext("local", "test")Это обозначение уникально для этого пакета.Подробности следующие:

10.10.2019 Я скомпилировал этот исходный код, чтобы получить пакет jar, а официальный сайт mvn также предоставляет зависимость версии 1.0, которую можно использовать ↓↓↓ (портал)

Руководство по компиляции Hbase Spark Connector

提供spark读写hbase的api,可作为hbase-server库之外的另一种选择↑↑↑

Напишите HBase, используя Spark RDD

Основное отличие состоит в том, что используемые объекты конфигурационного файла различны.

saveAsHadoopDataset

Используя конфигурацию Hadoop JobConf, класс TableOutputFormat, используемый для инициализации JobConf, находится в пакете org.apache.hadoop.hbase.mapred.

saveAsNewAPIHadoopDataset

Настроенный с помощью конфигурации Hadoop, используемый класс TableInputFormat находится в пакете org.apache.hadoop.hbase.mapreduce.

Использование этих двух API аналогично, примеры следующие:

Код

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SparkSession

object SparkWriteHBaseByHadoopDataset {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("SparkWriteHBase2").master("local").getOrCreate()
    val sc = spark.sparkContext
    val tableName = "test_student"

    val config = HBaseConfiguration.create()
    config.set("hbase.zookeeper.quorum", "manager.bigdata,master.bigdata,worker.bigdata")
    config.set("hbase.zookeeper.property.clientPort", "2181")
    config.set("hbase.defaults.for.version.skip", "true")

    val inputDataRDD = sc.parallelize(Array("1,Jack,M,26", "2,Rose,M,17")) //模拟构建两行记录的RDD
    val rdd = inputDataRDD.map(_.split(',')).map { arr => {
      val put = new Put(Bytes.toBytes(arr(0))) //行健的值
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(arr(1))) //info:name列的值
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("gender"), Bytes.toBytes(arr(2))) //info:gender列的值
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(arr(3).toInt)) //info:age列的值
      (new ImmutableBytesWritable, put)
    }
    }

    // 初始化JobConf,TableOutputFormat 是 org.apache.hadoop.hbase.mapred 包下的
    val jobConf = new JobConf(config)
    jobConf.setOutputFormat(classOf[org.apache.hadoop.hbase.mapred.TableOutputFormat])
    rdd.saveAsHadoopDataset(jobConf)

    //TableInputFormat 是 org.apache.hadoop.hbase.mapreduce 包下的
    config.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    val job = Job.getInstance(config)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}

Оба метода API вызываются внизуSparkHadoopWriterМетод записи объекта не имеет разницы в производительности.

Spark создает таблицу HBase

Пример основного кода API

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, TableDescriptorBuilder}

  def main(args: Array[String]): Unit = {

    val hbaseConn = ConnectionFactory.createConnection(hbaseConfig)
    val admin = hbaseConn.getAdmin

    //如果不存在就创建表
    if (!admin.tableExists(TableName.valueOf("test_hb_new_api"))) {
      val desc = TableDescriptorBuilder.newBuilder(TableName.valueOf("test_hb_new_api"))
      //指定列簇 不需要创建列,列式存储不需要创建列
      val cf1 = ColumnFamilyDescriptorBuilder.newBuilder("cf1".getBytes()).build()
      desc.setColumnFamily(cf1)
      admin.createTable(desc.build())
    }

  }

Просто создайте таблицу, без участия искры, конечно же, его можно запустить и в коде Spark!

Устаревший API

val desc = new HTableDescriptor(TableName.valueOf("hb_test"))

//Эти API отмечены как устаревшие и будут удалены в HBase 3.0!

val hcd = new HColumnDescriptor("cf")

Spark удалить таблицу HBase (продолжить создание таблицы HBase)

Происхождение переменной admin находится в созданном коде. Удаление таблицы не имеет ничего общего со искрой, это просто вызов.

//drop table
admin.disableTable(table_name)
admin.deleteTable(table_name)

Операция SparkSQL HBase (SHC)

Ха ┐(゚~゚)┌, это будет добавлено через полгода (2020-05)

Инженеры Hortonworks привезли нам совершенно новый Apache Spark — Apache HBase Connector, далее именуемый SHC. С помощью этой библиотеки классов мы можем напрямую использовать Spark SQL для записи данных из DataFrame в HBase, а также использовать Spark SQL для запроса данных в HBase.

Взгляните, использование SHC, представленное большим парнем:SHC: эффективное чтение и запись HBase с помощью Spark SQL, в любом случае подобная информация каталога HBase должна быть написана от руки, примерно так:

val catalog = s"""{
  |"table":{"namespace":"default", "name":"iteblog", "tableCoder":"PrimitiveType"},
  |"rowkey":"key",
  |"columns":{
    |"col0":{"cf":"rowkey", "col":"id", "type":"int"},
    |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
    |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
    |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
    |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
    |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
    |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
    |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
    |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
  |}
|}""".stripMargin

На самом деле, toDF() после прочтения приведенных выше данных — это мир SparkSQL, так что не нужно так запутываться. (2020-0603)