
storm-jms's Introduction

About Storm JMS

Storm JMS is a generic framework for integrating JMS messaging within the Storm framework.

The Storm Rationale page explains what Storm is and why it was built.

Storm-JMS allows you to inject data into Storm via a generic JMS spout, as well as consume data from Storm via a generic JMS bolt.

Both the JMS Spout and JMS Bolt are data agnostic. To use them, you provide a simple Java class that bridges the JMS and Storm APIs and encapsulates any domain-specific logic.
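As a rough illustration of what such a bridge class does, here is a self-contained sketch. The real storm-jms interfaces work with `javax.jms.Message` and Storm tuples; to keep the example runnable without those libraries, it uses hypothetical stand-ins (`TupleProducer`, `MessageProducer`, `PriceBridge`, and a plain `String` payload) that are not part of the storm-jms API.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the spout-side bridge: JMS payload -> tuple values. */
interface TupleProducer {
    List<Object> toTuple(String messageBody);
}

/** Hypothetical stand-in for the bolt-side bridge: tuple values -> JMS payload. */
interface MessageProducer {
    String toMessageBody(List<Object> tupleValues);
}

/** Example domain logic (assumed format): message bodies look like "symbol,price". */
final class PriceBridge implements TupleProducer, MessageProducer {
    @Override
    public List<Object> toTuple(String messageBody) {
        String[] parts = messageBody.split(",", 2);
        // Domain-specific parsing lives here, so the spout itself stays data agnostic.
        return Arrays.asList(parts[0].trim(), Double.parseDouble(parts[1].trim()));
    }

    @Override
    public String toMessageBody(List<Object> tupleValues) {
        // The bolt side reverses the mapping when publishing back to JMS.
        return tupleValues.get(0) + "," + tupleValues.get(1);
    }
}
```

The spout and bolt only ever call the bridge methods, which is what keeps them independent of the message format.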

Components

JMS Spout

The JMS Spout component allows for data published to a JMS topic or queue to be consumed by a Storm topology.

A JMS Spout connects to a JMS Destination (topic or queue), and emits Storm "Tuple" objects based on the content of the JMS message received.

JMS Bolt

The JMS Bolt component allows for data within a Storm topology to be published to a JMS destination (topic or queue).

A JMS Bolt connects to a JMS Destination, and publishes JMS Messages based on the Storm "Tuple" objects it receives.

Project Location

Primary development of storm-jms will take place at: https://github.com/ptgoetz/storm-jms

Maven artifacts for releases will be available on Maven Central.

Documentation

Documentation and tutorials can be found on the Storm-JMS wiki.

License

Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Contributors

andytoone, boneill42, parth-brahmbhatt, pcodding, ptgoetz, tylerbenson

storm-jms's Issues

Examples can't be successfully submitted to a cluster

As the title says, when submitting the jar "storm-jms-examples-0.1-SNAPSHOT-jar-with-dependencies.jar", the following exception is thrown: "Exception in thread "main" org.springframework.beans.factory.parsing.BeanDefinitionParsingException: Configuration problem: Unable to locate Spring NamespaceHandler for XML schema namespace [http://activemq.apache.org/schema/core]
Offending resource: class path resource [jms-activemq.xml]"

This exception occurs because the META-INF/spring.handlers file from activemq-core-5.4.0.jar is overwritten when the dependencies are packed into a single jar.

Consider using "maven-shade-plugin" instead of "maven-assembly-plugin".
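To see why the packaging matters: each dependency JAR can ship its own META-INF/spring.handlers file, and a copy-and-overwrite merge keeps only the last one, while appending keeps every namespace mapping. The sketch below simulates both strategies with `java.util.Properties`; the class `SpringHandlersMerge` and the handler class names (`example.ActiveMqHandler`, `example.ContextHandler`) are hypothetical placeholders, not the real handler classes.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Properties;

/** Simulates two ways of merging several META-INF/spring.handlers files into one JAR. */
final class SpringHandlersMerge {
    private static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory StringReader
        }
        return props;
    }

    /** Copy-and-overwrite: each file replaces the previous one, so only the last survives. */
    static Properties overwrite(List<String> handlerFiles) {
        Properties result = new Properties();
        for (String file : handlerFiles) {
            result = load(file);
        }
        return result;
    }

    /** Append: the files are concatenated, so every namespace mapping survives. */
    static Properties append(List<String> handlerFiles) {
        return load(String.join("\n", handlerFiles));
    }
}
```

In a real build, the shade plugin's `AppendingTransformer` configured for `META-INF/spring.handlers` (and typically `META-INF/spring.schemas`) performs the append step.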

JMS Session instance should not be used from multiple threads

In JmsSpout.java, the javax.jms.Session instance is used by multiple threads, which is erroneous according to the Javadoc:

Once a connection has been started, any session with one or more registered message listeners is dedicated to the thread of control that delivers messages to it. It is erroneous for client code to use this session or any of its constituent objects from another thread of control. The only exception to this rule is the use of the session or message consumer close method.

The close method is the only session method that can be called while some other session method is being executed in another thread.

In JmsSpout.java a message listener is registered (line 179), so the Session instance is dedicated to the thread delivering messages. At the same time, a scheduled executor periodically runs RecoveryTask, which calls recover() on the same Session instance from a different thread (line 356).

According to the Javadoc, this is erroneous usage of Session.
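One possible way to respect this rule, sketched here under assumptions: if the spout consumed messages synchronously on a single thread (for example, polling from nextTuple() instead of registering a MessageListener), the timer thread could merely set a flag and let that owning thread perform the recovery on its next pass. `SessionOwner`, `requestRecovery()`, and the `recoverCalls` counter (a stand-in for `Session.recover()`) are hypothetical; this is not how JmsSpout is currently written.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical owner of a session that must only be touched from one thread. */
final class SessionOwner {
    private final AtomicBoolean recoveryRequested = new AtomicBoolean(false);
    // Stands in for session.recover(); only read and written by the owner thread.
    private int recoverCalls;

    /** Safe to call from any thread (e.g. a scheduled timer): it only flips a flag. */
    void requestRecovery() {
        recoveryRequested.set(true);
    }

    /** Called repeatedly by the single owner thread, e.g. from a polling loop. */
    void poll() {
        if (recoveryRequested.compareAndSet(true, false)) {
            recoverCalls++; // the real code would call session.recover() here
        }
        // ... then receive and emit the next message on this same thread ...
    }

    int recoverCalls() {
        return recoverCalls;
    }
}
```

A scheduled executor would then run `owner::requestRecovery` instead of calling `recover()` itself. Several requests made between two polls collapse into a single recovery.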

Ack is called even in case of AUTO_ACKNOWLEDGE

Hello,
Even though my code sets
queueSpout.setJmsAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
I can see the following WARNING in my logs:
b.s.c.j.s.JmsSpout [WARN] Couldn't acknowledge unknown JMS message ID .....

In the JmsSpout class, I can see that "ack" should not be called with AUTO_ACKNOWLEDGE.

So what is the problem?
Thanks a lot
Nicolas

Release request

Hi there,

Is it possible to make an official release of storm-jms targeting storm 0.9.5?

My organisation requires that artifacts are built and hosted externally, as opposed to us making our own builds.

Thanks in advance!

Neil Grant

Incorrect handling of topology.message.timeout.secs

Attempting to set the topology.message.timeout.secs config value results in a class cast exception:

java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Integer
    at backtype.storm.contrib.jms.spout.JmsSpout.open(JmsSpout.java:160) ~[stormjar.jar:?]

(Note: this stack trace was produced from the 0.9.0 release, but it looks like the current code has the same issue.)

It looks like the typical method in other parts of Storm is to null-check the value, then cast to Number and call intValue on the result.

See also: https://groups.google.com/forum/#!topic/storm-user/SX2gEBfNEFg
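The null-check-then-`Number` approach described above might look like the following. `TopologyConf.getInt` and its `defaultValue` parameter are hypothetical names chosen for the sketch, not Storm API.

```java
import java.util.Map;

final class TopologyConf {
    /**
     * Reads an integer setting without assuming its boxed type: the value may be
     * deserialized as Integer or Long, and both extend Number.
     */
    static int getInt(Map<String, Object> conf, String key, int defaultValue) {
        Object value = conf.get(key);
        if (value == null) {
            return defaultValue; // setting absent: fall back instead of throwing
        }
        return ((Number) value).intValue();
    }
}
```

For example, `TopologyConf.getInt(conf, "topology.message.timeout.secs", 30)` returns the configured timeout whether it was stored as `30` or `30L`.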

JMS Spout ack() and fail() implementation

Hi,

The JMS Spout implements the ack() and fail() mechanisms using JMS message acknowledgment and JMS session recovery. One important fact that is often misunderstood in JMS is that message.acknowledge() acknowledges every message consumed by the session up to that moment, not just that particular message.

http://docs.oracle.com/javaee/5/api/javax/jms/Message.html#acknowledge%28%29
void acknowledge()
throws JMSException
Acknowledges all consumed messages of the session of this consumed message.

So if you have received two JMS messages, msg1 and msg2 (in that order), msg2 is processed successfully, msg1 fails, and the ack for msg2 arrives first, then msg1 won't be redelivered.

Probably the message replay mechanism in the JMS case should use some intermediate (semi-)persistent storage (a Hazelcast map, for example).
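The msg1/msg2 scenario can be reproduced with a toy model of the session. `CumulativeAckSession` is a hypothetical class that only mimics the documented cumulative `acknowledge()` semantics; it is not a JMS provider.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a CLIENT_ACKNOWLEDGE session with cumulative acknowledgment. */
final class CumulativeAckSession {
    private final List<String> unacked = new ArrayList<>();

    void consume(String msg) {
        unacked.add(msg);
    }

    /** Mirrors Message.acknowledge(): every consumed message is acked, not just msg. */
    void acknowledge(String msg) {
        unacked.clear();
    }

    /** Messages the broker would still redeliver after a session recovery. */
    List<String> redeliverable() {
        return new ArrayList<>(unacked);
    }
}
```

Consuming msg1 and msg2 and then acknowledging msg2 leaves nothing to redeliver, so the failed msg1 is lost.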

open connections

I'm having an issue with open connections.
I'm using AMQ 5.11 and Storm 0.9.5. I see an ever-growing number of open connections (sudo ss -s), which keeps increasing until AMQ or Storm exhausts the machine's resources.
The session mode is AUTO_ACKNOWLEDGE and I'm producing about 5k messages per minute. I have 3 topologies and an equal number of spout instances (distributed = true). I only have one producer instance at all times.
I've checked the JMX info from AMQ and the reported number of consumers is 3, as expected. I do have more session IDs than I would expect (about 20).

After reading the code thoroughly, it doesn't seem related to the spout at all, but I'm struggling with this one...
Has anybody experienced something like this with @ptgoetz's storm-jms?
Thanks

Ack logic in JMS Spout for batch

The ack logic in JMS Spout says :

    Message msg = this.pendingMessages.remove(msgId);
    JmsMessageID oldest = this.toCommit.first();
    if(msgId.equals(oldest)) { ... }

Is this "if" check meant for batch acknowledgment mode, since it seems to handle only a batch?

JmsSpout open() is throwing ClassCastException

Getting a ClassCastException on
Integer topologyTimeout = (Integer)conf.get("topology.message.timeout.secs");
in JmsSpout.java

This should be something like:
Integer topologyTimeout = Integer.parseInt(String.valueOf(conf.get("topology.message.timeout.secs")));

Duplicate emitted messages after failure recovery

Hi,

Reading the documentation of session.recover(), I think the recovery process after a failure causes messages that are already inside the topology (past the spout) to be discarded by the JMS queue and re-sent to the spout. This causes duplicate messages to be processed.

Maybe an explicit re-emit after a fail event (with some kind of retry policy) could solve this problem.

I'm opening this issue to discuss a possible solution.

Thanks in advance!
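A minimal sketch of the "explicit re-emit with a retry policy" idea, assuming the spout keeps each in-flight payload itself instead of relying on session.recover(). `RetryBuffer`, its `maxRetries` limit, and the drop-after-limit behavior are hypothetical design choices; the sketch covers only spout-side bookkeeping, and when to acknowledge to the broker still has to be decided separately.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical retry policy: only the failed message is re-emitted, up to maxRetries times. */
final class RetryBuffer {
    private final int maxRetries;
    private final Map<String, String> inFlight = new HashMap<>();  // msgId -> payload
    private final Map<String, Integer> failures = new HashMap<>(); // msgId -> failures so far
    private final Deque<String> retryQueue = new ArrayDeque<>();   // msgIds due for re-emit

    RetryBuffer(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    void emitted(String msgId, String payload) {
        inFlight.put(msgId, payload);
    }

    void ack(String msgId) {
        inFlight.remove(msgId);
        failures.remove(msgId);
    }

    /** Queues msgId for re-emit; returns false once its retries are exhausted and it is dropped. */
    boolean fail(String msgId) {
        int count = failures.merge(msgId, 1, Integer::sum);
        if (count > maxRetries) {
            ack(msgId); // give up; a real spout might route this to a dead-letter destination
            return false;
        }
        retryQueue.addLast(msgId);
        return true;
    }

    /** Next msgId to re-emit (e.g. from nextTuple()), or null if nothing is waiting. */
    String nextRetry() {
        return retryQueue.pollFirst();
    }

    String payload(String msgId) {
        return inFlight.get(msgId);
    }
}
```

Messages that are still being processed downstream are left alone, which is the point: only the tuple that actually failed goes around again.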

mvn clean install ERROR

[ERROR] Failed to execute goal on project storm-jms: Could not resolve dependencies for project com.github.ptgoetz:storm-jms:jar:0.8.2-SNAPSHOT: Could not find artifact storm:storm:jar:0.8.1 in central (http://repo1.maven.org/maven2) -> [Help 1]

Storm JMS spout Reconnection Problem.

Hi All,

I am using Storm JMS Spout to connect to TibcoEMS and it works great.

We have one problem: getting the Storm spout to reconnect when there is a failover in the TIBCO EMS server.

Is it possible for Storm to reconnect to the jmsURL automatically?

How can we configure that in the JMS spout application context?
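As a generic sketch, not a storm-jms or TIBCO EMS feature: when the connection fails (for instance, as reported to a `javax.jms.ExceptionListener` registered on the connection), the spout could retry creating it with exponential backoff. `Reconnect.withBackoff` and its parameters are hypothetical, and the `Supplier` stands in for whatever code opens the JMS connection.

```java
import java.util.function.Supplier;

/** Generic retry-with-exponential-backoff helper (hypothetical, for illustration). */
final class Reconnect {
    /**
     * Calls connect until it succeeds or maxAttempts is reached, doubling the delay
     * after each failure (capped at maxDelayMs). Rethrows the last failure.
     */
    static <T> T withBackoff(Supplier<T> connect, int maxAttempts,
                             long initialDelayMs, long maxDelayMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delay = initialDelayMs;
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return connect.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(delay);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt(); // preserve the interrupt status
                        throw new IllegalStateException("interrupted while reconnecting", ie);
                    }
                    delay = Math.min(delay * 2, maxDelayMs);
                }
            }
        }
        throw last;
    }
}
```

Capping the delay keeps the spout from waiting arbitrarily long once the server is back, while the doubling avoids hammering a server that is still failing over.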

Getting the following error when submitting the topology to Storm:

Exception in thread "main" org.springframework.beans.factory.parsing.BeanDefinitionParsingException: Configuration problem: Unable to locate Spring NamespaceHandler for XML schema namespace [http://activemq.apache.org/schema/core]
Offending resource: class path resource [jms-activemq.xml]

Getting java.util.NoSuchElementException sporadically!

I am getting the following error while calling ack() on OutputCollector.

Can anybody help me to find a cause of this exception?

2016-10-07 08:46:46.856 [Thread-19-JMS_QUEUE_SPOUT] WARN  Message failed: org.apache.storm.jms.spout.JmsMessageID@77eb
2016-10-07 08:46:46.860 [Thread-19-JMS_QUEUE_SPOUT] ERROR  Async loop died!
java.lang.RuntimeException: java.util.NoSuchElementException
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:87) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.disruptor$consume_batch.invoke(disruptor.clj:76) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.daemon.executor$fn__4654$fn__4669$fn__4698.invoke(executor.clj:542) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:619) [na:1.6.0_21]
Caused by: java.util.NoSuchElementException: null
        at java.util.TreeMap.key(TreeMap.java:1206) ~[na:1.6.0_21]
        at java.util.TreeMap.firstKey(TreeMap.java:267) ~[na:1.6.0_21]
        at java.util.TreeSet.first(TreeSet.java:377) ~[na:1.6.0_21]
        at org.apache.storm.jms.spout.JmsSpout.ack(JmsSpout.java:243) ~[stormjar.jar:3.7]
        at backtype.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:384) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.daemon.executor$fn__4654$tuple_action_fn__4660.invoke(executor.clj:446) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.daemon.executor$mk_task_receiver$fn__4645.invoke(executor.clj:401) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.disruptor$clojure_handler$reify__1446.onEvent(disruptor.clj:58) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.4.jar:0.9.4]
        ... 6 common frames omitted
2016-10-07 08:46:46.860 [Thread-19-JMS_QUEUE_SPOUT] ERROR
java.lang.RuntimeException: java.util.NoSuchElementException
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:87) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.disruptor$consume_batch.invoke(disruptor.clj:76) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.daemon.executor$fn__4654$fn__4669$fn__4698.invoke(executor.clj:542) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]

I am setting up my topology with a JmsSpout as shown in the example, and I am connecting to ActiveMQ through this spout. Everything works fine, but sporadically I find the above stack trace in my logs and one of the spout instances stops dequeuing messages from ActiveMQ. When I resubmit my topology, everything goes back to normal.

    JmsProvider jmsQueueProvider = new SpringJmsProvider(
            "file:" + options.activemqConfFile,
            Constants.JMS_CONNECTION_FACTORY,
            options.queueName
    );
    JmsTupleProducer producer = new JsonTupleProducer();
    // JMS Queue Spout
    JmsSpout queueSpout = new JmsSpout();
    queueSpout.setJmsProvider(jmsQueueProvider);
    queueSpout.setJmsTupleProducer(producer);
    queueSpout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    queueSpout.setDistributed(true); // allow multiple instances
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout(Constants.JMS_QUEUE_SPOUT, queueSpout, 5);

What could be the problem? How do I debug this?

P.S.: I am not setting a recoveringPeriod, so it defaults to -1 (if this is of any relevance).
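The "Caused by" frames show `TreeSet.first()` being called from `JmsSpout.ack()`, and `first()` throws `NoSuchElementException` whenever the set is empty. A plausible but unconfirmed trigger is an ack arriving after the pending set had been emptied, for instance along the failure path logged just before the error. The hypothetical `PendingIds` class below only illustrates the defensive check; it is not the spout's actual code.

```java
import java.util.TreeSet;

/** Hypothetical, simplified tracker of pending message IDs. */
final class PendingIds {
    private final TreeSet<Long> toCommit = new TreeSet<>();

    void emitted(long id) {
        toCommit.add(id);
    }

    /** Models a failure path that discards all pending IDs. */
    void failAll() {
        toCommit.clear();
    }

    /**
     * Removes id and returns true only if it is the oldest pending ID; otherwise leaves
     * the set unchanged. isEmpty() is checked first because TreeSet.first() throws
     * NoSuchElementException on an empty set.
     */
    boolean ackIfOldest(long id) {
        if (toCommit.isEmpty() || toCommit.first() != id) {
            return false;
        }
        toCommit.remove(id);
        return true;
    }
}
```

With the guard, a late ack for an ID that is no longer tracked is simply ignored instead of killing the executor's async loop.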
