微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Kudu Java API 操作与 Spark 操作 Kudu 代码总结

***** 首先导入 Kudu Java 客户端的 Maven 依赖 *****
<!-- 指定仓库位置,依次为aliyun、cloudera和jboss仓库 -->
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>jboss</id>
<url>http://repository.jboss.com/nexus/content/groups/public</url>
</repository>
</repositories>
<!-- 版本属性 -->
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<kudu.version>1.9.0-cdh6.2.1</kudu.version>
<junit.version>4.12</junit.version>
</properties>
<dependencies>
    <!-- Kudu Java client -->
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client</artifactId>
        <version>${kudu.version}</version>
    </dependency>
    <!-- Kudu client tools -->
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client-tools</artifactId>
        <version>${kudu.version}</version>
    </dependency>
    <!-- JUnit; the version comes from the junit.version property declared in <properties> -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>${junit.version}</version>
    </dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
#----------------------------------------------代码区--------------------------------------------------------
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.AlterTableOptions;
import org.apache.kudu.client.AlterTableResponse;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.Delete;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduPredicate;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.kudu.client.Update;
import org.apache.kudu.client.Upsert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
--------------------------java 获取kudu连接--------------
/**
* 基于Java API对Kudu进行CRUD操作,包含创建表及删除表的操作
*/
public class KuduTableDemo {
// 声明KuduClient实例对象
private KuduClient kuduClient = null ;
@Before
public void init() {
    // Point the builder at the Kudu master address used throughout these examples.
    KuduClient.KuduClientBuilder builder = new KuduClient.KuduClientBuilder("node2:7051");
    // Operation timeout in milliseconds (the original note says the default is 30 s).
    builder.defaultOperationTimeoutMs(10000);
    // Create the client shared by every test method of this class.
    kuduClient = builder.build();
}
@Test
public void testKuduClient() {
    // Smoke test: print the client field to confirm it was built.
    System.out.println(String.valueOf(kuduClient));
}
@After
public void close() throws KuduException {
    // Nothing to release when the client was never created.
    if (kuduClient == null) {
        return;
    }
    // Close the connection once the test has finished.
    kuduClient.close();
}
#----------------------------------------------------------java Kudu创建哈希分区表---------------------------------------------
/**
 * Builds the schema definition of a single Kudu column.
 *
 * @param name  column name
 * @param type  column data type
 * @param isKey whether the column is flagged as a key column
 * @return the resulting ColumnSchema
 */
private ColumnSchema newColumnSchema(String name, Type type, boolean isKey) {
    // Configure and build in one fluent chain.
    return new ColumnSchema.ColumnSchemaBuilder(name, type)
            .key(isKey)
            .build();
}
/**
 * Creates the hash-partitioned Kudu table "test_users". The intended structure is:
 *
 *   create table test_users(
 *     id int,
 *     name string,
 *     age int8,
 *     primary key(id)
 *   )
 *   partition by hash(id) partitions 3
 *   stored as kudu;
 */
@Test
public void createKuduTable() throws KuduException {
    // a. Column names and types; "id" is the only key column.
    List<ColumnSchema> columnList = new ArrayList<>();
    columnList.add(newColumnSchema("id", Type.INT32, true));
    columnList.add(newColumnSchema("name", Type.STRING, false));
    columnList.add(newColumnSchema("age", Type.INT8, false));
    Schema tableSchema = new Schema(columnList);

    // b. Table options: hash-partition on "id" into 3 buckets, one replica.
    CreateTableOptions tableOptions = new CreateTableOptions()
            .addHashPartitions(Arrays.asList("id"), 3)
            .setNumReplicas(1);

    // c. Create the table and print its id.
    KuduTable createdTable = kuduClient.createTable("test_users", tableSchema, tableOptions);
    System.out.println("Kudu Table ID = " + createdTable.getTableId());
}
#-----------------------------java kudu 删除表---------------------
/**
 * Deletes the table "test_users" when it exists; otherwise does nothing.
 */
@Test
public void dropKuduTable() throws KuduException {
    final String tableName = "test_users";
    // Guard clause: skip the delete when the table is absent.
    if (!kuduClient.tableExists(tableName)) {
        return;
    }
    kuduClient.deleteTable(tableName);
}
#---------------------------------------java kudu 插入数据----------
/**
 * Inserts a single row into the Kudu table "test_users":
 * INSERT INTO test_users (id, name, age) VALUES (1001, "xiaohong", 25)
 *
 * A row-level error reported by the server (for example a duplicate key) is printed to
 * stderr instead of being silently dropped.
 *
 * @throws KuduException if the table cannot be opened or the operation fails
 */
@Test
public void insertKuduData() throws KuduException {
    // a. Open a handle to the table.
    KuduTable kuduTable = kuduClient.openTable("test_users");
    // b. Create the session that carries the write.
    KuduSession kuduSession = kuduClient.newSession();
    try {
        // c. Create the insert operation and fill in its row.
        Insert insert = kuduTable.newInsert();
        PartialRow insertRow = insert.getRow();
        insertRow.addInt("id", 1001);
        insertRow.addString("name", "xiaohong");
        insertRow.addByte("age", (byte) 25);
        // d. Apply the insert. The session keeps its default flush mode (AUTO_FLUSH_SYNC
        //    per the Kudu client docs), so the response is expected to be available here;
        //    the null check covers modes where it is not.
        OperationResponse response = kuduSession.apply(insert);
        if (response != null && response.hasRowError()) {
            System.err.println("Kudu insert failed: " + response.getRowError());
        }
    } finally {
        // e. Always release the session, even when the write throws.
        kuduSession.close();
    }
}
#-------------------------------- java kudu 批量插入数据-----------------------------------
/**
 * Inserts 100 rows (id 100..199, name "zhangsan-&lt;index&gt;", random age 21..30) into the
 * Kudu table "test_users" using a manually flushed session.
 *
 * Named insertKuduDataBatch because the class already declares a single-row
 * insertKuduData() with the same signature; two identical signatures do not compile.
 *
 * @throws KuduException if the table cannot be opened or the flush fails
 */
@Test
public void insertKuduDataBatch() throws KuduException {
    // a. Open a handle to the table.
    KuduTable kuduTable = kuduClient.openTable("test_users");
    // b. Create the session and switch it to manual flushing.
    KuduSession kuduSession = kuduClient.newSession();
    try {
        kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
        // Buffer capacity in operations; must be able to hold the 100 rows queued below.
        kuduSession.setMutationBufferSpace(1000);
        Random random = new Random();
        for (int index = 0; index < 100; index++) {
            // c. One insert operation per row.
            Insert insert = kuduTable.newInsert();
            PartialRow insertRow = insert.getRow();
            insertRow.addInt("id", 100 + index);
            insertRow.addString("name", "zhangsan-" + index);
            insertRow.addByte("age", (byte) (random.nextInt(10) + 21));
            // d. Queue the operation; nothing is sent until flush().
            kuduSession.apply(insert);
        }
        // e. Send the whole batch and report every row the server rejected.
        List<OperationResponse> responses = kuduSession.flush();
        for (OperationResponse response : responses) {
            if (response.hasRowError()) {
                System.err.println("Kudu batch insert failed: " + response.getRowError());
            }
        }
    } finally {
        // f. Always release the session, even when a write throws.
        kuduSession.close();
    }
}
#--------------------------------------------java kudu 查询数据-全量查询-----------------------------
/**
 * Scans every row of the Kudu table "test_users" and prints id, name and age.
 *
 * @throws KuduException if the table cannot be opened or the scan fails
 */
@Test
public void queryKuduData() throws KuduException {
    // 1. Open a handle to the table.
    KuduTable kuduTable = kuduClient.openTable("test_users");
    // 2. Build a scanner without projection or predicates (full scan).
    KuduScanner kuduScanner = kuduClient.newScannerBuilder(kuduTable).build();
    try {
        // 3. Pull result sets until the scanner reports no more rows. Each nextRows() call
        //    yields one iterator of rows; the output labels it "tablet index".
        int index = 0;
        while (kuduScanner.hasMoreRows()) {
            index += 1;
            System.out.println("tablet index = " + index);
            RowResultIterator rowResults = kuduScanner.nextRows();
            while (rowResults.hasNext()) {
                RowResult rowResult = rowResults.next();
                System.out.println(
                        "id = " + rowResult.getInt("id")
                                + ", name = " + rowResult.getString("name")
                                + ", age = " + rowResult.getByte("age")
                );
            }
        }
    } finally {
        // Release the scanner even when iteration fails part-way.
        kuduScanner.close();
    }
}
#---------------------------------------------java kudu 过滤查询 -------------------------------------
/**
 * Scans the Kudu table "test_users" with a projection and predicates:
 * returns only the columns id and age for rows where id &gt; 150 and age &lt; 25.
 *
 * Named queryKuduDataByFilter because the class already declares a full-scan
 * queryKuduData() with the same signature; two identical signatures do not compile.
 *
 * @throws KuduException if the table cannot be opened or the scan fails
 */
@Test
public void queryKuduDataByFilter() throws KuduException {
    // 1. Open a handle to the table.
    KuduTable kuduTable = kuduClient.openTable("test_users");
    Schema tableSchema = kuduTable.getSchema();
    // 2. Configure the scanner.
    KuduScanner.KuduScannerBuilder scannerBuilder = kuduClient.newScannerBuilder(kuduTable);
    // Projection: only id and age are returned.
    scannerBuilder.setProjectedColumnNames(Arrays.asList("id", "age"));
    // Predicate: id > 150. The column definition is taken from the table's own schema.
    scannerBuilder.addPredicate(
            KuduPredicate.newComparisonPredicate(
                    tableSchema.getColumn("id"),
                    KuduPredicate.ComparisonOp.GREATER,
                    150
            )
    );
    // Predicate: age < 25.
    scannerBuilder.addPredicate(
            KuduPredicate.newComparisonPredicate(
                    tableSchema.getColumn("age"),
                    KuduPredicate.ComparisonOp.LESS,
                    (byte) 25
            )
    );
    KuduScanner kuduScanner = scannerBuilder.build();
    try {
        // 3. Pull result sets until the scanner reports no more rows.
        int index = 0;
        while (kuduScanner.hasMoreRows()) {
            index += 1;
            System.out.println("tablet index = " + index);
            RowResultIterator rowResults = kuduScanner.nextRows();
            while (rowResults.hasNext()) {
                RowResult rowResult = rowResults.next();
                System.out.println(
                        "id = " + rowResult.getInt("id")
                                + ", age = " + rowResult.getByte("age")
                );
            }
        }
    } finally {
        // Release the scanner even when iteration fails part-way.
        kuduScanner.close();
    }
}
#----------------------------------------- Kudu 更新数据 ---------------------------------------
/**
 * Updates the name of the row with id 153 in the Kudu table "test_users".
 *
 * A row-level error reported by the server (for example when the key does not exist) is
 * printed to stderr instead of being silently dropped.
 *
 * @throws KuduException if the table cannot be opened or the operation fails
 */
@Test
public void updateKuduData() throws KuduException {
    // a. Open a handle to the table.
    KuduTable kuduTable = kuduClient.openTable("test_users");
    // b. Create the session that carries the write.
    KuduSession kuduSession = kuduClient.newSession();
    try {
        // c. Create the update operation; the key selects the row, the rest is the new data.
        Update newUpdate = kuduTable.newUpdate();
        PartialRow updateRow = newUpdate.getRow();
        updateRow.addInt("id", 153);
        updateRow.addString("name", "张三疯");
        // d. Apply the update and surface any row error.
        OperationResponse response = kuduSession.apply(newUpdate);
        if (response != null && response.hasRowError()) {
            System.err.println("Kudu update failed: " + response.getRowError());
        }
    } finally {
        // e. Always release the session, even when the write throws.
        kuduSession.close();
    }
}
####常用 upsert 主键存在时更新数据,不存在时插入数据 ------------------------------------
/**
 * Upserts the row (253, "张疯", 50) into the Kudu table "test_users": the row is updated
 * when the key already exists and inserted otherwise.
 *
 * @throws KuduException if the table cannot be opened or the operation fails
 */
@Test
public void upsertKuduData() throws KuduException {
    // a. Open a handle to the table.
    KuduTable kuduTable = kuduClient.openTable("test_users");
    // b. Create the session that carries the write.
    KuduSession kuduSession = kuduClient.newSession();
    try {
        // c. Create the upsert operation and fill in its row.
        Upsert newUpsert = kuduTable.newUpsert();
        PartialRow upsertRow = newUpsert.getRow();
        upsertRow.addInt("id", 253);
        upsertRow.addString("name", "张疯");
        upsertRow.addByte("age", (byte) 50);
        // d. Apply the upsert and surface any row error.
        OperationResponse response = kuduSession.apply(newUpsert);
        if (response != null && response.hasRowError()) {
            System.err.println("Kudu upsert failed: " + response.getRowError());
        }
        kuduSession.flush();
    } finally {
        // e. Always release the session, even when the write throws.
        kuduSession.close();
    }
}
################################## kudu 按照主键id 删除数据**************************
/**
 * Deletes the row with primary key id = 253 from the Kudu table "test_users".
 *
 * A row-level error reported by the server (for example when the key does not exist) is
 * printed to stderr instead of being silently dropped.
 *
 * @throws KuduException if the table cannot be opened or the operation fails
 */
@Test
public void deleteKuduData() throws KuduException {
    // a. Open a handle to the table.
    KuduTable kuduTable = kuduClient.openTable("test_users");
    // b. Create the session that carries the write.
    KuduSession kuduSession = kuduClient.newSession();
    try {
        // c. Create the delete operation; only the primary key is set.
        Delete newDelete = kuduTable.newDelete();
        PartialRow deleteRow = newDelete.getRow();
        deleteRow.addInt("id", 253);
        // d. Apply the delete and surface any row error.
        OperationResponse response = kuduSession.apply(newDelete);
        if (response != null && response.hasRowError()) {
            System.err.println("Kudu delete failed: " + response.getRowError());
        }
        kuduSession.flush();
    } finally {
        // e. Always release the session, even when the write throws.
        kuduSession.close();
    }
}
#--------------------------------------------------- kudu 创建范围分区表 --------------------------------------------------
/**
 * Creates the Kudu table "test_users_range", range-partitioned on id into three ranges:
 * id &lt; 100, 100 &lt;= id &lt; 500, and id &gt;= 500.
 */
@Test
public void createKuduTableByRange() throws KuduException {
    // a. Column names and types; "id" is the only key column.
    List<ColumnSchema> columnList = new ArrayList<>();
    columnList.add(newColumnSchema("id", Type.INT32, true));
    columnList.add(newColumnSchema("name", Type.STRING, false));
    columnList.add(newColumnSchema("age", Type.INT8, false));
    Schema tableSchema = new Schema(columnList);

    // b. Boundary rows. A row with no value set serves as the open end of a range.
    PartialRow openLower = new PartialRow(tableSchema);
    PartialRow firstUpper = new PartialRow(tableSchema);
    firstUpper.addInt("id", 100);

    PartialRow secondLower = new PartialRow(tableSchema);
    secondLower.addInt("id", 100);
    PartialRow secondUpper = new PartialRow(tableSchema);
    secondUpper.addInt("id", 500);

    PartialRow thirdLower = new PartialRow(tableSchema);
    thirdLower.addInt("id", 500);
    PartialRow openUpper = new PartialRow(tableSchema);

    // c. Table options: range column, the three ranges, then the replica count.
    CreateTableOptions tableOptions = new CreateTableOptions();
    tableOptions.setRangePartitionColumns(Arrays.asList("id"));
    tableOptions.addRangePartition(openLower, firstUpper);    // id < 100
    tableOptions.addRangePartition(secondLower, secondUpper); // 100 <= id < 500
    tableOptions.addRangePartition(thirdLower, openUpper);    // id >= 500
    tableOptions.setNumReplicas(1);

    // d. Create the table and print its id.
    KuduTable createdTable = kuduClient.createTable("test_users_range", tableSchema, tableOptions);
    System.out.println("Kudu Table ID = " + createdTable.getTableId());
}
#----------------------------------------------- kudu 创建多级分区表 --------------------------------------------------------
/**
 * Creates the Kudu table "test_users_multi" with multi-level partitioning: hash
 * partitioning on id (5 buckets) combined with range partitioning on age
 * (age &lt; 21, 21 &lt;= age &lt; 41, age &gt;= 41).
 */
@Test
public void createKuduTableMulti() throws KuduException {
    // a. Schema: id and age are key columns, name is a regular column.
    List<ColumnSchema> columnList = new ArrayList<ColumnSchema>();
    columnList.add(newColumnSchema("id", Type.INT32, true));
    columnList.add(newColumnSchema("age", Type.INT8, true));
    columnList.add(newColumnSchema("name", Type.STRING, false));
    Schema tableSchema = new Schema(columnList);

    // b. Partition column lists.
    List<String> hashColumns = new ArrayList<>();
    hashColumns.add("id");
    List<String> rangeColumns = new ArrayList<>();
    rangeColumns.add("age");

    // c. Boundary rows. A row with no value set serves as the open end of a range.
    PartialRow openLower = new PartialRow(tableSchema);
    PartialRow youngUpper = new PartialRow(tableSchema);
    youngUpper.addByte("age", (byte) 21);

    PartialRow middleLower = new PartialRow(tableSchema);
    middleLower.addByte("age", (byte) 21);
    PartialRow middleUpper = new PartialRow(tableSchema);
    middleUpper.addByte("age", (byte) 41);

    PartialRow seniorLower = new PartialRow(tableSchema);
    seniorLower.addByte("age", (byte) 41);
    PartialRow openUpper = new PartialRow(tableSchema);

    // d. Table options: hash level first, then the range level, then the replica count.
    CreateTableOptions options = new CreateTableOptions();
    options.addHashPartitions(hashColumns, 5);
    options.setRangePartitionColumns(rangeColumns);
    options.addRangePartition(openLower, youngUpper);     // age < 21
    options.addRangePartition(middleLower, middleUpper);  // 21 <= age < 41
    options.addRangePartition(seniorLower, openUpper);    // age >= 41
    options.setNumReplicas(1);

    // e. Create the table and print the returned handle.
    KuduTable createdTable = kuduClient.createTable("test_users_multi", tableSchema, options);
    System.out.println(createdTable.toString());
}
#--------------------------------------- kudu 添加列 --------------------------------------
/**
 * Alters the Kudu table "test_users" by adding the STRING column "address" with the
 * default value "shanghai".
 */
@Test
public void alterKuduTableAddColumn() throws KuduException {
    // Describe the change: one new column with a default value.
    AlterTableOptions alterOptions = new AlterTableOptions()
            .addColumn("address", Type.STRING, "shanghai");
    // Apply the change and print the id of the altered table.
    AlterTableResponse alterResponse = kuduClient.alterTable("test_users", alterOptions);
    System.out.println(alterResponse.getTableId());
}
#---------------------------------------------- kudu 删除列 -------------------------------------------
/**
 * Alters the Kudu table "test_users" by dropping the column "address".
 */
@Test
public void alterKuduTableDropColumn() throws KuduException {
    // Describe the change: remove one column.
    AlterTableOptions alterOptions = new AlterTableOptions()
            .dropColumn("address");
    // Apply the change and print the id of the altered table.
    AlterTableResponse alterResponse = kuduClient.alterTable("test_users", alterOptions);
    System.out.println(alterResponse.getTableId());
}
}
======================kudu spark 集成 较为常用且方便========
####导入maven 依赖
<!-- 指定仓库位置,依次为aliyun、cloudera和jboss仓库 -->
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>jboss</id>
<url>http://repository.jboss.com/nexus/content/groups/public</url>
</repository>
</repositories>
<!-- 版本属性 -->
<properties>
<scala.version>2.11.12</scala.version>
<scala.binary.version>2.11</scala.binary.version>
<spark.version>2.4.0-cdh6.2.1</spark.version>
<hadoop.version>3.0.0-cdh6.2.1</hadoop.version>
<kudu.version>1.9.0-cdh6.2.1</kudu.version>
</properties>
<!-- 依赖jar包 -->
<dependencies>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client-tools</artifactId>
<version>${kudu.version}</version>
</dependency>
<!-- Kudu Client 依赖包 -->
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>${kudu.version}</version>
</dependency>
<!-- Junit 依赖包 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2 -->
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-spark2_2.11</artifactId>
<version>${kudu.version}</version>
</dependency>
<!-- 依赖Scala语言 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Spark Core 依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark sql 依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Hadoop Client 依赖 -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
<build>
<outputDirectory>target/classes</outputDirectory>
<testOutputDirectory>target/test-classes</testOutputDirectory>
<resources>
<resource>
<directory>${project.basedir}/src/main/resources</directory>
</resource>
</resources>
<!-- Maven 编译的插件 -->
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
#---------------------------------------------------------------------------- 代码区 --------------------------------------------------------------
import java.util
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
----------------------------------------------------------------------------------------------------------------------------------------------------------
/**
 * Kudu + Spark integration: creates and drops Kudu tables through KuduContext.
 */
