spark访问hbase

import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

import org.apache.spark.rdd.NewHadoopRDD

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "tmp")

var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])

hBaseRDD.count()

import scala.collection.JavaConverters._

hBaseRDD.map(tuple => tuple._2).map(result => result.getColumn("cf".getBytes(), "val".getBytes())).map(keyValues => {
( keyValues.asScala.reduceLeft {
    (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
  }.getRow,
  keyValues.asScala.reduceLeft {
    (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
  }.getValue
)
}).take(10)

hBaseRDD.map(tuple => tuple._2).map(result => (result.getRow, result.getColumn("cf".getBytes(), "val".getBytes()))).map(row => {
(
  row._1.map(_.toChar).mkString,
  row._2.asScala.reduceLeft {
    (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
  }.getValue.map(_.toChar).mkString
)
}).take(10)


conf.set(TableInputFormat.INPUT_TABLE, "test1")

//var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])

hBaseRDD.map(tuple => tuple._2).map(result => (result.getRow, result.getColumn("lf".getBytes(), "app1".getBytes()))).map(row => if (row._2.size > 0) {
(
  row._1.map(_.toChar).mkString,
  row._2.asScala.reduceLeft {
    (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
  }.getValue.map(_.toInt).mkString
)
}).take(10)

import java.nio.ByteBuffer
hBaseRDD.map(tuple => tuple._2).map(result => (result.getRow, result.getColumn("lf".getBytes(), "app1".getBytes()))).map(row => if (row._2.size > 0) {
(
  row._1.map(_.toChar).mkString,
  ByteBuffer.wrap(row._2.asScala.reduceLeft {
    (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
  }.getValue).getLong
)
}).take(10)


//conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "lf")
conf.set(TableInputFormat.SCAN_COLUMNS, "lf:app1")

//var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])

import java.nio.ByteBuffer
hBaseRDD.map(tuple => tuple._2).map(result => {
  ( result.getRow.map(_.toChar).mkString,
    ByteBuffer.wrap(result.value).getLong
  )
}).take(10)


val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "test1")

var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])

var rows = hBaseRDD.map(tuple => tuple._2).map(result => result.getRow.map(_.toChar).mkString)
rows.map(row => row.split("\\|")).map(r => if (r.length > 1) (r(0), r(1)) else (r(0), "") ).groupByKey.take(10)

网站标题:spark访问hbase
URL地址:http://bzwzjz.com/article/pdgpdg.html

其他资讯

Copyright © 2007-2020 广东宝晨空调科技有限公司 All Rights Reserved 粤ICP备2022107769号
友情链接: 成都企业网站设计 网站建设改版 成都网站制作 成都网站建设 成都网站建设 成都网站制作 定制网站设计 LED网站设计方案 自适应网站建设 上市集团网站建设 成都网站设计公司 成都商城网站建设 网站建设开发 手机网站制作 营销网站建设 古蔺网站建设 重庆网站建设 响应式网站建设 手机网站建设 专业网站建设 成都定制网站建设 重庆网站建设