ScalaHbase 使用scala 操作hbase

  • Post author:
  • Post category:其他


package DAO

import java.io.IOException
import java.util.List
import java.util.UUID
import java.util.Map
import java.util.concurrent.ConcurrentHashMap
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.filter.SubstringComparator
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.slf4j.Logger
import org.slf4j.LoggerFactory

object ScalaHbase {
  //private val LOG: Logger = LoggerFactory.getLogger(classOf[ScalaHbase])
  def LOG = LoggerFactory.getLogger(getClass)

  def getHbaseConf: Configuration = {
    val conf: Configuration = HBaseConfiguration.create
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("spark.executor.memory", "3000m")
    conf.set("hbase.zookeeper.quorum", Constant.HBASE_ZOOKEEPER)
    conf.set("hbase.master", Constant.HBASE_MASTER)
    conf.set("hbase.rootdir", Constant.HBASE_ROOTDIR)
    conf
  }

  @throws(classOf[MasterNotRunningException])
  @throws(classOf[ZooKeeperConnectionException])
  @throws(classOf[IOException])
  def createTable(conf: Configuration,hbaseconn: Connection, tablename: String, columnFamily: Array[String]) {
    val admin: HBaseAdmin = hbaseconn.getAdmin.asInstanceOf[HBaseAdmin]

    if (admin.tableExists(tablename)) {
      LOG.info(tablename+" Table exists!")
//      val tableDesc: HTableDescriptor = new HTableDescriptor(TableName.valueOf(tablename))
//      tableDesc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation")
    }
    else {
      val tableDesc: HTableDescriptor = new HTableDescriptor(TableName.valueOf(tablename))
      tableDesc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation")
      for ( i <- 0 to (columnFamily.length - 1)) {
          val columnDesc: HColumnDescriptor = new HColumnDescriptor(columnFamily(i));
          tableDesc.addFamily(columnDesc);
        }
      admin.createTable(tableDesc)
      LOG.info(tablename+" create table success!")
    }
    admin.close
  }

  @throws(classOf[IOException])
  def addRow(table: Table, rowKey: String, columnFamily: String, key: String, value: String) {
//    LOG.info("put '" + rowKey + "', '" + columnFamily + ":" + key + "', '" + value + "'")

    val rowPut: Put = new Put(Bytes.toBytes(rowKey))
    if (value == null) {
      rowPut.addColumn(columnFamily.getBytes, key.getBytes, "".getBytes)
    } else {
      rowPut.addColumn(columnFamily.getBytes, key.getBytes, value.getBytes)
    }
    table.put(rowPut)
  }

  @throws(classOf[IOException])
  def putRow(rowPut: Put,table: HTable, rowKey: String, columnFamily: String, key: String, value: String) {
    //    val rowPut: Put = new Put(Bytes.toBytes(rowKey))
//    println("put '" + rowKey + "', '" + columnFamily + ":" + key + "', '" + value + "'")
    if (value == null) {
      rowPut.add(columnFamily.getBytes, key.getBytes, "".getBytes)
    } else {
    rowPut.add(columnFamily.getBytes, key.getBytes, value.getBytes)
  }
    table.put(rowPut)

  }

  @throws(classOf[IOException])
  def getRow(table: HTable, rowKey: String): Result = {
    val get: Get = new Get(Bytes.toBytes(rowKey))
    val result: Result = table.get(get)
    return result
  }

  /**
   * 鎵归噺娣诲姞鏁版?
   *
   * @param list
   * @throws IOException
   */
  def addDataBatch(table: HTable, list: List[Put]) {
    try {
      table.put(list)
    }
    catch {
      case e: RetriesExhaustedWithDetailsException => {
        LOG.error(e.getMessage)
      }
      case e: IOException => {
        LOG.error(e.getMessage)
      }
    }
  }

  /**
   * 鏌ヨ鍏ㄩ儴
   */
  def queryAll(table: HTable) {
    val scan: Scan = new Scan
    try {
      val s = new Scan()
      val results = table.getScanner(s)

    }
    catch {
      case e: IOException => {
        LOG.error(e.toString)
      }
    }
  }

  def queryBySingleColumn(table: HTable, queryColumn: String, value: String, columns: Array[String]): ResultScanner = {
    if (columns == null || queryColumn == null || value == null) {
      return null
    }
    try {
      val filter: SingleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes(queryColumn), Bytes.toBytes(queryColumn), CompareOp.EQUAL, new SubstringComparator(value))
      val scan: Scan = new Scan
      for (columnName <- columns) {
        scan.addColumn(Bytes.toBytes(columnName), Bytes.toBytes(columnName))
      }
      scan.setFilter(filter)
      return table.getScanner(scan)
    }
    catch {
      case e: Exception => {
        LOG.error(e.toString)
      }
    }
    return null
  }

//  @throws(classOf[Exception])
//  def main(args: Array[String]) {
//    val conf: Configuration = ScalaConn.getHbaseConf
//    val table: HTable = new HTable(conf, "test")
//    var rowPut:Put = null
//    try {
//      val familyColumn: Array[String] = Array[String]("test1", "test2")
//      createTable(conf, "test", familyColumn)
//      val uuid: UUID = UUID.randomUUID
//      val s_uuid: String = uuid.toString
//      if(rowPut == null){
//        rowPut = new Put(Bytes.toBytes(s_uuid))
//      }
//      ScalaHbase.putRow(rowPut,table, s_uuid, "uuid", "col1", s_uuid)
//      rowPut = null;
//      ScalaHbase.getRow(table, s_uuid)
//    }
//    catch {
//      case e: Exception => {
//        if (e.getClass == classOf[MasterNotRunningException]) {
//          System.out.println("MasterNotRunningException")
//        }
//        if (e.getClass == classOf[ZooKeeperConnectionException]) {
//          System.out.println("ZooKeeperConnectionException")
//        }
//        if (e.getClass == classOf[IOException]) {
//          System.out.println("IOException")
//        }
//        e.printStackTrace
//      }
//    } finally {
//      if (null != table) {
//        table.close
//      }
//    }
//  }
  def dropTable(conf: Configuration, tableName: String) {
    try {
      val admin: HBaseAdmin = new HBaseAdmin(conf)
      admin.disableTable(tableName)
      admin.deleteTable(tableName)
    }
    catch {
      case e: MasterNotRunningException => {
        LOG.error(e.toString)
      }
      case e: ZooKeeperConnectionException => {
        LOG.error(e.toString)
      }
      case e: IOException => {
        LOG.error(e.toString)
      }
    }
  }
}



版权声明:本文为m0_37845836原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。