object KuduSparkTableDemo {

  /**
   * Creates a Kudu table with the columns id, name, age and gender, keyed on id and
   * hash-partitioned on id into 3 buckets with one replica.
   *
   * @param tableName   name of the table to create
   * @param kuduContext KuduContext used to issue the DDL
   */
  def createKuduTable(tableName: String, kuduContext: KuduContext): Unit = {
    // a. Table schema; only the key column is non-nullable.
    val schema: StructType = StructType(
      Array(
        StructField("id", IntegerType, nullable = false),
        StructField("name", StringType, nullable = true),
        StructField("age", IntegerType, nullable = true),
        StructField("gender", StringType, nullable = true)
      )
    )
    // b. Primary key columns.
    val keys: Seq[String] = Seq("id")
    // c. Table creation options.
    val options: CreateTableOptions = new CreateTableOptions()
    options.setNumReplicas(1)
    options.addHashPartitions(util.Arrays.asList("id"), 3)
    // d. Create the table and print its id (not the table object).
    val kuduTable = kuduContext.createTable(tableName, schema, keys, options)
    println(s"Kudu Table ID: ${kuduTable.getTableId}")
  }

  /**
   * Drops the Kudu table when it exists; otherwise does nothing.
   *
   * @param tableName   name of the table to drop
   * @param kuduContext KuduContext used to issue the DDL
   */
  def dropKuduTable(tableName: String, kuduContext: KuduContext): Unit = {
    if (kuduContext.tableExists(tableName)) {
      kuduContext.deleteTable(tableName)
    }
  }

