sunsetyan / stream-compute
This project forked from yoruichi/stream-compute
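Multithreaded stream computing model
http://wiki.tenddata.com/pages/viewpage.action?pageId=7143677

This is the framework currently used by the Count engine. It is very lightweight: everything is implemented with the thread pools and queues that ship with the JDK, and it is quite convenient to use.

I. QuickStart

1.

2. The three basic classes
2.1 EmitItem: a wrapper for the key/value pairs that are emitted and processed.
2.2 ModeHandler: the unit that executes business logic. It has since been renamed Bolt.
2.3 FirstModeHandler: the unit that reads from files, queues, storage, etc. to produce the first batch of EmitItems. It has since been renamed Spout.

3. The topology class
3.1 A Topology contains a series of ModeHandlers and one FirstModeHandler implementation.
3.2 ModeHandlers can be chained in whatever order the business requires, but the order cannot be changed at runtime; every change requires recompiling.
3.3 A topology can be started directly from a main method, or deployed and started inside a container.

II. SimpleExample

1. Use a map to count how many times each word occurs

```java
public class CountModeHandler extends ModeHandler {
    ...
    @Override
    public int execute() { // business logic goes here
        EmitItem item = null;
        int num = 0;
        String word = null;
        Map<String, Integer> map = new HashMap<String, Integer>();
        while ((item = getReadMessageQueue().poll()) != null) { // take the next item from the upstream queue
            try {
                word = (String) item.getMessage(0); // the word to count
                if (map.containsKey(word))
                    map.put(word, map.get(word) + 1); // count occurrences in a local map
                else
                    map.put(word, 1);
                num++;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (num > 0) {
            emit(0, map); // send the map of counts on to the next unit
        }
        return num;
    }
    ...
}
```

2. Merge the maps produced by the different counting handlers into a single map

```java
public class MergeModeHandler extends ModeHandler {
    ...
    @Override
    public int execute() {
        EmitItem item = null;
        int num = 0;
        while ((item = getReadMessageQueue().poll()) != null) {
            try {
                // a partial count map emitted by one CountModeHandler
                Map<String, Integer> m = (Map<String, Integer>) item.getMessage(0);
                for (Map.Entry<String, Integer> entry : m.entrySet()) {
                    // `map` is the shared result map passed in through prepare(); with
                    // setModeHandler(h3, 2) two threads may update it at once, so a
                    // containsKey/get/put sequence could lose updates. merge() on a
                    // ConcurrentHashMap applies the addition atomically (Java 8+).
                    map.merge(entry.getKey(), entry.getValue(), Integer::sum);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            num++;
        }
        return num;
    }
    ...
}
```

3. A FirstModeHandler that simulates reading an article and emits its words

```java
public class TestFirstModeHandler extends FirstModeHandler {
    public String[][] message = new String[][] {
            { "one", "apple", "a", "day", "doctor", "keeps", "away" },
            { "when", "a", "man", "loves", "a", "woman" },
            { "what", "doesn't", "kill", "you", "makes", "you", "stronger" } };

    @Override
    public int execute() {
        int index = 0;
        for (int i = 0; i < message.length; i++)
            for (int j = 0; j < message[i].length; j++) {
                emit(index, message[i][j]); // emit each word downstream
                index++;
            }
        return index; // number of words emitted
    }
    ...
}
```

4. Assemble the units into a topology and run it

```java
...
public static void main(String[] args) {
    TestTopology tt = new TestTopology();
    TestFirstModeHandler h1 = new TestFirstModeHandler();
    CountModeHandler h2 = new CountModeHandler();
    MergeModeHandler h3 = new MergeModeHandler();
    tt.prepare(); // topology preparation phase; parameters can be initialized here
    h3.prepare(tt.map); // ModeHandler preparation phase; parameters can be initialized here too
    // set how many of each unit to run and their order: the ModeHandler set first runs first
    tt.setFirstModeHandler(h1, 1).setModeHandler(h2, 4).setModeHandler(h3, 2);
    tt.start(); // start
    System.out.println("The last result is : " + tt.map); // print the word counts
    tt.shutdown(); // stop
}
...
```

5. TestTopology only needs to initialize a map that holds the final counts

```java
public class TestTopology extends NewTopology {
    public Map<String, Integer> map;

    @Override
    public void prepare() {
        map = new ConcurrentHashMap<String, Integer>(); // shared result map, written by several threads
    }
}
```

III. TODOLIST
· Implement three topology types: splitting, merging, and subscription
· Integrate monitoring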
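The README says the framework is built only from the JDK's thread pools and queues. The following is a hypothetical, self-contained sketch of that idea, not the framework's actual implementation: it rebuilds the word-count topology from example 4 (one spout, four counters, two mergers) with plain `ExecutorService` and `LinkedBlockingQueue`. The class `QueuePipelineSketch`, the end-of-stream markers, and the "one partial map per counter" behaviour are assumptions made for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueuePipelineSketch {
    // Hypothetical end-of-stream markers; the README does not say how the framework detects end of input.
    private static final String END_OF_WORDS = "\u0000END";
    private static final Map<String, Integer> END_OF_PARTIALS = Collections.emptyMap();

    public static Map<String, Integer> run(String[][] message, int counters, int mergers)
            throws InterruptedException {
        final BlockingQueue<String> words = new LinkedBlockingQueue<>();
        final BlockingQueue<Map<String, Integer>> partials = new LinkedBlockingQueue<>();
        final Map<String, Integer> result = new ConcurrentHashMap<>();
        ExecutorService counterPool = Executors.newFixedThreadPool(counters);
        ExecutorService mergerPool = Executors.newFixedThreadPool(mergers);

        // Counter stage (cf. CountModeHandler): count words locally, then emit one partial map.
        for (int i = 0; i < counters; i++) {
            counterPool.execute(() -> {
                Map<String, Integer> local = new HashMap<>();
                try {
                    for (String w = words.take(); !END_OF_WORDS.equals(w); w = words.take()) {
                        local.merge(w, 1, Integer::sum);
                    }
                    partials.put(local);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Merger stage (cf. MergeModeHandler): fold partial maps into the shared result atomically.
        for (int i = 0; i < mergers; i++) {
            mergerPool.execute(() -> {
                try {
                    for (Map<String, Integer> m = partials.take(); m != END_OF_PARTIALS; m = partials.take()) {
                        m.forEach((k, v) -> result.merge(k, v, Integer::sum));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Spout stage (cf. TestFirstModeHandler), on the calling thread: emit every word,
        // then one end marker per counter so each counter stops exactly once.
        for (String[] line : message) {
            for (String w : line) {
                words.put(w);
            }
        }
        for (int i = 0; i < counters; i++) {
            words.put(END_OF_WORDS);
        }

        // After every counter has emitted its partial map, send one end marker per merger.
        counterPool.shutdown();
        if (!counterPool.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("counters did not finish");
        }
        for (int i = 0; i < mergers; i++) {
            partials.put(END_OF_PARTIALS);
        }
        mergerPool.shutdown();
        if (!mergerPool.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("mergers did not finish");
        }
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        String[][] message = {
            { "one", "apple", "a", "day", "doctor", "keeps", "away" },
            { "when", "a", "man", "loves", "a", "woman" },
            { "what", "doesn't", "kill", "you", "makes", "you", "stronger" } };
        // 4 counters and 2 mergers, mirroring setModeHandler(h2, 4).setModeHandler(h3, 2)
        Map<String, Integer> counts = run(message, 4, 2);
        System.out.println("The last result is : " + new TreeMap<>(counts));
    }
}
```

Because all words are queued before any end marker and the queues are FIFO, every counter sees its end marker only after the input is drained; the same ordering holds for the partial maps and the mergers.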
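In the example topology the merge step runs with two instances (`setModeHandler(h3, 2)`) that update one shared `ConcurrentHashMap`, and a check-then-act sequence such as `containsKey` followed by `put` is not atomic even on a concurrent map. Java 8's `Map.merge` handles this, but the examples use pre-diamond generics, which may (this is only a guess) mean the project targets Java 7. A minimal Java 7-compatible sketch is a compare-and-set loop over `ConcurrentMap.putIfAbsent` and `replace`; the class and method names below are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class AtomicMergeSketch {
    /** Atomically adds delta to key's count using only pre-Java-8 ConcurrentMap operations. */
    static void addCount(ConcurrentMap<String, Integer> map, String key, int delta) {
        while (true) {
            Integer old = map.putIfAbsent(key, delta);
            if (old == null) {
                return; // key was absent; delta stored as the first value
            }
            if (map.replace(key, old, old + delta)) {
                return; // no other thread changed the value in between
            }
            // another thread updated the key first; retry with the fresh value
        }
    }

    /** Merges one partial count map into the shared result, like MergeModeHandler does. */
    static void mergeInto(ConcurrentMap<String, Integer> shared, Map<String, Integer> partial) {
        for (Map.Entry<String, Integer> entry : partial.entrySet()) {
            addCount(shared, entry.getKey(), entry.getValue());
        }
    }

    /** Runs `threads` workers that each merge the same partial map `rounds` times. */
    public static ConcurrentMap<String, Integer> demo(int threads, final int rounds)
            throws InterruptedException {
        final ConcurrentMap<String, Integer> shared = new ConcurrentHashMap<String, Integer>();
        final Map<String, Integer> partial = new HashMap<String, Integer>();
        partial.put("a", 3);
        partial.put("you", 2);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int r = 0; r < rounds; r++) {
                        mergeInto(shared, partial);
                    }
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return shared;
    }

    public static void main(String[] args) throws InterruptedException {
        // two merger threads, as in setModeHandler(h3, 2)
        ConcurrentMap<String, Integer> result = demo(2, 1000);
        System.out.println(result.get("a") + " " + result.get("you")); // prints "6000 4000"
    }
}
```

With two threads each merging `{a=3, you=2}` a thousand times, no update is lost, so the totals are exactly 2 × 1000 × 3 and 2 × 1000 × 2.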