Giter Club home page Giter Club logo

trendrrnsqclient's Introduction

TrendrrNSQClient

A fast netty-based java client for NSQ initially developed and maintained by @dustismo for use in various places in the trendrr.tv and curatorr.com stacks.

I (@mreiferson) am currently maintaining this repo to facilitate bug fixes and minor updates and am interested in finding a longer-tem maintainer.

Consumer

Example usage:

NSQLookup lookup = new NSQLookupDynMapImpl();
lookup.addAddr("localhost", 4161);
NSQConsumer consumer = new NSQConsumer(lookup, "speedtest", "dustin", new NSQMessageCallback() {
            
    @Override
    public void message(NSQMessage message) {
        System.out.println("received: " + message);            
        //now mark the message as finished.
        message.finished();
        
        //or you could requeue it, which indicates a failure and puts it back on the queue.
        //message.requeue();
    }           
    @Override
    public void error(Exception x) {
        //handle errors
        log.warn("Caught", x);
    }
});
        
consumer.start();

Producer

Example usage:

NSQProducer producer = new NSQProducer().addAddress("localhost", 4150, 1);            
producer.start();
for (int i=0; i < 50000; i++) {
    producer.produce("speedtest", ("this is a message" + i).getBytes());
}

The producer also has a Batch collector that will collect messages until some threshold is reached (currently maxbytes or maxmessages) then send as a MPUB request. This gives much greater throughput then producing messages one at a time.

producer.configureBatch("speedtest", 
                new BatchCallback() {
                    @Override
                    public void batchSuccess(String topic, int num) {
                    }
                    @Override
                    public void batchError(Exception ex, String topic, List<byte[]> messages) {
                        ex.printStackTrace();   
                    }
                }, 
            batchsize, 
            null, //use default maxbytes 
            null //use default max seconds
        );

producer.start();
for (int i=0; i < iterations; i++) {
    producer.produceBatch("speedtest", ("this is a message" + i).getBytes());
}

Dependancies

Note: the trendrr-oss dependancy can easily be swapped out by implementing the com.trendrr.nsq.NSQLookup interface using a different json parser

trendrrnsqclient's People

Contributors

df-stripe avatar dustismo avatar gregspurrier avatar kulikov avatar mreiferson avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar

trendrrnsqclient's Issues

Couldn't find a license

I was unable to find the license for TrendrrNSQCLient. Is this something others are allowed to use, and if so, what license applies?

Thanks,

Hen

maintainer

I impl java client based on this repo idea with latest netty, I thought I could maintain this repo and migrate our company impl to this. @mreiferson

future.awaitUninterruptibly().getChannel() hangs

Current client use io.netty.netty:3.10.6.Final version, In the com.trendrr.nsq.AbstractNSQClient source code, the code below will hangs which cause the future never to be notified. it seems that it's a bug in
netty with the 3.10.6.Final version, netty/netty@32c4479 ?

// Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress(address, port));

// Wait until the connection attempt succeeds or fails.
Channel channel = future.awaitUninterruptibly().getChannel();
"Timer-1" #45 prio=5 os_prio=0 tid=0x00007f0ce9825800 nid=0x64fd in Object.wait() [0x00007f0c910ad000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Object.wait(Object.java:502)
	at org.jboss.netty.channel.DefaultChannelFuture.awaitUninterruptibly(DefaultChannelFuture.java:237)
	- locked <0x00000000e5827060> (a org.jboss.netty.channel.DefaultChannelFuture)
	at com.trendrr.nsq.AbstractNSQClient.createConnection(AbstractNSQClient.java:134)
	at com.trendrr.nsq.AbstractNSQClient.connect(AbstractNSQClient.java:187)
	- locked <0x00000000e0fb89a8> (a com.trendrr.nsq.NSQProducer)
	at com.trendrr.nsq.AbstractNSQClient$1.run(AbstractNSQClient.java:73)
	at java.util.TimerThread.mainLoop(Timer.java:555)
	at java.util.TimerThread.run(Timer.java:505)

   Locked ownable synchronizers:
	- None

Number of messages per batch (enhancement)

I ran a test using this API and I saw that there were many messages in flight and the number was always smaller or equal than the ready count, which is set up (hardcoded) to 200. So, I am thinking that this default number slows down the nsqd for pushing messages, right?

Can this parameter be set up when the Consumer is instantiated, or a new address is added to lookup? It will be better to set a smaller one if we want close to realtime data.

Thanks!

mesage.requeue() not following the NSQ protocol spec

I think the protocol requires that on a re-queue the API should send also a TimeOut inside the message besides the message ID.