  /** Entry point: builds a local SparkSession and drops the table "kudu_test_users". */
  def main(args: Array[String]): Unit = {
    // 1. Build the SparkSession.
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[2]")
      .config("spark.sql.shuffle.partitions", "2")
      .getOrCreate()
    try {
      // 2. Create the KuduContext bound to the Kudu master.
      val kuduContext: KuduContext = new KuduContext("node2:7051", spark.sparkContext)
      println(s"KuduContext: ${kuduContext}")
      // Task 1: create the table (disabled).
      //createKuduTable("kudu_test_users", kuduContext)
      // Task 2: drop the table.
      dropKuduTable("kudu_test_users", kuduContext)
    } finally {
      // 3. Release Spark resources even when a Kudu call fails.
      spark.stop()
    }
  }
}
------------------------------------------------------------ CRUD 操作-----------------------------------------------------------------
/**
 * Data operations on a Kudu table through Spark; this object implements the insert step.
 */
object KuduSparkDataDemo {
  // DataFrame is not part of the file-level imports, so it is brought into scope here.
  import org.apache.spark.sql.DataFrame

  /**
   * Inserts four sample user rows into the given Kudu table.
   *
   * @param spark       active SparkSession used to build the DataFrame
   * @param kuduContext KuduContext used to write the rows
   * @param tableName   name of the target Kudu table
   */
  def insertData(spark: SparkSession, kuduContext: KuduContext, tableName: String): Unit = {
    // a. Build the sample data from a Seq of tuples and name the columns with toDF.
    val usersDF: DataFrame = spark.createDataFrame(
      Seq(
        (1001, "zhangsan", 23, "男"),
        (1002, "lisi", 22, "男"),
        (1003, "xiaohong", 24, "女"),
        (1004, "zhaoliu2", 33, "男")
      )
    ).toDF("id", "name", "age", "gender")
    // b. Write the rows to the Kudu table.
    kuduContext.insertRows(usersDF, tableName)
  }

