предисловие
Этот вопрос много искали в Интернете.Почему я должен обобщать его сам только потому, что моя компания недавно добавила HBase, а затем использовала Spark для чтения данных MySQL и записи их в HBase.Я столкнулся с некоторыми проблемами и побеспокоил его.для долго
Сейчас я подробно опишу процесс написания этой программы и как меняется код.
Запись процесса разработки
Теперь нам нужно сделать две вещи: во-первых, таблицы в MySQL нужно перенести в HBase, это полная синхронизация, а во-вторых, сделать инкрементную синхронизацию данных, которая сейчас не входит в наши требования.
1.1 Spark SQL читает данные в MySQL
Мой код ниже - это код Scala, очень простой, сначала подключитесь к MySQL, а затем получите его через DataFrame.
val url = "jdbc:mysql://xxx:xxx/xxx?characterEncoding=utf-8&useSSL=false"
val connectProperties = new Properties()
connectProperties.setProperty("user","xxx")
connectProperties.setProperty("password","xxx")
connectProperties.setProperty("driver","com.mysql.jdbc.Driver")
connectProperties.setProperty("partitionColumn","xxx")
val columnName = "Id"
val lowerBound = 1
val upperBound = 30
val numPartitions = 2
val tableName = "xxx"
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("JuejinDemo").setMaster("local[2]")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
val jdbcDF = spark.read.jdbc(url,
tableName,columnName,lowerBound,upperBound,numPartitions,connectProperties)
//展示表结构
jdbcDF.printSchema()
//展示数据
jdbcDF.show()
}
Несколько строк кода, кратко объясните параметры
// MySQL的URL
var url = "jdbc:mysql://xxx:xxx/xxx?characterEncoding=utf-8&useSSL=false"
// MySQL表名
val tableName = "xxx"
// 连接配置
var connectProperties = new Properties()
// MySQL用户名
connectProperties.setProperty("user","xxx")
// MySQL密码
connectProperties.setProperty("password","xxx")
// 驱动
connectProperties.setProperty("driver","com.mysql.jdbc.Driver")
val columnName = "Id"
// 从id为1开始读
val lowerBound = 1
// 下方解释
val upperBound = 100
val numPartitions = 2
Результат напечатан:
Текущие данные:
Примечание: параметры upperBound и numPartitions связаны,upperBound / numPartitions = сколько частей данных нужно записать в каждый раздел, так что лучше узнать какой общий объем данных, так как автор столкнулся с такой проблемой, общий объем данных 4000Вт, автор поставил upperBound=30млн, numPartitions=300, то на каждый раздел нужно записать Данные по 10 Вт.
Правило этого раздела заключается в том, что первые 299 разделов будут записывать данные 10 Вт, а последний 300-й раздел будет записывать данные 10 Вт + 4000 Вт-3000 Вт = 1010 Вт, что приводит к тому, что программа несколько раз выдает ошибку OOM и не может найти причину. памяти = 3G не хватает, так что будьте осторожны
Конечно, вы также можете использовать SQL для запроса MySQL в это время, а затем использовать результат запроса в качестве данных, которые вы хотите записать, потому что при использовании метода jdbc фиксируется чтение всей таблицы MySQL, поэтому будет неконтролируемым , код выглядит следующим образом
// 连接MySQL读取数据
val jdbcDF = spark.read.format("jdbc")
.option("url", "jdbc:mysql://" +host+ ":xxx/" +dbName+ "?characterEncoding=utf-8&useSSL=false")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", username)
.option("password", password)
.option("query","select * from " +tableName).load()
Здесь используется метод форматирования, и параметры, которые вам нужно предоставить, также очень просты, как и в jdbc выше.
1.2 Часть обработки данных
В настоящее время мы успешно прочитали данные, и интеграция с источником данных является первым шагом нашей разработки, а затем данные обрабатываются и отправляются в соответствующий нисходящий поток.На самом деле, многие из соответствующих нисходящих потоков предоставляются Поддержка API, нам нужно только преобразовать данные в формат, требуемый этим интегрированным API.
Здесь я использую атрибут dtype, где .var исходит непосредственно из массива (строка, строка)
Как вы можете видеть из вывода, наш массив типов записывает поля и типы полей в этой таблице MySQL, поэтому в настоящее время я могу использовать цикл для его обхода и обработки данных каждого поля, давайте сначала получим данные
В это время я преобразовал DataFrame в RDD для обработки.Я сначала использовал foreach для его вывода, а затем вместо этого использовал карту
Теперь, когда у нас есть данные, нам нужно отобразить их с помощью HBase.Хранилище HBase только что сказало, что это хранилище столбцов, которое является ключом строки, который соответствует нескольким полям нескольких семейств столбцов. Здесь я предполагаю, что есть только 1 информация о семействе столбцов. Теперь нам нужно следующее условие: rowkey, columnFamily = info, field, field value
Здесь я просто использую идентификатор MySQL в качестве значения rowkey и определяю метод getString, который должен получить значение, соответствующее этому полю, в соответствии с различными полями value_type этой строки данных, поскольку полей несколько, поэтому они используются как массив существует, я является индексом этого массива
/**
* 根据每个字段的字段类型调用不同的函数,将字段值转换为HBase可读取的字节形式
* 解决数字导入到HBase中变成乱码的问题
*
* @param value_type
* @param row
* @param i
* @return
*/
def getString(value_type: String, row: Row, i: Int): String = {
if (row != null && row.length != 0) {
var str = ""
if ("IntegerType" == value_type) {
str = row.getInt(i).toString
}
else if ("StringType" == value_type) {
str = row.getString(i)
}
else if ("FloatType" == value_type) {
str = row.getFloat(i).toString
}
else if ("DoubleType" == value_type) {
str = row.getDouble(i).toString
}
else if ("TimestampType" == value_type) {
str = row.getTimestamp(i).toString
}
str
}
else ""
}
Затем мы прямо сейчас вызываем вышеуказанный метод в rdd, вынимаем идентификатор и назначаем его для rowkey
// 遍历所有的字段
for (j <- 0 to (types.length-1)){
// 取出值为Id的那个字段
if (types.apply(j)._1 == "Id"){
// 将Id字段对应的值赋给rowKey
rowKey = getString(types.apply(j)._2,row,j)
println("rowkey的值为:"+rowKey)
}
}
Текущий результат выглядит следующим образом, потому что сейчас у меня есть только 2 фрагмента данных в этой таблице, поэтому есть только два ключа строки.
На данный момент нам нужно создать объект Put.Put — это объект, который нам нужно создавать при вставке данных в HBase.
Перекинув его на перевод Baidu, мы можем увидеть информацию о параметре
Чтобы создать новый объект Put, вам нужно передать rowkey, а затем вызвать метод addColumn. Здесь addColumn требует 4 параметра, первый массив byte[] — это rowkey, второй массив byte[] — это семейство столбцов, long — временная метка этой строки (это не нужно передавать нам самим), потому что hbase удаляет И обновленные данные просто помечаются, а не удаляются физически, обновление заключается в вставке данных с последней отметкой времени, а последний массив байтов - это значение, соответствующее этому полю.
// 列族名,定义在main方法外层即可
val columnFamily = "info"
val put = new Put(Bytes.toBytes(rowKey))
for (i <- 0 until row.size){
put.addColumn(Bytes.toBytes(columnFamily),
Bytes.toBytes(types.apply(i)._1), Bytes.toBytes(getString(types.apply(i)._2, row, i)))
//打印HBase中的数据
println("rowKey: "+rowKey+" , "+"columnFamily: "+columnFamily+" , "+
"column: "+types.apply(i)._1+" , "+"cell: "+getString(types.apply(i)._2, row, i))
}
Я приведу вам соответствующие распечатанные данные, и вы это очень четко увидите, потому что для addColumn требуется массив байтов, поэтому между сохраненными данными и распечатанными данными будут некоторые расхождения.
На этом этапе мы меняем foreach обратно на карту, а затем используем Put в качестве возвращаемого значения.Обратите внимание, что структура данных называется ImmutableBytesWritable
Я сделал скриншот всего результата, чтобы вы могли сравнить
2.3 Запись данных в нисходящий HBase
Этот шаг очень прост.Чтобы вызвать API, давайте добавим, что зависимости, которые мы использовали до сих пор, следующие:
import java.util.Properties
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
Теперь начните писать, это фиксированная рутина
val sc = spark.sparkContext
sc.setLogLevel("ERROR")
val hadoopConfiguration: Configuration = sc.hadoopConfiguration
val hbaseTableName = "zbchatmsg"
hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName)
val hbaseConf = getHBaseConf(hadoopConfiguration)
val job = new Job(hbaseConf)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
Метод getHBaseConf просто определяет адресную информацию zookeeper.
/**
* 获取 HBase相关参数
* @param hadoopConf
* @return
*/
def getHBaseConf(hadoopConf: Configuration): Configuration = {
val hbaseConf = HBaseConfiguration.create(hadoopConf)
// zookeeper的地址
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "")
// zookeeper的端口
hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "")
hbaseConf
}
Здесь я сначала удаляю оригинальный zbchatmsg после отключения, а потом создаю новый.После выполнения программы сканирую его, и я могу выводить свои данные.
На данный момент наша программа MySQLToHBase завершена.
2.4 Объясните проблему таблицы HBase и динамической передачи параметров
Мы можем написать метод самостоятельно и позволить программе помочь создать таблицу, когда таблица еще не создана, или мы можем создать таблицу заранее.
val hBaseTableName = "你的HBase表名"
/**
* 创建HBase表
* @param tableName 表名
*/
def createHTable(tableName: String, hBaseConf : Configuration) = {
val connection = ConnectionFactory.createConnection(hBaseConf)
val hBaseTableName = TableName.valueOf(tableName)
val admin = connection.getAdmin
if (!admin.tableExists(hBaseTableName)) {
val tableDesc = new HTableDescriptor(hBaseTableName)
tableDesc.addFamily(new HColumnDescriptor("info".getBytes))
admin.createTable(tableDesc)
}
connection.close()
}
Конечно, вы обнаружите, что наша текущая программа очень неудобна, потому что у нас должна быть производственная среда и тестовая среда, когда мы запускаем программу.Мы хотим передавать параметры при запуске программы, чтобы определить, какую среду программа должна использовать.Какую Таблица MySQL должна быть прочитана, в какую таблицу HBase должна быть записана.
Итак, мы начали меняться
2.4.1 CheckArguments для оценки рабочей среды
Во-первых, это среда выполнения. Я написал метод checkArguments для суждения. При получении dev это тестовая среда, а pro — производственная среда. В это время наш URL и connectProperties должны быть определены как
var url = new String
var connectProperties = new Properties()
Метод проверки аргументов:
/**
* 检验运行时参数的方法 --- args = pro or dev
* @param args
*/
def checkArguments(args: String): Unit ={
val runType = mutable.HashSet("dev", "pro")
if (args == null || args.length == 0 || !runType.contains(args)) {
throw new Exception("Illegal starting parameter ......")
}
if (args == "pro"){
url = ""
connectProperties.setProperty("user","")
connectProperties.setProperty("password","")
connectProperties.setProperty("driver","com.mysql.jdbc.Driver")
connectProperties.setProperty("partitionColumn","Id")
}
if (args == "dev"){
url = ""
connectProperties.setProperty("user","")
connectProperties.setProperty("password","")
connectProperties.setProperty("driver","com.mysql.jdbc.Driver")
connectProperties.setProperty("partitionColumn","Id")
}
}
Логика очень проста. Это коллекция, в которой хранятся dev и pro. Если вы не введете ни одного из этих двух, это будет ненормально. Если это правильно, URL, имя пользователя и пароль MySQL, соответствующие производству и тестированию в dev и pro соответственно Can
2.4.2 Небольшие изменения в getHBaseConf
/**
* 获取 HBase相关参数
* (这里需要修改 HConstants.ZOOKEEPER_QUORUM 与 "hbase.master" 参数)
*
* @param hadoopConf
* @return
*/
def getHBaseConf(hadoopConf: Configuration,runType:String): Configuration = {
val hbaseConf = HBaseConfiguration.create(hadoopConf)
if (runType== "prototest"){
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "")
hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
}
if (runType == "dev"){
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "")
hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
}
hbaseConf
}
2.4.3 основной метод
Добавьте следующий код
// 运行环境 dev or pro
val runType = args(0)
checkArguments(runType)
// MySQL表名
val tableName = args(1)
// HBase表名
val hbaseTableName = args(2)
upperBound = args(3).toInt
numPartitions = args(4).toInt
Поскольку upperBound и numPartitions управляют объемом данных, записываемых каждой из наших задач, их также можно передавать в качестве параметров во время выполнения. Например, чтобы запустить этот код в это время, я нажму в IDEA
Передайте необходимые параметры
нормально работать
2. Используйте Bulkload для загрузки данных в HBase.
2.1 Некоторые жалобы
При использовании этого метода нет смысла тестировать с маленькой таблицей, потому что с маленькой таблицей из десятков тысяч данных проблем нет, здесь я использую большую таблицу около 5000 Вт, а затем записываю запись о наступлении на яму. .
2.2 Обычное описание
Эта процедура предназначена для того, чтобы сначала поместить данные таблицы MySQL в соответствии с потребностями HFile.
(rowkey,Array(info,column,columnName))
Формат сначала считывается в HDFS, а затем напрямую сопоставляется с HBase.
Процедура BulkloadToHBase заключается в том, сколько полей существует в вашей таблице MySQL, когда она вставляется, она записывается в соответствии с (rowkey, columnName, field, fieldValue), поэтому, если в моей таблице есть фрагменты данных 5000W, есть 26 полей в таблица, а потом когда в программе задано 1000 партиций, подсчитано, что одна задача будет запускать 5W кусков данных, а потом 26 полей по 5W кусков данных, то есть всего 130W сообщений
На данный момент вы можете видеть, что каждая задача будет запускать сообщения объемом 130 Вт, поэтому для таблицы с большим количеством полей и большим количеством таблиц этот объем задачи очень удивителен.
2.3 Код Описание
На самом деле объяснять особо нечего, просто учтите, что запись в HDFS и запись в код HBase можно разделить.
import java.io.IOException
import java.sql.{DriverManager, SQLException}
import java.util.Properties
import com.dataserver.vzan.confmanager.JobProperties
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client.{ClusterConnection, ConnectionFactory, HRegionLocator, Put}
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat}
import org.apache.hadoop.hbase.spark.{HBaseContext, KeyFamilyQualifier}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark._
import org.apache.spark.sql.{Row, SparkSession}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.control.Breaks
/**
* 需要传入的参数
* runType:dev or pro
* MySQL表名
* HBase表名
* example:pro zbchatmsg2003 zbchatmsg2003 100000000 1000 /user/hive/bulkload
*
* 需要手动修改的参数为 columnFamily,默认为 info
*/
object BulkLoadToHBase {
var path = new String
var url = new String
var connectProperties = new Properties()
val columnName = "Id"
val lowerBound = 1
var upperBound = 0
var numPartitions = 0
def main(args: Array[String]): Unit = {
// 运行环境 dev or pro
val runType = args(0)
checkArguments(runType)
// MySQL表名
val tableName = args(1)
// HBase表名
val hbaseTableName = args(2)
// 决定了每个task分配的数据量
upperBound = args(3).toInt
numPartitions = args(4).toInt
// HDFS的路径
path = args(5)
val sparkConf = new SparkConf().setAppName("BulkLoadToHBase")
.set("spark.shuffle.file.buffer","128")
.set("spark.reducer.maxSizeInFlight","96")
.set("spark.shuffle.io.maxRetries","10")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
val jdbcDF = spark.read.jdbc(url,
tableName,columnName,lowerBound,upperBound,numPartitions,connectProperties)
jdbcDF.printSchema()
var fields = jdbcDF.columns
// 可以通过这行代码去把不必要的字段给删除
//fields = fields.dropWhile(_ == "")
val types = jdbcDF.dtypes
val sc = spark.sparkContext
val hadoopConfiguration: Configuration = sc.hadoopConfiguration
hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName)
val hbaseConf = getHBaseConf(hadoopConfiguration,runType)
//表不存在则建HBase临时表
createHTable(hbaseTableName, hbaseConf)
// 将DataFrame转换成BulkLoad需要的RDD形式
val data = jdbcDF.rdd.map(row => {
var rowkey = new String
// -----------------------这里开始写你的处理逻辑---------------------------------
// 简单把Id作为rowkey了
for (j <- 0 to (types.length-1)){
if (types.apply(j)._1 == "Id"){
rowkey = getString(types.apply(j)._2,row,j)
}
}
fields.map(field => {
val fieldValue = row.getAs[Any](field).toString
(Bytes.toBytes(rowkey),Array((Bytes.toBytes("info"), Bytes.toBytes(field), Bytes.toBytes(fieldValue))))
})
}).flatMap({array =>{
(array)
}})
//--------------------处理逻辑完成------------------------
//----------------------写入HDFS--------------------------
val hBaseContext = new HBaseContext(sc, hbaseConf)
hBaseContext.bulkLoad(data.map(record => {
val put = new Put(record._1)
record._2.foreach(putValue => put.addColumn(putValue._1, putValue._2, putValue._3))
put
}), TableName.valueOf(tableName), (t : Put) => putForLoad(t), path)
//----------------------写入HDFS--------------------------
//----------------------写入HBase----------------------------
val conn = ConnectionFactory.createConnection(hbaseConf)
val hbTableName = TableName.valueOf(hbaseTableName.getBytes())
val regionLocator = new HRegionLocator(hbTableName, classOf[ClusterConnection].cast(conn))
val realTable = conn.getTable(hbTableName)
HFileOutputFormat2.configureIncrementalLoad(Job.getInstance(), realTable, regionLocator)
// bulk load start
val loader = new LoadIncrementalHFiles(hbaseConf)
val admin = conn.getAdmin()
loader.doBulkLoad(new Path(path),admin,realTable,regionLocator)
sc.stop()
//----------------------写入HBase----------------------------
}
/**
* 获取 HBase相关参数
*
* @param hadoopConf
* @return
*/
def getHBaseConf(hadoopConf: Configuration,runType:String): Configuration = {
val hbaseConf = HBaseConfiguration.create(hadoopConf)
if (runType == "dev"){
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, JobProperties.DEV_ZOOKEEPER_QUORUM)
hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, JobProperties.ZOOKEEPER_CLIENT_PORT)
}
else {
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, JobProperties.PRO_ZOOKEEPER_QUORUM)
hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, JobProperties.ZOOKEEPER_CLIENT_PORT)
}
hbaseConf
}
/**
* 创建HBase表
* @param tableName 表名
*/
def createHTable(tableName: String, hBaseConf : Configuration) = {
val connection = ConnectionFactory.createConnection(hBaseConf)
val hBaseTableName = TableName.valueOf(tableName)
val admin = connection.getAdmin
if (!admin.tableExists(hBaseTableName)) {
val tableDesc = new HTableDescriptor(hBaseTableName)
tableDesc.addFamily(new HColumnDescriptor("info".getBytes))
admin.createTable(tableDesc)
}
connection.close()
}
/**
* 根据每个字段的字段类型调用不同的函数,将字段值转换为HBase可读取的字节形式
* 解决数字导入到HBase中变成乱码的问题
*
* @param value_type
* @param row
* @param i
* @return
*/
def getString(value_type: String, row: Row, i: Int): String = {
if (row != null && row.length != 0) {
var str = ""
if ("IntegerType" == value_type) {
str = row.getInt(i).toString
}
else if ("StringType" == value_type) {
str = row.getString(i)
}
else if ("FloatType" == value_type) {
str = row.getFloat(i).toString
}
else if ("DoubleType" == value_type) {
str = row.getDouble(i).toString
}
else if ("TimestampType" == value_type) {
str = row.getTimestamp(i).toString
}
str
}
else ""
}
/**
* Prepare the Put object for bulkload function.
* @param put The put object.
* @throws java.io.IOException
* @throws java.lang.InterruptedException
* @return Tuple of (KeyFamilyQualifier, bytes of cell value)
*/
@throws(classOf[IOException])
@throws(classOf[InterruptedException])
def putForLoad(put: Put): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
val ret: mutable.MutableList[(KeyFamilyQualifier, Array[Byte])] = mutable.MutableList()
import scala.collection.JavaConversions._
for (cells <- put.getFamilyCellMap.entrySet().iterator()) {
val family = cells.getKey
for (value <- cells.getValue) {
val kfq = new KeyFamilyQualifier(CellUtil.cloneRow(value), family, CellUtil.cloneQualifier(value))
ret.+=((kfq, CellUtil.cloneValue(value)))
}
}
ret.iterator
}
def getResult(str: String): String = {
val resultStr = new StringBuffer
val loop = new Breaks
for (i <- 0 until str.length) {
val getChar = str.charAt(i)
loop.breakable{
if (getChar == '-'|| getChar == ':' ||
getChar == '.'|| getChar == ' ')
loop.break() else resultStr.append(getChar)
}
}
resultStr.toString
}
/**
* 检验运行时参数的方法 --- args = pro or dev
* @param args
*/
def checkArguments(args: String): Unit ={
val runType = mutable.HashSet("dev", "pro")
connectProperties.setProperty("driver","com.mysql.jdbc.Driver")
connectProperties.setProperty("partitionColumn","Id")
connectProperties.setProperty("fetchsize","1000")
if (args == null || args.length == 0 || !runType.contains(args)) {
throw new Exception("Illegal starting parameter ......")
}
if (args == "pro"){
url = JobProperties.pro_mysql_url
connectProperties.setProperty("user",JobProperties.pro_mysql_user)
connectProperties.setProperty("password",JobProperties.pro_mysql_password)
}
if (args == "dev"){
url = JobProperties.dev_mysql_url
connectProperties.setProperty("user",JobProperties.dev_mysql_user)
connectProperties.setProperty("password",JobProperties.dev_mysql_password)
}
}
}
2.4 Инструкция по эксплуатации
Посередине вкраплены несколько записей, поэтому есть таблица bigtable и таблица chatmsg2003.
Заходите на страницу HDFS, теперь файл начал записываться
Посмотрите еще раз на интерфейс Spark UI
Он начал нормально писать в HDFS
2.5 Возникшие проблемы
JOB этого пользовательского интерфейса Spark будет автоматически создавать 16 задач каждый раз, когда он заканчивает чтение MySQL и сохраняет файл массовой загрузки в HDFS, Определитель этих 16 задач должен определяться предварительным разделением в HBase.
Мой оператор создания таблицы HBase:
create 'bigtable','info',SPLITS=>
['10|','20|','30|','40|','50|','60|','70|','80|','90|',
'd0|','h0|','l0|','p0|','t0|','x0|']
В сумме получается всего 16. Некоторые задания в соленой рыбе, которые, возможно, придется решать позже.
И есть относительно большая проблема перекоса данных.Эти две задачи запускают большую часть данных.Когда вся задача завершена, программа начинает сообщать об ошибках, таких как
Проблема выручения, эта проблема давно решена, в интернете много решений, я видел одно, которое кажется более надежным, это модифицировать
hbase.hregion.max.filesize
Этот параметр, это максимальный размер, который может принять HFile.Если он превысит, он будет разделен.Получается, что этот параметр не имеет никакого отношения к ситуации, с которой я столкнулся, потому что я прочитал таблицу, которую я успешно выполнил после, это вот так
Это проблема, что файл не будет разделен, если файл не превышает 30G после изменения этого параметра
Другой параметр
hbase.bulkload.retries.number
Сказали, что количества сегментаций недостаточно, поэтому надо модифицировать параметр этой повторной попытки, когда он равен 0, она будет повторяться до тех пор, пока не получится.
Я посмотрел на это в кластереhbase.hregion.max.filesizeЗначение по умолчанию 10, я ставлю 30, а потом hbase.bulkload.retries.number ставлю 0, перезапускаю программу, либо сообщаю об ошибке, конечно, друзья, не перезапускайте всю программу , это слишком трудоёмко, я прямо разделил свою программу, потому что запись этого кода в HDFS и запись в HBase можно разделить
Здесь также есть небольшой эпизод, то есть, когда я запускал свою программу отдельно, я перенес свои файлы HDFS в другую папку, оставив только файл размером более 190 МБ, чтобы запустить его в одиночку, и обнаружил, что все еще сообщаю об ошибке
Exception in thread "main" java.io.IOException:
Retry attempted 30 times without completing, bailing out
Я отчаянно нуждаюсь в этой ошибке.
Я тоже пробовал читать маленькую таблицу в таком же формате, а в HDFS было всего 57М.Когда программа не разделена и запущена вместе, она может съесть мои файлы больше 190М при обработке своих 57М., очень странно, потому что мой файл 190M остался в HDFS, а файл 190M был прочитан в HBase кстати, когда программа работала.
Почему у меня возникла идея сделать это?Причина очень проста.Помните, что я говорил ранее, этот метод массовой загрузки не будет иметь никаких проблем при чтении небольшой таблицы, поэтому я просто делаю это.
Но когда я хочу использовать ту же процедуру, чтобы съесть оставшиеся файлы размером в несколько гигабайт или десятки гигабайт, она также терпит неудачу, и также сообщается об указанной выше ошибке.
Полное решение этой проблемы это модификация количества пре-разметок.До модификации кол-ва пре-разделов была еще проблема с этой программой.Потом после того, как я увеличил количество пре-разделов с 16 до 22, эта проблема спасения не возникнет.
Теперь задача запущена, а затем файл HDFS будет очищен, а затем сопоставлен с HBase здесь.
Я также столкнулся с проблемой с кучей JVM, но для этого нужно только вручную увеличить объем памяти вне кучи, отправив команду в программу Spark. Это вопрос ресурса.
Исходный код BulkLoadToHBase
Щелкните doBulkload, это исходный код пакета hbase-spark.
Если догадка secureClient верна, это должен быть прокси-объект клиента RPC. Это должен быть метод пакета hbase-spark, который вызывается как собственный метод.
Нажмите на метод подготовки, чтобы войти, и вы увидите два метода.
Метод DiscoverLoadQueue использует visitBulkHFiles для обхода нашего каталога HDFS, а затем выполняет серию проверок для каждого файла hfile, а размер одного файла hfile не должен превышатьHREGION_MAX_FILESIZE, значение определяется параметромhbase.hregion.max.filesizeУправление, по умолчанию 10 ГБ.
Метод validateFamiliesInHFiles в основном предназначен для проверки того, соответствует ли семейство столбцов, существующее в этом HFile, текущему семейству столбцов HBase.Я не читал его внимательно, но взгляну на его логику журнала печати.Во всяком случае, это то, что он делает.Вернитесь к doBulkLoad и нажмитеperformBulkload
Каждая итерация цикла while в основном выполняетсяgroupOrSplitа такжеbulkLoadдваphaseоперация:
groupOrSplitPhaseметод
Все файлы в очереди сгруппированы в соответствии с метаданными региона целевой таблицы, и каждый файл разделен на свой собственный регион.
groupOrSplitPhaseа такжеbulkLoadPhaseЯ не понял код, поэтому я Baidu
Если [firstkey, lastkey] h-файла не находится в диапазоне [starkey, endkey] любого региона, h-файл будет разделен на два файла (разделенные файлы имеют суффиксы .top и .bottom). является конечным ключом региона, в котором находится первый ключ.
Два h-файла, полученные после разделения, будут инкапсулированы в LQI и добавлены обратно в очередь LQI, поэтому необходим цикл while, чтобы определить, пуста ли очередь LQI. Следует отметить, что после разделения первый LQI обязательно будет в пределах определенного региона (если только целевая область не будет снова разделена перед загрузкой LQI в следующей итерации), а второй LQI, возможно, все же придется разделить.
После завершения groupOrSplitPhase все загружаемые LQI будут помещены в группы регионов. regionGroups — это Multimap, ключ — это начальный ключ региона, значение — это соответствующий LQI, а регион может соответствовать нескольким LQI.
думаю логика должна быть такой
bulkLoadPhase
Для каждого ключа в regionGroups (то есть начального ключа региона) вызовите метод tryAtomicRegionLoad, чтобы загрузить все соответствующие LQI в целевую таблицу. Если загрузка не удалась, добавьте неудачный LQI в очередь LQI,
Для следующего обнаружения и загрузки цикла (я думаю, это место для повторной попытки 17, 22, regionGroups.asMap().entrySet().iterator() должно быть числом предварительных разделов, и итератор regionGroup проходит это имя что-то вроде этого). Метод tryAtomicRegionLoad подключится к серверу региона hbase и отправит запрос SecureBulkLoadHFilesRequest.
Операции groupOrSplit и bulkLoad выполняются одновременно для всех файлов hfile через созданный выше пул потоков. В дополнение к операциям этих двух фаз в цикле while также обнаруживаются некоторые исключения:
Для цикла while(!queue.isEmpty) в doBulkLoad, если очередь LQI не пуста после попыток maxRetries, будет выдано исключение. maxRetries управляется параметром hbase.bulkload.retries.number, который по умолчанию равен 10:
Массовая загрузка сопоставит [firstkey, lastkey] файла hfile с [startkey, endkey] целевой области таблицы. Если совпадение не удается, файл будет разделен.
Finally
Позже он может быть изменен или дополнен по ситуации, если будут какие-то проблемы, надеюсь, вы меня поправите.
Другая проблема в том, что у bulkloadToHBase есть проблема многих запросов на чтение без причины.Пока я не знаю, что происходит.Если кто-то знает, пожалуйста, дайте мне знать.
В последнее время я управляю своей собственной планетой знаний, целью которой является помощь друзьям, которые хотят понять большие данные, начать работу с большими данными и развиваться вместе с разработчиками, которые занимались большими данными.Это бесплатно, но это не значит, что вы ничего не приобретете.Если вам интересно, вы можете добавить его, и он будет обновляться чаще.