spark +hbase
Hbase:一个高可靠、高性能、面向列、可伸缩的分布式数据库,主要用来存储非结构化和半结构化的松散数据
基于Scala语言新建一个maven项目来测试如何读写Hbase中的数据
该项目将读取我们在介绍Hbase时创建的usr_beha表中的数据
该表包含了两个列族:attr和beha。
attr: attr列族主要存储用户属性数据,目前只包含了一个名为name的列
beha: beha列族主要存储用户的行为数据,目前只包含了一个名为watch的列
项目的任务:首先从该表中读取数据,然后计算每个名字的长度,并将长度作为一个新的列写入到attr列族下。
import org.apache.hadoop.hbase.{
HBaseConfiguration}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark._
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.util.Bytes
object SparkHbase {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local")
.setAppName("sparkHbase")
//设置不去验证输出设置,否则在写入数据到Hbase时出错
.set("spark.hadoop.validateOutputSpecs","false")
val sc = new SparkContext(sparkConf)
//(1)从Hbase中读取数据
//读取Hbase的配置
val conf = HBaseConfiguration.create()
//Hbase中的表,这个表要事先创建
val tablename = "usr_beha"//表名
conf.set(TableInputFormat.INPUT_TABLE, tablename)//设置读数据的表
//设置zooKeeper集群地址
conf.set("hbase.zookeeper.quorum", "localhost")//本地运行时,zookeeper运行在localhost
//设置zookeeper连接端口,默认2181
conf.set("hbase.zookeeper.property.clientPort", "2181")
//读取Hbase数据并转化成RDD
//这里借用了SparkContext的newAPIHadoopRDD方法来读取Hbase并创建RDD
//从该接口可以看出,生成的RDD中是<key, value>形式的键值对数据
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],//键值对中key值类型
classOf[org.apache.hadoop.hbase.client.Result])//返回键值对中value值类型
//通过RDD的foreach操作打印输出表中的每一行数据
//hbase返回的result对象封装了表的一行数据
//可以通过getRow和getValue等方法获取一行中的行键以及各个单元格中的数据
hBaseRDD.foreach {
case (_, result) => {
val id = Bytes.toString(result.getRow)//获取行键
//通过列族和列名获取列
val name = Bytes.toString(result.getValue("attr".getBytes, "name".getBytes))
val watch = Bytes.toString(result.getValue("beha".getBytes, "watch".getBytes))
println("id: "+id+", name: "+name+", watch: "+watch)
}
}
//通过RDD的map操作来计算每个名字的长度
val result = hBaseRDD.map(tuple=> {
val item=tuple._2//元组的第二个值即为hbase返回键值对中的result对象
val id=Bytes.toString(item.getRow)//获取行键
val name = Bytes.toString(item.getValue("attr".getBytes, "name".getBytes))//用户名字
(id, name, name.length())//返回一个元组
})
//(2)写入数据到Hbase中
//写入hbase时的写入配置
var resultConf = HBaseConfiguration.create()
//设置zooKeeper集群地址
resultConf.set("hbase.zookeeper.quorum", "localhost")
//设置zookeeper连接端口,默认2181
resultConf.set("hbase.zookeeper.property.clientPort", "2181")
//设置输出的Hbase表,这里我们仍然输出usr_beha表
resultConf.set(TableOutputFormat.OUTPUT_TABLE, "usr_beha")
var job = Job.getInstance(resultConf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
val hbaseOut = result.map(tuple=>{
val put = new Put(Bytes.toBytes(tuple._1))//行键
put.addColumn(Bytes.toBytes("attr"), //列族
Bytes.toBytes("name_length"),//列
Bytes.toBytes(tuple._3.toString()))//名字长度,转成String方便查看
(new ImmutableBytesWritable, put)
})
hbaseOut.saveAsNewAPIHadoopDataset(job.getConfiguration)//写入数据
sc.stop()
}
}
pom 文件
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.liu</groupId>
<artifactId>SparkHbase</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>1.5.0</version>
</dependency>
</dependencies>
</project>
文章评论