  /** Entry point: builds a local SparkSession and inserts the sample rows. */
  def main(args: Array[String]): Unit = {
    // 1. Build the SparkSession.
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[2]")
      .config("spark.sql.shuffle.partitions", "2")
      .getOrCreate()
    try {
      // 2. Create the KuduContext bound to the Kudu master.
      val kuduContext: KuduContext = new KuduContext("node2:7051", spark.sparkContext)
      //println(s"KuduContext: ${kuduContext}")
      val tableName = "kudu_test_users"
      // Insert data.
      insertData(spark, kuduContext, tableName)
      // The remaining steps stay disabled; their helpers are not defined in this object.
      //selectData(spark, kuduContext, tableName)
      //updateData(spark, kuduContext, tableName)
      //upsertData(spark, kuduContext, tableName)
      //deleteData(spark, kuduContext, tableName)
    } finally {
      // 3. Release Spark resources even when a Kudu call fails.
      spark.stop()
    }
  }
}
-------------------------------------------------------------------------查询数据将数据封装到 RDD 数据集---------------------------------------------------------------------------------------
/**
 * Reads the columns name and age of the given Kudu table as an RDD of rows and prints
 * every row.
 *
 * @param spark       active SparkSession whose SparkContext runs the scan
 * @param kuduContext KuduContext used to build the RDD
 * @param tableName   name of the Kudu table to read
 */
