
Spark Streaming reading from Kafka and batch-inserting the data into a database (Java version)

Without further ado, here is the code:

import dbutils.Databases;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import scala.Tuple2;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.ParseException;
import java.util.*;

public class NB_Test {
    public static void main (String[] args) throws InterruptedException {
        // Create the SparkConf, JavaSparkContext, and streaming context (5-second batch interval)
        SparkConf conf = new SparkConf().setAppName("kafka_Spark").setMaster("local[4]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(5));

        // Kafka consumer parameters passed as a Map
        Map<String,Object> kafkaParams = new HashMap<String, Object>();
        String brokers = "10.204.118.101:9092,10.204.118.102:9092,10.204.118.103:9092";
        kafkaParams.put("bootstrap.servers",brokers);
        kafkaParams.put("group.id","test-consumer");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer",StringDeserializer.class);

        Set<String> topicSet = new HashSet<>();
        topicSet.add("test");

        kafkaParams.put("auto.offset.reset","latest");
        kafkaParams.put("enable.auto.commit",false);
        JavaInputDStream<ConsumerRecord<String,String>> DStream = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicSet,kafkaParams)
        );
        // Convert the stream into (key, value) pairs
        JavaPairDStream<String, String> kv = DStream.mapToPair(record -> new Tuple2<String, String>(record.key(), record.value()));
        kv.print();

        // Start timestamp (note: foreachRDD below only registers the output operation,
        // so this really measures driver-side setup, not the inserts themselves)
        long start = System.currentTimeMillis();
        // Process each RDD in the stream
        DStream.foreachRDD(rdd -> {
            OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd.rdd()).offsetRanges();
            // Process each partition of the RDD
            rdd.foreachPartition(partitions->{
                OffsetRange o = offsetRanges[TaskContext.getPartitionId()];
                System.out.println(o.partition() + "  " + o.fromOffset() + "  " + o.untilOffset());
                // Create the JDBC helper object
                Databases databases = new Databases();
                // Obtain a JDBC connection from the helper
                Connection connection = databases.getConnection();
                // Disable auto-commit so the batches below are committed manually
                connection.setAutoCommit(false);
                // Local MySQL insert for testing
                String sql = "insert into db_test values(?,?,?,?,?,?,?,?,?)";
                PreparedStatement ps = connection.prepareStatement(sql);

                // Separate lists for records with different keys
                List<String> listNull = new ArrayList<>();
                List<String> list1001 = new ArrayList<>();
                List<String> list1002 = new ArrayList<>();

                // Iterate over the partition and route each record into the matching list for later processing
                partitions.forEachRemaining(line->{
                    // Records with different keys could be handled with different logic, e.g.:
                   /* if(line._1()==null){
                        listNull.add(line._2().toString());
                    }else if(line._1().equals("1001")){
                        list1001.add(line._2().toString());
                    }else{
                        list1002.add(line._2().toString());
                    }*/
                   listNull.add(line.value());

                });
                // Batch-insert the collected records
                NB_Test.Batch_sqlServer(listNull.size(),2000,listNull,ps,connection);
                // Close the JDBC connection once this partition is done
                databases.closeConn(connection);
            });
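            // Commit the Kafka offsets only after all partitions have been written (at-least-once delivery)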
            ((CanCommitOffsets)DStream.inputDStream()).commitAsync(offsetRanges);
        });
        // Timestamp after the output operations have been registered
        // (the inserts themselves run once the streaming context is started below)
        long end = System.currentTimeMillis();
        System.out.println("Setup time: " + (end - start) + "ms");
        ssc.start();
        ssc.awaitTermination();
    }


    // Batch insert
    /**
     * @param len           total number of records in the list
     * @param batch         number of rows per batch
     * @param list          the records to insert
     * @param PS            prepared statement used for the inserts
     * @param connection    JDBC connection
     * @throws SQLException if a database error occurs
     */
    public static void Batch_sqlServer(int len,int batch,List<String> list,PreparedStatement PS,Connection connection) throws SQLException, ParseException {
        // Only proceed if the list is non-empty
        if(len!=0){
            // Number of full batches
            int times = len/batch;
            for(int k =0;k<=times;k++){
                // Case 1: the list length is an exact multiple of the batch size
                if(len%batch==0 && k*batch<len){
                    // Insert one full batch per loop iteration
                    for(int m = batch*k;m<(k+1)*batch;m++){
                        String[] split = list.get(m).split(",");
                        System.out.println("m"+m);
                        PS.setInt(1,Integer.parseInt(split[0]));
                        PS.setString(2,split[1]);
                        PS.setInt(3,Integer.parseInt(split[2]));
                        PS.setString(4,split[3]);
                        PS.setString(5,split[4]);
                        PS.setString(6,split[5]);
                        PS.setString(7,split[6]);
                        PS.setString(8,split[7]);
                        PS.setString(9,split[8]);
                        PS.addBatch();
                    }
                    PS.executeBatch();
                    connection.commit();
                    // Case 2: the list length is not an exact multiple of the batch size
                }else if(len%batch!=0){
                    // If at least one full batch of data still remains at this point
                    if(k*batch<len-batch){
                        for(int n = k*batch;n<(k+1)*batch;n++){
                            String[] split1 = list.get(n).split(",");
                            System.out.println("n"+n);
                            PS.setInt(1,Integer.parseInt(split1[0]));
                            PS.setString(2,split1[1]);
                            PS.setInt(3,Integer.parseInt(split1[2]));
                            PS.setString(4,split1[3]);
                            PS.setString(5,split1[4]);
                            PS.setString(6,split1[5]);
                            PS.setString(7,split1[6]);
                            PS.setString(8,split1[7]);
                            PS.setString(9,split1[8]);
                            PS.addBatch();
                        }
                        PS.executeBatch();
                        connection.commit();
                        // Final partial batch: fewer than 'batch' rows remain, inserted as one last batch
                    }else{
                        for(int p = k*batch;p<(k*batch+(len%batch));p++){
                            String[] split2 = list.get(p).split(",");
                            System.out.println("p"+p);
                            PS.setInt(1,Integer.parseInt(split2[0]));
                            PS.setString(2,split2[1]);
                            PS.setInt(3,Integer.parseInt(split2[2]));
                            PS.setString(4,split2[3]);
                            PS.setString(5,split2[4]);
                            PS.setString(6,split2[5]);
                            PS.setString(7,split2[6]);
                            PS.setString(8,split2[7]);
                            PS.setString(9,split2[8]);
                            PS.addBatch();
                        }
                        PS.executeBatch();
                        connection.commit();
                    }
                }
            }
        }
    }
}
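
The code imports dbutils.Databases, a small JDBC helper class that is not shown in the post. Below is a minimal sketch of what such a helper might look like; the MySQL URL, user name, and password are placeholders for illustration, not values from the original.

package dbutils;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class Databases {
    // Placeholder connection settings -- replace with your own
    private static final String URL = "jdbc:mysql://localhost:3306/test?useSSL=false";
    private static final String USER = "root";
    private static final String PASSWORD = "root";

    // Open a new JDBC connection (called once per partition in the job above)
    public Connection getConnection() throws SQLException {
        return DriverManager.getConnection(URL, USER, PASSWORD);
    }

    // Close the connection after the partition has been processed
    public void closeConn(Connection connection) {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}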

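To verify just the batching logic without Kafka or Spark, Batch_sqlServer can also be called directly. The sketch below assumes db_test has nine columns where the first and third are integers (that is how the code parses each record); the sample records are made up for the test.

import dbutils.Databases;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.List;

public class BatchInsertLocalTest {
    public static void main(String[] args) throws Exception {
        Databases databases = new Databases();
        Connection connection = databases.getConnection();
        // Manual commit, same as in the streaming job
        connection.setAutoCommit(false);
        PreparedStatement ps = connection.prepareStatement(
                "insert into db_test values(?,?,?,?,?,?,?,?,?)");

        // Fake comma-separated records shaped like the Kafka message values:
        // int, string, int, then six more string columns
        List<String> records = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            records.add(i + ",name" + i + "," + (20 + i) + ",a,b,c,d,e,f");
        }

        // Reuse the same batching logic as the streaming job
        NB_Test.Batch_sqlServer(records.size(), 2000, records, ps, connection);
        databases.closeConn(connection);
    }
}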
That's it. I'm still a beginner, just writing down what I know here so I can come back to it later. Haha.
