在 Flink 项目中,访问 Redis 的逻辑往往需要自己实现;更推荐直接使用 Apache Bahir 提供的 Redis 连接器。
在本地单机情况下:
/**
 * Example {@code RedisMapper} that turns each {@code (f0, f1)} string tuple
 * into the key/value pair handed to the Redis sink.
 *
 * <p>The command description requests {@code HSET} with {@code "HASH_NAME"}
 * as its second argument (presumably the name of the target hash — confirm
 * against the connector documentation).
 */
public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>> {

    /** Second argument passed to the {@code HSET} command description. */
    private static final String ADDITIONAL_KEY = "HASH_NAME";

    /**
     * Describes the Redis command to issue for every incoming element.
     *
     * @return an {@code HSET} command description bound to {@link #ADDITIONAL_KEY}
     */
    @Override
    public RedisCommandDescription getCommandDescription() {
        final RedisCommandDescription description =
                new RedisCommandDescription(RedisCommand.HSET, ADDITIONAL_KEY);
        return description;
    }

    /**
     * Extracts the key part of an element.
     *
     * @param data the incoming tuple
     * @return the tuple's first field
     */
    @Override
    public String getKeyFromData(Tuple2<String, String> data) {
        final String key = data.f0;
        return key;
    }

    /**
     * Extracts the value part of an element.
     *
     * @param data the incoming tuple
     * @return the tuple's second field
     */
    @Override
    public String getValueFromData(Tuple2<String, String> data) {
        final String value = data.f1;
        return value;
    }
}
// Connection settings for a single Redis instance on the local machine.
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
// The stream's element type must match the type the sink and mapper accept.
DataStream<Tuple2<String, String>> stream = ...;
// Every element is written to Redis through RedisExampleMapper.
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));
当然,我们也可以在集群模式或者哨兵模式下使用 Redis 连接器。
集群模式:
// Cluster mode is configured through FlinkJedisClusterConfig: its builder is
// the one that accepts the set of cluster node addresses.
FlinkJedisClusterConfig conf = new FlinkJedisClusterConfig.Builder()
        .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601))))
        .build();
// The stream's element type must match the type the sink and mapper accept.
DataStream<Tuple2<String, String>> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));
哨兵模式:
// Sentinel mode: name the monitored master and supply the sentinel addresses.
FlinkJedisSentinelConfig conf = new FlinkJedisSentinelConfig.Builder()
        .setMasterName("master")
        .setSentinels(...)
        .build();
// The stream's element type must match the type the sink and mapper accept.
DataStream<Tuple2<String, String>> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));
/**
 * Builds a Jedis connection pool from `redis.*` entries of a
 * `java.util.Properties` object.
 */
object JedisUtil {

  /**
   * Creates a `JedisPool` configured from the given properties.
   *
   * Pool-tuning keys are optional and fall back to the defaults written in
   * the body. `redis.host`, `redis.port` and `redis.timeout` are mandatory.
   * `redis.password` is optional: when it is absent or blank, `null` is
   * passed to the pool constructor instead of failing on a missing key.
   *
   * Property keys are kept exactly as spelled by existing configurations
   * (including the lower-case "e" in e.g. `redis.jmxenabled`).
   *
   * @param properties source of the `redis.*` settings
   * @return a new pool for the configured host, port, timeout and password
   * @throws IllegalArgumentException if a mandatory key is missing or a
   *                                  boolean value cannot be parsed
   * @throws NumberFormatException    if a numeric value cannot be parsed
   */
  def getJedisPool(properties: Properties): JedisPool = {
    // Optional setting: trimmed value, or the trimmed default when absent.
    def setting(key: String, default: String): String =
      properties.getProperty(key, default).trim

    // Mandatory setting: fail with the key name rather than a bare NPE.
    def required(key: String): String = {
      val raw = properties.getProperty(key)
      require(raw != null, s"Missing required Redis property: $key")
      raw.trim
    }

    val config = new JedisPoolConfig()
    config.setBlockWhenExhausted(setting("redis.blockWhenExhausted", "true").toBoolean)
    config.setEvictionPolicyClassName(
      setting("redis.evictionPolicyClassName", "org.apache.commons.pool2.impl.DefaultEvictionPolicy"))
    config.setJmxEnabled(setting("redis.jmxenabled", "true").toBoolean)
    config.setJmxNamePrefix(setting("redis.jmxNamePrefix", "pool"))
    config.setLifo(setting("redis.lifo", "true").toBoolean)
    config.setMaxIdle(setting("redis.maxIdle", "8").toInt)
    config.setMaxTotal(setting("redis.maxTotal", "8").toInt)
    // Millisecond settings are parsed as Long so large values are not rejected.
    config.setMaxWaitMillis(setting("redis.maxWaitMillis", "-1").toLong)
    config.setMinEvictableIdleTimeMillis(setting("redis.minevictableIdleTimeMillis", "1800000").toLong)
    config.setMinIdle(setting("redis.minIdle", "0").toInt)
    config.setNumTestsPerEvictionRun(setting("redis.numTestsPerevictionRun", "3").toInt)
    config.setSoftMinEvictableIdleTimeMillis(setting("redis.softMinevictableIdleTimeMillis", "1800000").toLong)
    config.setTestOnBorrow(setting("redis.testOnBorrow", "false").toBoolean)
    config.setTestWhileIdle(setting("redis.testWhileIdle", "false").toBoolean)
    config.setTimeBetweenEvictionRunsMillis(setting("redis.timeBetweenevictionRunsMillis", "-1").toLong)

    val host = required("redis.host")
    val port = required("redis.port").toInt
    val timeOut = required("redis.timeout").toInt
    // A missing or blank password means "no password": pass null to the pool.
    val password = Option(properties.getProperty("redis.password"))
      .map(_.trim)
      .filter(_.nonEmpty)
      .orNull

    new JedisPool(config, host, port, timeOut, password)
  }
}
// Write every buffered (key, value) entry to Redis in a single pipelined batch.
jedisPool = JedisUtil.getJedisPool(redisProperties)
jedis = jedisPool.getResource
try {
  pipeline = jedis.pipelined()
  for (entry <- messageArray) {
    // Alternative: store entries as hash fields with pipeline.hset(<hash>, entry(0), entry(1))
    // followed by pipeline.expire(<hash>, <seconds>).
    // "NX" + "EX": set only when the key does not exist yet, with a TTL of redisInPutTTL.
    pipeline.set(entry(0), entry(1), "NX", "EX", redisInPutTTL)
  }
  // sync() flushes the queued commands and discards the replies;
  // pipeline.syncAndReturnAll() can be used instead when the replies are needed.
  pipeline.sync()
  messageArray.clear()
} finally {
  // Always release the borrowed connection, even when the batch fails,
  // so the connection is not leaked.
  jedis.close()
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。