def selectData(spark: SparkSession, kuduContext: KuduContext, tableName: String): Unit = {
  // Column projection passed to kuduRDD; positions 0 and 1 below follow this order.
  val projection = Seq("name", "age")
  kuduContext
    .kuduRDD(spark.sparkContext, tableName, projection)
    .foreach { record =>
      println(s"name = ${record.getString(0)}, age = ${record.getInt(1)}")
    }
}
----------------------------------------------------------------- 基于 SparkSQL 外部数据源方式,从 Kudu 表加载(load)数据并封装为 DataFrame,再将数据保存(save)回 Kudu 表 --------------------------------
/**
 * Spark SQL program that loads rows from the Kudu table "kudu_test_users", transforms
 * them (age + 1, gender mapped to M/F) and upserts the result back into the same table.
 */
object KuduSparksqlDemo {
  // These types are not part of the file-level imports, so they are brought into scope here.
  import org.apache.spark.sql.{DataFrame, SaveMode}
  import org.apache.spark.sql.expressions.UserDefinedFunction

  /** Entry point: runs the load -> transform -> save pipeline on a local SparkSession. */
  def main(args: Array[String]): Unit = {
    // One master address and table name for both the read and the write, so that they
    // cannot drift apart.
    val kuduMaster = "node2:7051"
    val kuduTable = "kudu_test_users"

    // 1. Build the SparkSession.
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[2]")
      .config("spark.sql.shuffle.partitions", "2")
      .getOrCreate()
    try {
      // Needed for the $"column" syntax below.
      import spark.implicits._

      // 2. Load the data from the Kudu table.
      val kuduDF: DataFrame = spark.read
        .format("kudu")
        .option("kudu.table", kuduTable)
        .option("kudu.master", kuduMaster)
        .load()
      //kuduDF.printSchema()
      //kuduDF.show(10, truncate = false)

      // 3. UDF that maps the gender text to a code: "女" -> "F", everything else -> "M".
      val gender_to_udf: UserDefinedFunction = udf(
        (gender: String) => {
          gender match {
            case "男" => "M"
            case "女" => "F"
            case _ => "M"
          }
        }
      )

      // 4. Transform: keep id and name, add one to age, convert gender.
      val etlDF: DataFrame = kuduDF.select(
        $"id", $"name",
        $"age".plus(1).as("age"),
        gender_to_udf($"gender").as("gender")
      )
      //etlDF.printSchema()
      //etlDF.show(10, truncate = false)

      // 5. Save back to Kudu; the upsert operation overwrites rows with existing keys.
      etlDF.write
        .mode(SaveMode.Append)
        .format("kudu")
        .option("kudu.table", kuduTable)
        .option("kudu.master", kuduMaster)
        .option("kudu.operation", "upsert")
        .save()
    } finally {
      // 6. Release Spark resources even when the pipeline fails.
      spark.stop()
    }
  }
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