微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink生产---15Redis、JedisUtil

Flink 项目中访问 Redis方法都是自己进行的实现,推荐使用 Bahir 连接器。

在本地单机情况下:

public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>>{
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");
    }
    @Override
    public String getKeyFromData(Tuple2<String, String> data) {
        return data.f0;
    }
    @Override
    public String getValueFromData(Tuple2<String, String> data) {
        return data.f1;
    }
}
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
DataStream<String> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());

当然我们也可以使用在集群或者哨兵模式下使用 Redis 连接器。

集群模式:

FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
    .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build();
DataStream<String> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());
哨兵模式:
FlinkJedisSentinelConfig conf = new FlinkJedisSentinelConfig.Builder()
    .setMasterName("master").setSentinels(...).build();
DataStream<String> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());

object JedisUtil {
  def getJedisPool(properties: Properties): JedisPool = {
    val config = new JedisPoolConfig()
config.setBlockWhenExhausted(properties.getProperty("redis.blockWhenExhausted", "true").trim
    ().toBoolean)
	config.setevictionPolicyClassName(properties.getProperty("redis.evictionPolicyClassName",
      "org.apache.commons.pool2.impl.DefaultevictionPolicy").trim())


    config.setJmxenabled(properties.getProperty("redis.jmxenabled", "true").trim().toBoolean)

    config.setJmxNamePrefix(properties.getProperty("redis.jmxNamePrefix", "pool").trim())

    config.setLifo(properties.getProperty("redis.lifo", "true").trim().toBoolean)

    config.setMaxIdle(properties.getProperty("redis.maxIdle", "8").trim().toInt)

    config.setMaxTotal(properties.getProperty("redis.maxTotal", "8").trim().toInt)
   
    config.setMaxWaitMillis(properties.getProperty("redis.maxWaitMillis", "-1").trim().toInt)
	config.setMinevictableIdleTimeMillis(properties.getProperty("redis.minevictableIdleTimeMillis", "1800000").trim().toInt)

    config.setMinIdle(properties.getProperty("redis.minIdle", "0").trim().toInt)
    config.setNumTestsPerevictionRun(properties.getProperty("redis.numTestsPerevictionRun", "3")
      .trim().toInt)
    config.setSoftMinevictableIdleTimeMillis(properties.getProperty("redis.softMinevictableIdleTimeMillis", "1800000").trim().toInt)

    config.setTestOnBorrow(properties.getProperty("redis.testOnBorrow", "false").trim().toBoolean)

    config.setTestWhileIdle(properties.getProperty("redis.testWhileIdle", "false").trim().toBoolean)

    config.setTimeBetweenevictionRunsMillis(properties.getProperty("redis.timeBetweenevictionRunsMillis", "-1").trim().toInt)
    val host = properties.getProperty("redis.host").trim()
    val port = properties.getProperty("redis.port").trim().toInt
    val timeOut = properties.getProperty("redis.timeout").trim().toInt
    val password = properties.getProperty("redis.password").trim()
    new JedisPool(config, host, port, timeOut, password)
  }

}
 jedisPool = JedisUtil.getJedisPool(redisProperties)
 jedis = jedisPool.getResource
         pipeline = jedis.pipelined()
        for (i <- messageArray) {
          //          pipeline.hset("", i(0), i(1))
          //          pipeline.expire("",Int)
          pipeline.set(i(0), i(1), "NX", "EX", redisInPutTTL)
        }
        // pipeline.syncAndReturnAll()
        pipeline.sync()
        messageArray.clear()
        //        jedis.disconnect()

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