微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

IteratorX 极简的 JDBC/File 读取器

程序名称:IteratorX

授权协议: Apache-2.0

操作系统: 跨平台

开发语言: Java

IteratorX 介绍


IteratorX 是一个极简主义的 jdbc/file reader。

参见:

maven:https://mvnrepository.com/artifact/io.iteratorx/iteratorx

使用:

1. Reader: JdbcReader, FileReader

超简单,从 jdbc 或 file 中读取数据到 JSONObject 中。

1.1. JdbcReader:读取 jdbc

  // create jdbc reader
  final JdbcReader jdbcReader = new JdbcReader(
      new JdbcDataSourceBuilder().setUrl(jdbc:postgresql://10.23.112.2:3333/dbname)
          .setUser(username).setPassword(password).build());
  
  // fetch by iterable
  for (final JSONObject item : jdbcReader.read(select * from tablename)) {
    System.err.println(item);
  }
  
  // fetch all into one collection
  final Collection items = jdbcReader.readAll(select * from tablename where type = ?, param);
  for (final JSONObject item : items) {
    System.err.println(item);
  }

1.2. FileReader:读取 file

  // create file reader
  final FileReader fileReader = new FileReader();

  // fetch by iterable
  for (final JSONObject item : fileReader.read(new File(data.json), utf-8)) {
    System.err.println(item);
  }

  // fetch all into one collection
  final Collection items = fileReader.readAll(new File(data.json), utf-8);
  for (final JSONObject item : items) {
    System.err.println(item);
  }

2. Parallels: Threads, Flink, RxJava

简单易用的 多线程处理。

2.1. Threads: 使用 ThreadPool 并行处理

  // process each item parallelly using thread pool
  Threads.from(jdbcReader.read(select * from tablename)).forEach(item -> {
    System.err.println(item);
  });
  
  // process batch data parallelly
  Threads.from(jdbcReader.read(select * from tablename)).forBatch(items -> {
    for (final JSONObject item : items) {
      System.err.println(item);
    }
  });

2.2. Flink: 使用 Flink 并行处理

  // process each item parallelly using Flink engine
  Flink.from(jdbcReader.read(select * from tablename)).forEach(item -> {
    System.err.println(item);
  });
  
  // process batch data parallelly
  Flink.from(jdbcReader.read(select * from tablename)).forBatch(items -> {
    for (final JSONObject item : items) {
      System.err.println(item);
    }
  });
  
  // use DataSet directly to enable all Flink power
  Flink.from(jdbcReader.read(select * from tablename)).dataSet().distinct().count();

2.3. RxJava: 使用 RxJava 并行处理

  // process each item parallely using RxJava engine
  RxJava.from(jdbcReader.read(select * from tablename)).forEach(item -> {
    System.err.println(item);
  });
  
  // process batch data parallely
  RxJava.from(jdbcReader.read(select * from tablename)).forBatch(items -> {
    for (final JSONObject item : items) {
      System.err.println(item);
    }
  });
  
  // use Observable directly
  RxJava.from(jdbcReader.read(select * from tablename)).observable().distinct().count();

Bug:使用 RxJava 的时候,程序结束了停不下来,谁能帮忙解决吗?

参见:

maven:https://mvnrepository.com/artifact/io.iteratorx/iteratorx

IteratorX 官网

https://github.com/iteratorx/iteratorx

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