微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink如何做维表关联?

使用

RichAsyncFunction 加 CacheBuilder

@H_502_4@ CacheBuilder.newBuilder() //最多存储10000条 .maximumSize(10000) //过期时间为1分钟 .expireAfterWrite(60, TimeUnit.SECONDS) .build(); @H_502_4@public class LRU extends RichAsyncFunction<String,Order> { private static final Logger LOGGER = LoggerFactory.getLogger(LRU.class); String table = "info"; Cache<String, String> cache = null; private HBaseClient client = null; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //创建hbase客户端 client = new HBaseClient("127.0.0.1","7071"); cache = CacheBuilder.newBuilder() //最多存储10000条 .maximumSize(10000) //过期时间为1分钟 .expireAfterWrite(60, TimeUnit.SECONDS) .build(); } @Override public void asyncInvoke(String input, ResultFuture<Order> resultFuture) throws Exception { JSONObject jsonObject = JSONObject.parSEObject(input); Integer cityId = jsonObject.getInteger("city_id"); String userName = jsonObject.getString("user_name"); String items = jsonObject.getString("items"); //读缓存 String cacheCityName = cache.getIfPresent(cityId); //如果缓存获取失败再从hbase获取维度数据 if(cacheCityName != null){ Order order = new Order(); order.setCityId(cityId); order.setItems(items); order.setUserName(userName); order.setCityName(cacheCityName); resultFuture.complete(Collections.singleton(order)); }else { client.get(new GetRequest(table,String.valueOf(cityId))).addCallback((Callback<String, ArrayList<keyvalue>>) arg -> { for (keyvalue kv : arg) { String value = new String(kv.value()); Order order = new Order(); order.setCityId(cityId); order.setItems(items); order.setUserName(userName); order.setCityName(value); resultFuture.complete(Collections.singleton(order)); cache.put(String.valueOf(cityId), value); } return null; }); } } }

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