IteratorX 介绍
IteratorX 是一个极简主义的 jdbc/file reader。
参见:
maven:https://mvnrepository.com/artifact/io.iteratorx/iteratorx
使用:
1. Reader: JdbcReader, FileReader
超简单,从 jdbc 或 file 中读取数据到 JSONObject 中。
1.1. JdbcReader:读取 jdbc
// create jdbc reader final JdbcReader jdbcReader = new JdbcReader( new JdbcDataSourceBuilder().setUrl(jdbc:postgresql://10.23.112.2:3333/dbname) .setUser(username).setPassword(password).build()); // fetch by iterable for (final JSONObject item : jdbcReader.read(select * from tablename)) { System.err.println(item); } // fetch all into one collection final Collection items = jdbcReader.readAll(select * from tablename where type = ?, param); for (final JSONObject item : items) { System.err.println(item); }
1.2. FileReader:读取 file
// create file reader final FileReader fileReader = new FileReader(); // fetch by iterable for (final JSONObject item : fileReader.read(new File(data.json), utf-8)) { System.err.println(item); } // fetch all into one collection final Collection items = fileReader.readAll(new File(data.json), utf-8); for (final JSONObject item : items) { System.err.println(item); }
2. Parallels: Threads, Flink, RxJava
简单易用的 多线程处理。
2.1. Threads: 使用 ThreadPool 并行处理
// process each item parallelly using thread pool Threads.from(jdbcReader.read(select * from tablename)).forEach(item -> { System.err.println(item); }); // process batch data parallelly Threads.from(jdbcReader.read(select * from tablename)).forBatch(items -> { for (final JSONObject item : items) { System.err.println(item); } });
2.2. Flink: 使用 Flink 并行处理
// process each item parallelly using Flink engine Flink.from(jdbcReader.read(select * from tablename)).forEach(item -> { System.err.println(item); }); // process batch data parallelly Flink.from(jdbcReader.read(select * from tablename)).forBatch(items -> { for (final JSONObject item : items) { System.err.println(item); } }); // use DataSet directly to enable all Flink power Flink.from(jdbcReader.read(select * from tablename)).dataSet().distinct().count();
2.3. RxJava: 使用 RxJava 并行处理
// process each item parallely using RxJava engine RxJava.from(jdbcReader.read(select * from tablename)).forEach(item -> { System.err.println(item); }); // process batch data parallely RxJava.from(jdbcReader.read(select * from tablename)).forBatch(items -> { for (final JSONObject item : items) { System.err.println(item); } }); // use Observable directly RxJava.from(jdbcReader.read(select * from tablename)).observable().distinct().count();
Bug:使用 RxJava 的时候,程序结束了停不下来,谁能帮忙解决吗?
参见:
maven:https://mvnrepository.com/artifact/io.iteratorx/iteratorx
IteratorX 官网
https://github.com/iteratorx/iteratorx
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。