Giter Club home page Giter Club logo

reactor-ql's Introduction

用SQL来描述ReactorAPI进行数据处理

Codacy Badge Maven Central Maven metadata URL Build Status codecov

Reactor + JSqlParser = ReactorQL

场景

  1. 规则引擎,在线编写SQL来定义数据处理规则.
  2. 实时统计每分钟平均温度.
  3. 统计每20条滚动数据平均值.
  4. ........

特性

  1. 支持字段映射 select name username from user.
  2. 支持聚合函数 count,sum,avg,max,min.
  3. 支持运算 select val/100 percent from cpu_usage.
  4. 支持分组 select sum(val) sum from topic group by interval('10s'). 按时间分组.
  5. 支持多列分组 select count(1) total,productId,deviceId from messages group by productId,deviceId.
  6. 支持having select avg(temp) avgTemp from temps group by interval('10s') having avgTemp>10 .
  7. 支持case when select case type when 1 then '警告' when 2 then '故障' else '其他' end type from topic.
  8. 支持Join select t1.name,t2.detail from t1,t2 where t1.id = t2.id.
  9. 响应式:数据源,函数执行都是异步非阻塞。

例子

引入依赖

<dependency>
 <groupId>org.jetlinks</groupId>
    <artifactId>reactor-ql</artifactId>
    <version>{version}</version>
</dependency>

用例:

  ReactorQL.builder()
        .sql("select avg(this) total from test group by interval('1s') having total > 2") //按每秒分组,并计算流中数据平均值,如果平均值大于2则下游收到数据.
        .build()
        .start(Flux.range(0, 10).delayElements(Duration.ofMillis(500)))
        .doOnNext(System.out::println)
        .as(StepVerifier::create)
        .expectNextCount(4)
        .verifyComplete();

更多用法请看 单元测试

原理

  1. 解析SQL查询语句,生成SQL抽象语法树。

  2. 遍历SQL抽象语法树,使用策略模式,根据不同的语法类型,编译生成针对Flux的转换函数。

    • 条件使用(FilterFeature)进行创建。
    • 查询列(columnMapper)使用ValueMapFeature,ValueFlatMapFeature,ValueAggMapFeature进行创建。
    • 分组(groupBy)使用GroupFeature进行创建。
    • 数据源(from)使用FromFeature进行创建。
      • 表 ,从上下文中基于表名获取Flux数据流。
      • 子查询,基于子查询语句生成新的ReactorQL对象并执行获取数据源。
      • 函数,使用策略模式获取对应FromFeature进行创建。
    • 排序(orderBy)使用ValueMapFeature编译转换函数,基于转换函数执行结果进行排序。
    • 关联查询(join),支持多种join源。
      • join 表,从上下文中基于表名获取Flux数据流进行数据关联。
      • 子查询,基于子查询语句生成新的ReactorQL对象并执行获取数据源进行数据关联。
  3. 组合编译后各个片段对应的Flux转换函数。

    • 组合顺序: limit->offset->distinct->orderBy->columnMapper->groupBy->where->join->from
  4. 传入上下文执行。

    • 支持参数绑定。
    • 指定数据源获取函数,使用表名获取数据源。

拓展

当内置的特性不满足需求时,可通过自定义的方式进行拓展。

拓展了特性后,在启动时注册到元数据中。

import org.jetlinks.reactor.ql.ReactorQL;


public ReactorQL createQL(String sql) {
    return ReactorQL
            .builder()
            .sql(sql)
            //注册自定义的特性
            .feature(customFeature1, customFeature2)
            .build();
}

拓展转换函数

可通过拓展转换函数特性(ValueMapFeature)来自定义数据转换等操作, 如实现 select device.state(t.deviceId) from "/device/**" t

import org.jetlinks.reactor.ql.supports.map.FunctionMapFeature;

// FunctionMapFeature针对ValueMapFeature实现了基础操作
public class DeviceStateFunction extends FunctionMapFeature {
    public DeviceStateFunction() {
        super("device.state",
              1,//最大参数数量
              1,//最小参数数量
              flux -> flux 
                .collectList()//将响应式参数流中的数据收集为List
                .flatMap(args -> {
                    if (args.size() != 1) {
                        return Mono.empty();
                    }
                    String deviceId = String.valueOf(args.get(0));

                    return getDeviceState(deviceId);
                }));
    }
}

拓展数据源函数

可通过拓展数据源函数特性来自定义数据源, 如实现 select * from mysql(....)

例:

import net.sf.jsqlparser.statement.select.FromItem;
import org.jetlinks.reactor.ql.ReactorQLContext;
import org.jetlinks.reactor.ql.ReactorQLMetadata;
import org.jetlinks.reactor.ql.ReactorQLRecord;
import org.jetlinks.reactor.ql.feature.FeatureId;
import org.jetlinks.reactor.ql.feature.FromFeature;
import reactor.core.publisher.Flux;

import java.util.function.Function;

// select * from mysql(....);
public class MysqlFromFeature implements FromFeature {
   private static final String ID = FeatureId.From.of("mysql").getId();

   @Override
   public Function<ReactorQLContext, Flux<ReactorQLRecord>> createFromMapper(
           FromItem fromItem,
           ReactorQLMetadata metadata) {
      TableFunction from = ((TableFunction) fromItem);
      net.sf.jsqlparser.expression.Function function = from.getFunction();
      //函数参数列表
      ExpressionList list = function.getParameters();
      //别名
      String alias = from.getAlias() != null ? from.getAlias().getName() : null;

      Object params = prepareParameter(list);
      return ctx -> {
         return this
                 .execute0(params)
                 .map(val -> {
                    return ReactorQLRecord.newRecord(alias, val, ctx);
                 });
      };
   }

   public Flux<Object> execute0(Object args) {
      //执行真实逻辑,返回数据流。
   }
}

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.