REQ (http://bitly.github.io/nsq/clients/tcp_protocol_spec.html)
Re-queue a message (indicate failure to process)

REQ <message_id> \n

<message_id> - message id as 16-byte hex string
- a string representation of integer N where N <= configured max timeout
0 is a special case that will not defer re-queueing

In the code there is no such thing:
public void requeue() {
try {
this.connection.command(NSQCommand.instance("REQ " + new String(id, "ascii")));
} catch (UnsupportedEncodingException e) {
log.error("Caught", e);
}
}

Handle error 500 when topic is not available

Is there a way to implement a retry feature to handle the scenario when the topic is still not available? So the client can retry every second for the topic availabitily. Right now the client hangs with the topic has not been created yet.

Deadlock when closing producer

I'm encountering deadlocks when calling NSQProducer's close method. Here is the report from the JVM thread dump:

Found one Java-level deadlock:
=============================
"New I/O worker #1":
  waiting to lock monitor 0x00007fbbbc103458 (object 0x00000007e0008980, a com.trendrr.nsq.Connections),
  which is held by "main"
"main":
  waiting to lock monitor 0x00007fbbbc1033a8 (object 0x00000007e0008948, a com.trendrr.nsq.NSQProducer),
  which is held by "New I/O worker #1"

Java stack information for the threads listed above:
===================================================
"New I/O worker #1":
        at com.trendrr.nsq.Connections.remove(Connections.java:95)
        - waiting to lock <0x00000007e0008980> (a com.trendrr.nsq.Connections)
        at com.trendrr.nsq.AbstractNSQClient._disconnected(AbstractNSQClient.java:213)
        - locked <0x00000007e0008948> (a com.trendrr.nsq.NSQProducer)
        at com.trendrr.nsq.Connection._disconnected(Connection.java:151)
        at com.trendrr.nsq.netty.NSQHandler.channelDisconnected(NSQHandler.java:50)
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:102)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:560)
        at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:787)
        at org.jboss.netty.handler.codec.replay.ReplayingDecoder.cleanup(ReplayingDecoder.java:570)
        at org.jboss.netty.handler.codec.frame.FrameDecoder.channelDisconnected(FrameDecoder.java:365)
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:102)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:560)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:555)
        at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:396)
        at org.jboss.netty.channel.Channels$4.run(Channels.java:386)
        at org.jboss.netty.channel.socket.ChannelRunnableWrapper.run(ChannelRunnableWrapper.java:40)
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:367)
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:291)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:88)
        at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
"main":
        at com.trendrr.nsq.AbstractNSQClient._disconnected(AbstractNSQClient.java:212)
        - waiting to lock <0x00000007e0008948> (a com.trendrr.nsq.NSQProducer)
        at com.trendrr.nsq.Connection._disconnected(Connection.java:151)
        at com.trendrr.nsq.Connection.close(Connection.java:190)
        at com.trendrr.nsq.Connections.close(Connections.java:121)
        - locked <0x00000007e0008980> (a com.trendrr.nsq.Connections)
        at com.trendrr.nsq.AbstractNSQClient.close(AbstractNSQClient.java:218)
        at com.trendrr.nsq.NSQProducer.close(NSQProducer.java:230)
        at workbench.to_nsq$_main.doInvoke(to_nsq.clj:32)
        at clojure.lang.RestFn.invoke(RestFn.java:421)
        at clojure.lang.Var.invoke(Var.java:383)
        at user$eval3.invoke(form-init1476674585784519325.clj:1)
        at clojure.lang.Compiler.eval(Compiler.java:6703)
        at clojure.lang.Compiler.eval(Compiler.java:6693)
        at clojure.lang.Compiler.load(Compiler.java:7130)
        at clojure.lang.Compiler.loadFile(Compiler.java:7086)
        at clojure.main$load_script.invoke(main.clj:274)
        at clojure.main$init_opt.invoke(main.clj:279)
        at clojure.main$initialize.invoke(main.clj:307)
        at clojure.main$null_opt.invoke(main.clj:342)
        at clojure.main$main.doInvoke(main.clj:420)
        at clojure.lang.RestFn.invoke(RestFn.java:421)
        at clojure.lang.Var.invoke(Var.java:383)
        at clojure.lang.AFn.applyToHelper(AFn.java:156)
        at clojure.lang.Var.applyTo(Var.java:700)
        at clojure.main.main(main.java:37)

Found 1 deadlock.

A possible solution is to remove the call to this._disconnected() in Connection.close(). _disconnected() will be invoked later from the NIO worker thread by NSQHandler.channelDisconnected() when the socket closes. This breaks the deadlock for me, but I'm not sure whether this change will have any unintended consequences.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.